Alternating Least Squares (ALS) for the Santander Kaggle competition

The Kaggle Santander competition has just concluded. For this competition I decided to keep learning the Spark environment and to invest time in understanding how to build recommendations with Apache Spark. Judging by the competition leaderboard this was not a successful choice :), but it gave me the opportunity to learn a new strategy for working with data. I'm pretty sure I can do better with this kind of approach once I learn more about how collaborative filtering works.

The recommendation strategy

Citing Wikipedia:

“Recommender systems or recommendation systems (sometimes replacing “system” with a synonym such as platform or engine) are a subclass of information filtering system that seek to predict the “rating” or “preference” that a user would give to an item.”

The main idea is to build a users × items matrix of rating values and factorize it, so that each user can be recommended the products that similar users rated highly.

ALS

Apache Spark ML implements alternating least squares (ALS) for collaborative filtering, a very popular algorithm for making recommendations.

The ALS recommender is a matrix factorization algorithm that uses Alternating Least Squares with Weighted-Lambda-Regularization (ALS-WR). It factors the user-to-item matrix A into the user-to-feature matrix U and the item-to-feature matrix M, and it runs the ALS algorithm in parallel. The algorithm tries to uncover the latent factors that explain the observed user-to-item ratings, finding the factor weights that minimize the squared error between predicted and actual ratings.
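To make the alternating idea concrete, here is a minimal pure-Python sketch with a single latent factor, so that each least-squares solve has a closed form. It is not how Spark implements ALS; the toy ratings, the value of `lam` and the helper names `solve_side` and `squared_error` are all made up for illustration.

```python
# Toy rank-1 ALS with weighted-lambda regularization (illustrative only).
# ratings maps (user, item) -> observed rating; unobserved pairs are absent.
ratings = {
    (0, 0): 1.0, (0, 1): 2.0,
    (1, 0): 2.0, (1, 1): 4.0, (1, 2): 6.0,
    (2, 1): 2.0, (2, 2): 3.0,
}
n_users, n_items = 3, 3
lam = 0.01  # regularization strength (hypothetical value)

u = [1.0] * n_users  # one latent factor per user
m = [1.0] * n_items  # one latent factor per item


def squared_error(u, m):
    """Sum of squared errors over the observed ratings only."""
    return sum((r - u[i] * m[j]) ** 2 for (i, j), r in ratings.items())


def solve_side(fixed, pick_own, pick_other, size):
    """Closed-form ridge update for one side while the other side is held fixed."""
    updated = []
    for k in range(size):
        # observed ratings that involve entity k, paired with the other side's index
        obs = [(key[pick_other], r) for key, r in ratings.items() if key[pick_own] == k]
        num = sum(fixed[o] * r for o, r in obs)
        # the penalty is weighted by the number of ratings of entity k (the "WR" part)
        den = sum(fixed[o] ** 2 for o, _ in obs) + lam * len(obs)
        updated.append(num / den if den else 0.0)
    return updated


initial_error = squared_error(u, m)
for _ in range(20):
    u = solve_side(m, 0, 1, n_users)  # fix item factors, solve for users
    m = solve_side(u, 1, 0, n_items)  # fix user factors, solve for items
final_error = squared_error(u, m)

# Predicted rating for the unobserved pair (user 0, item 2)
prediction = u[0] * m[2]
```

With more than one latent factor each update becomes a small linear system per user or item instead of a single division, but the alternation is the same.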

The data at our disposal from the Kaggle site

In this competition, you are provided with 1.5 years of customers behavior data from Santander bank to predict what new products customers will purchase. The data starts at 2015-01-28 and has monthly records of products a customer has, such as “credit card”, “savings account”, etc. You will predict what additional products a customer will get in the last month, 2016-06-28, in addition to what they already have at 2016-05-28. These products are the columns named: ind_(xyz)_ult1, which are the columns #25 – #48 in the training data. You will predict what a customer will buy in addition to what they already had at 2016-05-28

So we have to select which of the 24 products a user will buy by 2016-06-28, beyond what they already held in the previous month.
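A tiny sketch of what "added products" means here, using made-up flags for three of the product columns. The helper name `added_products` is mine, not something from the competition kit.

```python
def added_products(previous, current, product_names):
    """Return the names of products held in `current` but not in `previous`."""
    return [name
            for name, before, now in zip(product_names, previous, current)
            if now == 1 and before == 0]


products = ['ind_cco_fin_ult1', 'ind_tjcr_fin_ult1', 'ind_recibo_ult1']
may = [1, 0, 0]   # flags held at 2016-05-28 (made-up values)
june = [1, 1, 1]  # flags held at 2016-06-28 (made-up values)

new_in_june = added_products(may, june, products)
```

Products that a customer already held, or dropped, do not count: only the 0 to 1 transitions are the target.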

 

ALS with explicit preferences

Following the discussion on the Kaggle forum, I tried using a single month of user data as explicit product ratings.

The code

ALS using Spark 2.0+
import pyspark
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
import numpy as np

conf = SparkConf()
conf.set("spark.executor.memory", "6G")
conf.set("spark.driver.memory", "2G")
conf.set("spark.executor.cores", "4")

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.default.parallelism", "4")
conf.setMaster('local[4]')

spark = SparkSession \
    .builder.config(conf=conf) \
    .appName("spark-santander-recommendation").getOrCreate()
Reading data from disk
# spark is an existing SparkSession
full = spark.read.csv("../input/train_ver2.csv", header="true",inferSchema="true").select('ncodpers','fecha_dato','ind_ahor_fin_ult1', 'ind_aval_fin_ult1', 'ind_cco_fin_ult1', 'ind_cder_fin_ult1', 'ind_cno_fin_ult1', 'ind_ctju_fin_ult1', 'ind_ctma_fin_ult1', 'ind_ctop_fin_ult1', 'ind_ctpp_fin_ult1', 'ind_deco_fin_ult1', 'ind_deme_fin_ult1', 'ind_dela_fin_ult1', 'ind_ecue_fin_ult1', 'ind_fond_fin_ult1', 'ind_hip_fin_ult1', 'ind_plan_fin_ult1', 'ind_pres_fin_ult1', 'ind_reca_fin_ult1', 'ind_tjcr_fin_ult1', 'ind_valo_fin_ult1', 'ind_viv_fin_ult1', 'ind_nomina_ult1', 'ind_nom_pens_ult1', 'ind_recibo_ult1')
Selecting the training and comparison data set
df=full[(full['fecha_dato']=='2015-06-28')].select('ncodpers','ind_ahor_fin_ult1', 'ind_aval_fin_ult1', 
                                                 'ind_cco_fin_ult1', 'ind_cder_fin_ult1', 'ind_cno_fin_ult1', 
                                                 'ind_ctju_fin_ult1', 'ind_ctma_fin_ult1', 'ind_ctop_fin_ult1', 
                                                 'ind_ctpp_fin_ult1', 'ind_deco_fin_ult1', 'ind_deme_fin_ult1', 
                                                 'ind_dela_fin_ult1', 'ind_ecue_fin_ult1', 'ind_fond_fin_ult1', 
                                                 'ind_hip_fin_ult1', 'ind_plan_fin_ult1', 'ind_pres_fin_ult1', 
                                                 'ind_reca_fin_ult1', 'ind_tjcr_fin_ult1', 'ind_valo_fin_ult1', 
                                                 'ind_viv_fin_ult1', 'ind_nomina_ult1', 'ind_nom_pens_ult1', 
                                                 'ind_recibo_ult1')

df_val=full[full['fecha_dato']=='2016-05-28'].select('ncodpers','ind_ahor_fin_ult1', 'ind_aval_fin_ult1', 
                                                 'ind_cco_fin_ult1', 'ind_cder_fin_ult1', 'ind_cno_fin_ult1', 
                                                 'ind_ctju_fin_ult1', 'ind_ctma_fin_ult1', 'ind_ctop_fin_ult1', 
                                                 'ind_ctpp_fin_ult1', 'ind_deco_fin_ult1', 'ind_deme_fin_ult1', 
                                                 'ind_dela_fin_ult1', 'ind_ecue_fin_ult1', 'ind_fond_fin_ult1', 
                                                 'ind_hip_fin_ult1', 'ind_plan_fin_ult1', 'ind_pres_fin_ult1', 
                                                 'ind_reca_fin_ult1', 'ind_tjcr_fin_ult1', 'ind_valo_fin_ult1', 
                                                 'ind_viv_fin_ult1', 'ind_nomina_ult1', 'ind_nom_pens_ult1', 
                                                 'ind_recibo_ult1')
import pandas as pd 
lista_users= pd.read_csv("../input/test_ver2.csv", usecols=["ncodpers"])
lista_users["ncodpers"].values[:10]
array([  15889, 1170544, 1170545, 1170547, 1170548, 1170550, 1170552,
       1170553, 1170555, 1170557])

lista_products=['ind_ahor_fin_ult1', 'ind_aval_fin_ult1', 'ind_cco_fin_ult1', 'ind_cder_fin_ult1', 'ind_cno_fin_ult1', 'ind_ctju_fin_ult1', 'ind_ctma_fin_ult1', 'ind_ctop_fin_ult1', 'ind_ctpp_fin_ult1', 'ind_deco_fin_ult1', 'ind_deme_fin_ult1', 'ind_dela_fin_ult1', 'ind_ecue_fin_ult1', 'ind_fond_fin_ult1', 'ind_hip_fin_ult1', 'ind_plan_fin_ult1', 'ind_pres_fin_ult1', 'ind_reca_fin_ult1', 'ind_tjcr_fin_ult1', 'ind_valo_fin_ult1', 'ind_viv_fin_ult1', 'ind_nomina_ult1', 'ind_nom_pens_ult1', 'ind_recibo_ult1']
We need to convert the input data into rating format: (userId, productId, rating).

Each target product is now mapped to an integer ID, to which we want to associate a rating.

from pyspark.sql import Row
ratingsRDD = df.rdd.map(lambda p: Row(userId=p[0], itemCol=[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23],rating=p[-24:]))
ratings = spark.createDataFrame(ratingsRDD)
ratingsRDD_val = df_val.rdd.map(lambda p: Row(userId=p[0], itemCol=[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23],rating=p[-24:]))
val = spark.createDataFrame(ratingsRDD_val)
ratings=ratings.select( 'userId','itemCol','rating')
val=val.select( 'userId','itemCol','rating')
ratings.show(10)
+---------+--------------------+--------------------+
|   userId|             itemCol|              rating|
+---------+--------------------+--------------------+
|  16132.0|[0, 1, 2, 3, 4, 5...|[0,0,1,0,0,0,0,0,...|
|1063040.0|[0, 1, 2, 3, 4, 5...|[0,0,1,0,0,0,0,0,...|
|1063041.0|[0, 1, 2, 3, 4, 5...|[0,0,1,0,0,0,0,0,...|
|1063042.0|[0, 1, 2, 3, 4, 5...|[0,0,1,0,0,0,0,0,...|
|1063043.0|[0, 1, 2, 3, 4, 5...|[0,0,1,0,0,0,0,0,...|
|1063044.0|[0, 1, 2, 3, 4, 5...|[0,0,1,0,0,0,0,0,...|
|1063045.0|[0, 1, 2, 3, 4, 5...|[0,0,1,0,0,0,0,0,...|
|1063047.0|[0, 1, 2, 3, 4, 5...|[0,0,1,0,0,0,0,0,...|
|1063049.0|[0, 1, 2, 3, 4, 5...|[0,0,1,0,0,0,0,0,...|
|1063050.0|[0, 1, 2, 3, 4, 5...|[0,0,1,0,0,0,0,0,...|
+---------+--------------------+--------------------+
only showing top 10 rows

We need to change the data format once more: each (user, product) pair needs its own rating.

from pyspark.sql.functions import split, explode
ratings=ratings.select('userId', explode('itemCol'),'rating')
val=val.select('userId', explode('itemCol'),'rating')
ratings.show(10)
+-------+---+--------------------+
| userId|col|              rating|
+-------+---+--------------------+
|16132.0|  0|[0,0,1,0,0,0,0,0,...|
|16132.0|  1|[0,0,1,0,0,0,0,0,...|
|16132.0|  2|[0,0,1,0,0,0,0,0,...|
|16132.0|  3|[0,0,1,0,0,0,0,0,...|
|16132.0|  4|[0,0,1,0,0,0,0,0,...|
|16132.0|  5|[0,0,1,0,0,0,0,0,...|
|16132.0|  6|[0,0,1,0,0,0,0,0,...|
|16132.0|  7|[0,0,1,0,0,0,0,0,...|
|16132.0|  8|[0,0,1,0,0,0,0,0,...|
|16132.0|  9|[0,0,1,0,0,0,0,0,...|
+-------+---+--------------------+
only showing top 10 rows
val.show(10)
+--------+---+--------------------+
|  userId|col|              rating|
+--------+---+--------------------+
|657640.0|  0|[0,0,0,0,0,0,0,0,...|
|657640.0|  1|[0,0,0,0,0,0,0,0,...|
|657640.0|  2|[0,0,0,0,0,0,0,0,...|
|657640.0|  3|[0,0,0,0,0,0,0,0,...|
|657640.0|  4|[0,0,0,0,0,0,0,0,...|
|657640.0|  5|[0,0,0,0,0,0,0,0,...|
|657640.0|  6|[0,0,0,0,0,0,0,0,...|
|657640.0|  7|[0,0,0,0,0,0,0,0,...|
|657640.0|  8|[0,0,0,0,0,0,0,0,...|
|657640.0|  9|[0,0,0,0,0,0,0,0,...|
+--------+---+--------------------+
only showing top 10 rows
ratingsRDD = ratings.rdd.map(lambda p: Row(userId=p[0],itemCol=p[1],ranking=p[2][int(p[1])]))
ratings2 = ratingsRDD.toDF()
ratingsRDD_val = val.rdd.map(lambda p: Row(userId=p[0],itemCol=p[1],ranking=p[2][int(p[1])]))
validation = ratingsRDD_val.toDF()
training=ratings2.withColumn("userId", ratings2["userId"].cast("int")).withColumn("itemCol", ratings2["itemCol"].cast("int")).withColumn("ranking", ratings2["ranking"].cast("double"))
test=validation.withColumn("userId", validation["userId"].cast("int")).withColumn("itemCol", validation["itemCol"].cast("int")).withColumn("ranking", validation["ranking"].cast("double"))
training.show(24)
+-------+-------+------+
|itemCol|ranking|userId|
+-------+-------+------+
|      0|    0.0| 16132|
|      1|    0.0| 16132|
|      2|    1.0| 16132|
|      3|    0.0| 16132|
|      4|    0.0| 16132|
|      5|    0.0| 16132|
|      6|    0.0| 16132|
|      7|    0.0| 16132|
|      8|    0.0| 16132|
|      9|    0.0| 16132|
|     10|    0.0| 16132|
|     11|    0.0| 16132|
|     12|    0.0| 16132|
|     13|    0.0| 16132|
|     14|    0.0| 16132|
|     15|    0.0| 16132|
|     16|    0.0| 16132|
|     17|    0.0| 16132|
|     18|    0.0| 16132|
|     19|    0.0| 16132|
|     20|    0.0| 16132|
|     21|   null| 16132|
|     22|   null| 16132|
|     23|    0.0| 16132|
+-------+-------+------+
only showing top 24 rows
training=training.na.fill(0.0)
training.show(24)
+-------+-------+------+
|itemCol|ranking|userId|
+-------+-------+------+
|      0|    0.0| 16132|
|      1|    0.0| 16132|
|      2|    1.0| 16132|
|      3|    0.0| 16132|
|      4|    0.0| 16132|
|      5|    0.0| 16132|
|      6|    0.0| 16132|
|      7|    0.0| 16132|
|      8|    0.0| 16132|
|      9|    0.0| 16132|
|     10|    0.0| 16132|
|     11|    0.0| 16132|
|     12|    0.0| 16132|
|     13|    0.0| 16132|
|     14|    0.0| 16132|
|     15|    0.0| 16132|
|     16|    0.0| 16132|
|     17|    0.0| 16132|
|     18|    0.0| 16132|
|     19|    0.0| 16132|
|     20|    0.0| 16132|
|     21|    0.0| 16132|
|     22|    0.0| 16132|
|     23|    0.0| 16132|
+-------+-------+------+
only showing top 24 rows
training[training['ranking']!=0.0].count()
1041274
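Stripped of the Spark machinery, the reshaping above boils down to the following. This is a plain-Python sketch on one made-up row; `wide_to_long` is a hypothetical helper, and it fills missing flags with 0.0 the same way `na.fill(0.0)` does.

```python
def wide_to_long(user_id, flags):
    """Turn one wide row into (userId, itemCol, ranking) triples.

    Missing flags (None) become 0.0, mirroring the na.fill(0.0) step.
    """
    return [(int(user_id), item, 0.0 if flag is None else float(flag))
            for item, flag in enumerate(flags)]


# One customer with four product flags, the last one missing (made-up values)
row = wide_to_long(16132.0, [0, 0, 1, None])
```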

The test set contains the userId and the products not yet 'rated' that we want to 'recommend'.

test=test.na.fill(0.0)
test.show(24)
+-------+-------+------+
|itemCol|ranking|userId|
+-------+-------+------+
|      0|    0.0|657640|
|      1|    0.0|657640|
|      2|    0.0|657640|
|      3|    0.0|657640|
|      4|    0.0|657640|
|      5|    0.0|657640|
|      6|    0.0|657640|
|      7|    0.0|657640|
|      8|    0.0|657640|
|      9|    0.0|657640|
|     10|    0.0|657640|
|     11|    0.0|657640|
|     12|    0.0|657640|
|     13|    0.0|657640|
|     14|    0.0|657640|
|     15|    0.0|657640|
|     16|    0.0|657640|
|     17|    0.0|657640|
|     18|    0.0|657640|
|     19|    0.0|657640|
|     20|    0.0|657640|
|     21|    0.0|657640|
|     22|    0.0|657640|
|     23|    0.0|657640|
+-------+-------+------+
only showing top 24 rows
train, val=training.randomSplit([0.8,0.2],10)
import itertools
from math import sqrt
from operator import add
import sys
from pyspark.ml.recommendation import ALS
sc=spark.sparkContext
sc.setCheckpointDir('checkpoint/') 

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="ranking",
                                predictionCol="prediction")
def computeRmse(model, data):
    """
    Compute RMSE (Root mean Squared Error).
    """
    predictions = model.transform(data)
    rmse = evaluator.evaluate(predictions)
    print("Root-mean-square error = " + str(rmse)) 
    return rmse            

#train models and evaluate them on the validation set

ranks = [15]
lambdas = [0.05]
numIters = [30]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1
training=training.na.drop()
test=test.na.drop() 
val=val.na.drop() 
for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    als = ALS(rank=rank, maxIter=numIter, regParam=lmbda, numUserBlocks=10, numItemBlocks=10, implicitPrefs=False, 
              alpha=1.0, 
              userCol="userId", itemCol="itemCol", seed=1, ratingCol="ranking", nonnegative=True, 
              checkpointInterval=10, intermediateStorageLevel="MEMORY_AND_DISK", finalStorageLevel="MEMORY_AND_DISK")
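    # NOTE: the model is fitted on the full `training` frame, which still contains
    # the rows held out in `val`, so the validation RMSE below is optimistic.
    model=als.fit(training)

    validationRmse = computeRmse(model, val)
    # NOTE: %.1f rounds lambda = 0.05 up to 0.1 in the printed message.
    print("RMSE (validation) = %f for the model trained with " % validationRmse + \
            "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter))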
    if (validationRmse < bestValidationRmse):
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter

model=bestModel
Root-mean-square error = 0.0359369589688
RMSE (validation) = 0.035937 for the model trained with rank = 15, lambda = 0.1, and numIter = 30.
print(validationRmse)
0.0359369589688
Save the model to disk
model_path = "model/" 
model.save(model_path)
Determine which products each user has not already bought and rated, so that we can recommend new products
topredict=test[test['ranking']==0]
predictions=model.transform(topredict)
predictions.first()
Row(itemCol=12, ranking=0.0, userId=16339, prediction=0.10545945167541504)
Recom=predictions.rdd.map(lambda p: Row(user=p[2],ProductPredictions=(p[0],p[3]))).toDF()
Recom=Recom.sort('user')
Recom.show(10)
+--------------------+-----+
|  ProductPredictions| user|
+--------------------+-----+
|[15,0.01536364294...|15889|
|[6,0.001132436329...|15889|
|[16,0.01078280992...|15889|
|[3,0.008875329978...|15889|
|             [5,0.0]|15889|
|[20,0.00726718548...|15889|
|            [22,0.0]|15889|
|[12,0.06198660656...|15889|
|[1,8.980201673693...|15889|
|[13,0.02462876401...|15889|
+--------------------+-----+
only showing top 10 rows
from pyspark.sql import functions as F
ppT=Recom.groupby("user").agg(F.collect_list("ProductPredictions"))
ppT.printSchema()
root
 |-- user: long (nullable = true)
 |-- collect_list(ProductPredictions): array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: long (nullable = true)
 |    |    |-- _2: double (nullable = true)
ppT=ppT.withColumn("ncodpers", ppT["user"].cast("int")).withColumn("itemCol", ppT["collect_list(ProductPredictions)"]).drop('user',"collect_list(ProductPredictions)")
ppT.printSchema()
root
 |-- ncodpers: integer (nullable = true)
 |-- itemCol: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: long (nullable = true)
 |    |    |-- _2: double (nullable = true)
ppTP=ppT.toPandas()
ppTP.head(10)
   ncodpers  itemCol
0     15889  [(12, 0.0619866065681), (22, 0.0), (1, 0.00089…
1     15890  [(22, 0.0), (1, 0.000633358489722), (13, 0.025…
2     15892  [(22, 0.0), (1, 0.00121831032448), (13, 0.0379…
3     15893  [(12, 0.0262373052537), (22, 0.0), (1, 0.00067…
4     15894  [(22, 0.0), (1, 0.00124242738821), (13, 0.0272…
5     15895  [(22, 0.0), (1, 0.00119186483789), (13, 0.0441…
6     15896  [(22, 0.0), (1, 1.03697550458e-06), (13, 0.000…
7     15897  [(22, 0.0), (1, 0.00129512546118), (6, 0.00380…
8     15898  [(12, nan), (22, nan), (1, nan), (13, nan), (6…
9     15899  [(12, 0.0511704608798), (22, 0.0), (1, 0.00084…
ppTP[ppTP.ncodpers==15889]
   ncodpers  itemCol
0     15889  [(12, 0.0619866065681), (22, 0.0), (1, 0.00089…
print(len(ppTP))
931453
print(len(lista_users))
929615
# right merge: keep exactly the users listed in the test file
predFin=ppTP.merge(lista_users,how="right",on='ncodpers')
print(len(predFin))
929615
predFin.head(10)
   ncodpers  itemCol
0     15889  [(12, 0.0619866065681), (22, 0.0), (1, 0.00089…
1     15890  [(22, 0.0), (1, 0.000633358489722), (13, 0.025…
2     15892  [(22, 0.0), (1, 0.00121831032448), (13, 0.0379…
3     15893  [(12, 0.0262373052537), (22, 0.0), (1, 0.00067…
4     15894  [(22, 0.0), (1, 0.00124242738821), (13, 0.0272…
5     15895  [(22, 0.0), (1, 0.00119186483789), (13, 0.0441…
6     15896  [(22, 0.0), (1, 1.03697550458e-06), (13, 0.000…
7     15897  [(22, 0.0), (1, 0.00129512546118), (6, 0.00380…
8     15898  [(12, nan), (22, nan), (1, nan), (13, nan), (6…
9     15899  [(12, 0.0511704608798), (22, 0.0), (1, 0.00084…
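text = 'ncodpers,added_products\n'
# NOTE: i counts rows of the filtered frame (here only 0), but the lookup below
# reads row i of the unfiltered predFin, i.e. the first user (15889). This is why
# the output matches the one printed for user 15889 further down.
for i, ncodpers in enumerate(predFin[predFin.ncodpers==16861].ncodpers):
        text += '%i,' % ncodpers
        item=predFin["itemCol"].values[i]
        print(item)
        # sort the (product, score) pairs by predicted score, highest first
        newitem = sorted(item, key=lambda x: x[1],reverse=True)
        print(newitem)
        for j in range(len(newitem)):
                text += '%s ' % lista_products[newitem[j][0]]
        text += '\n'
print(text)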
[Row(_1=12, _2=0.06198660656809807), Row(_1=22, _2=0.0), Row(_1=1, _2=0.0008980201673693955), Row(_1=13, _2=0.024628764018416405), Row(_1=6, _2=0.001132436329498887), Row(_1=16, _2=0.010782809928059578), Row(_1=3, _2=0.008875329978764057), Row(_1=20, _2=0.007267185486853123), Row(_1=5, _2=0.0), Row(_1=15, _2=0.015363642945885658), Row(_1=9, _2=0.003559185191988945), Row(_1=17, _2=0.054485660046339035), Row(_1=4, _2=0.10679417848587036), Row(_1=23, _2=0.16336441040039062), Row(_1=7, _2=0.05555544048547745), Row(_1=10, _2=0.010406216606497765), Row(_1=21, _2=0.0), Row(_1=11, _2=0.02240748517215252), Row(_1=14, _2=0.0050923265516757965), Row(_1=0, _2=0.000912301242351532)]
[Row(_1=23, _2=0.16336441040039062), Row(_1=4, _2=0.10679417848587036), Row(_1=12, _2=0.06198660656809807), Row(_1=7, _2=0.05555544048547745), Row(_1=17, _2=0.054485660046339035), Row(_1=13, _2=0.024628764018416405), Row(_1=11, _2=0.02240748517215252), Row(_1=15, _2=0.015363642945885658), Row(_1=16, _2=0.010782809928059578), Row(_1=10, _2=0.010406216606497765), Row(_1=3, _2=0.008875329978764057), Row(_1=20, _2=0.007267185486853123), Row(_1=14, _2=0.0050923265516757965), Row(_1=9, _2=0.003559185191988945), Row(_1=6, _2=0.001132436329498887), Row(_1=0, _2=0.000912301242351532), Row(_1=1, _2=0.0008980201673693955), Row(_1=22, _2=0.0), Row(_1=5, _2=0.0), Row(_1=21, _2=0.0)]
ncodpers,added_products
16861,ind_recibo_ult1 ind_cno_fin_ult1 ind_ecue_fin_ult1 ind_ctop_fin_ult1 ind_reca_fin_ult1 ind_fond_fin_ult1 ind_dela_fin_ult1 ind_plan_fin_ult1 ind_pres_fin_ult1 ind_deme_fin_ult1 ind_cder_fin_ult1 ind_viv_fin_ult1 ind_hip_fin_ult1 ind_deco_fin_ult1 ind_ctma_fin_ult1 ind_ahor_fin_ult1 ind_aval_fin_ult1 ind_nom_pens_ult1 ind_ctju_fin_ult1 ind_nomina_ult1 
text = 'ncodpers,added_products\n'
for i, ncodpers in enumerate(predFin[predFin.ncodpers==15889].ncodpers):
        text += '%i,' % ncodpers
        item=predFin["itemCol"].values[i]
        print(item)
        # sort the (product, score) pairs by predicted score, highest first
        newitem = sorted(item, key=lambda x: x[1],reverse=True)
        print(newitem)
        for j in range(len(newitem)):
                text += '%s ' % lista_products[newitem[j][0]]
        text += '\n'
print(text)
[Row(_1=12, _2=0.06198660656809807), Row(_1=22, _2=0.0), Row(_1=1, _2=0.0008980201673693955), Row(_1=13, _2=0.024628764018416405), Row(_1=6, _2=0.001132436329498887), Row(_1=16, _2=0.010782809928059578), Row(_1=3, _2=0.008875329978764057), Row(_1=20, _2=0.007267185486853123), Row(_1=5, _2=0.0), Row(_1=15, _2=0.015363642945885658), Row(_1=9, _2=0.003559185191988945), Row(_1=17, _2=0.054485660046339035), Row(_1=4, _2=0.10679417848587036), Row(_1=23, _2=0.16336441040039062), Row(_1=7, _2=0.05555544048547745), Row(_1=10, _2=0.010406216606497765), Row(_1=21, _2=0.0), Row(_1=11, _2=0.02240748517215252), Row(_1=14, _2=0.0050923265516757965), Row(_1=0, _2=0.000912301242351532)]
[Row(_1=23, _2=0.16336441040039062), Row(_1=4, _2=0.10679417848587036), Row(_1=12, _2=0.06198660656809807), Row(_1=7, _2=0.05555544048547745), Row(_1=17, _2=0.054485660046339035), Row(_1=13, _2=0.024628764018416405), Row(_1=11, _2=0.02240748517215252), Row(_1=15, _2=0.015363642945885658), Row(_1=16, _2=0.010782809928059578), Row(_1=10, _2=0.010406216606497765), Row(_1=3, _2=0.008875329978764057), Row(_1=20, _2=0.007267185486853123), Row(_1=14, _2=0.0050923265516757965), Row(_1=9, _2=0.003559185191988945), Row(_1=6, _2=0.001132436329498887), Row(_1=0, _2=0.000912301242351532), Row(_1=1, _2=0.0008980201673693955), Row(_1=22, _2=0.0), Row(_1=5, _2=0.0), Row(_1=21, _2=0.0)]
ncodpers,added_products
15889,ind_recibo_ult1 ind_cno_fin_ult1 ind_ecue_fin_ult1 ind_ctop_fin_ult1 ind_reca_fin_ult1 ind_fond_fin_ult1 ind_dela_fin_ult1 ind_plan_fin_ult1 ind_pres_fin_ult1 ind_deme_fin_ult1 ind_cder_fin_ult1 ind_viv_fin_ult1 ind_hip_fin_ult1 ind_deco_fin_ult1 ind_ctma_fin_ult1 ind_ahor_fin_ult1 ind_aval_fin_ult1 ind_nom_pens_ult1 ind_ctju_fin_ult1 ind_nomina_ult1 
Predict on the unrated products and prepare the submission file
import datetime
import numpy as np
import gzip
def create_submission(preds,target_cols):
    print('Saving results on disk')
    info_string = 'ALS'
    now = datetime.datetime.now()
    sub_file = 'ppT-submission_' + info_string + '_' + str(now.strftime("%Y-%m-%d-%H-%M")) + '.csv'
    # Create the submission text
    print('Creating text...')
    text = 'ncodpers,added_products\n'
    for i, ncodpers in enumerate(preds.ncodpers):
        text += '%i,' % ncodpers
        # (product index, predicted score) pairs for this user
        item=preds["itemCol"].values[i]
        # rank the products by predicted score, highest first
        newitem = sorted(item, key=lambda x: x[1],reverse=True)
        for j in range(len(newitem)):
            text += '%s ' % target_cols[newitem[j][0]]
        text += '\n'
    # Write to file
    print("writing to file")
    with gzip.open('%s.gz' % sub_file, 'w') as f:
        f.write(text)

create_submission(predFin,lista_products)
Saving results on disk
Creating text...
writing to file
spark.stop()
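The submission above writes every scored product for each user. Below is a minimal sketch of trimming that list, assuming only the first seven products count (my understanding is that the leaderboard metric was MAP@7, so `k=7` is an assumption) and that NaN scores, like the ones shown for user 15898, should be dropped. The helper `top_k_new_products` and the toy scores are made up for illustration.

```python
def top_k_new_products(scored, owned, names, k=7):
    """Return up to k product names, best score first.

    scored: list of (product index, predicted score) pairs
    owned:  set of product indices the user already holds
    names:  list mapping a product index to its column name
    """
    candidates = [(item, score) for item, score in scored
                  if item not in owned and score == score]  # score == score is False for NaN
    ranked = sorted(candidates, key=lambda pair: pair[1], reverse=True)
    return [names[item] for item, _ in ranked[:k]]


names = ['p%d' % i for i in range(24)]  # placeholder product names
scored = [(12, 0.06), (22, 0.0), (4, 0.10), (23, 0.16), (5, float('nan'))]
best_two = top_k_new_products(scored, owned={23}, names=names, k=2)
```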

 

The public leaderboard score is 0.0162316 :(

Consider implicit preferences

I also tried a different setup for the problem, using the full data set over the entire period. The idea was to treat the number (or the mean) of months in which a user held each product as an implicit preference, which can be seen as an implicit rating.

To turn the 17 months of data into a recommendation-like problem, I computed each user's implicit preference by summing or averaging the products they held over the entire period.

The number of times a user held a product is an implicit positive rating of that product, and it can be used to make recommendations for other users.
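As a plain-Python illustration of that idea (not the Spark code used in this post), the sketch below averages each product flag over a user's monthly records and emits (user, item, rating) triples. The helper `mean_ownership` and the records are made up. It treats a missing flag as 0, whereas Spark's `mean` skips nulls, so the two can differ on rows with missing values.

```python
from collections import defaultdict


def mean_ownership(records):
    """Average each product flag per user over all of that user's monthly records.

    records: iterable of (user, [flag per product]), one entry per user and month.
    Returns (user, item, mean flag) triples sorted by user, then item.
    """
    sums = {}
    months = defaultdict(int)
    for user, flags in records:
        months[user] += 1
        totals = sums.setdefault(user, [0.0] * len(flags))
        for item, flag in enumerate(flags):
            totals[item] += flag or 0  # assumption: a missing flag counts as 0
    return [(user, item, total / months[user])
            for user in sorted(sums)
            for item, total in enumerate(sums[user])]


records = [
    (1, [1, 0]),     # user 1, first month
    (1, [1, 1]),     # user 1, second month
    (2, [0, None]),  # user 2, one month, second flag missing
]
implicit_ratings = mean_ownership(records)
```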

Reading the full data set

df=full.select('ncodpers','ind_ahor_fin_ult1', 'ind_aval_fin_ult1', 
                                                 'ind_cco_fin_ult1', 'ind_cder_fin_ult1', 'ind_cno_fin_ult1', 
                                                 'ind_ctju_fin_ult1', 'ind_ctma_fin_ult1', 'ind_ctop_fin_ult1', 
                                                 'ind_ctpp_fin_ult1', 'ind_deco_fin_ult1', 'ind_deme_fin_ult1', 
                                                 'ind_dela_fin_ult1', 'ind_ecue_fin_ult1', 'ind_fond_fin_ult1', 
                                                 'ind_hip_fin_ult1', 'ind_plan_fin_ult1', 'ind_pres_fin_ult1', 
                                                 'ind_reca_fin_ult1', 'ind_tjcr_fin_ult1', 'ind_valo_fin_ult1', 
                                                 'ind_viv_fin_ult1', 'ind_nomina_ult1', 'ind_nom_pens_ult1', 
                                                 'ind_recibo_ult1')

df_val=full[full['fecha_dato']=='2016-05-28'].select('ncodpers','ind_ahor_fin_ult1', 'ind_aval_fin_ult1', 
                                                 'ind_cco_fin_ult1', 'ind_cder_fin_ult1', 'ind_cno_fin_ult1', 
                                                 'ind_ctju_fin_ult1', 'ind_ctma_fin_ult1', 'ind_ctop_fin_ult1', 
                                                 'ind_ctpp_fin_ult1', 'ind_deco_fin_ult1', 'ind_deme_fin_ult1', 
                                                 'ind_dela_fin_ult1', 'ind_ecue_fin_ult1', 'ind_fond_fin_ult1', 
                                                 'ind_hip_fin_ult1', 'ind_plan_fin_ult1', 'ind_pres_fin_ult1', 
                                                 'ind_reca_fin_ult1', 'ind_tjcr_fin_ult1', 'ind_valo_fin_ult1', 
                                                 'ind_viv_fin_ult1', 'ind_nomina_ult1', 'ind_nom_pens_ult1', 
                                                 'ind_recibo_ult1')

We need to convert the input data into rating format: (userId, productId, rating).

Rating

To treat this as an implicit-feedback problem, I used the mean of each product flag for a user over the 1.5 years as the rating for that product.

# average each product flag per customer over the whole period;
# the result stays a distributed DataFrame, so no collect() to the driver is needed
trainU=df.groupBy('ncodpers').agg({c: 'mean' for c in lista_products})
df=trainU
import itertools
from math import sqrt
from operator import add
import sys
from pyspark.ml.recommendation import ALS
sc=spark.sparkContext
sc.setCheckpointDir('checkpoint/') 

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="ranking",
                                predictionCol="prediction")
def computeRmse(model, data):
    """
    Compute RMSE (Root mean Squared Error).
    """
    predictions = model.transform(data)
    rmse = evaluator.evaluate(predictions)
    print("Root-mean-square error = " + str(rmse)) 
    return rmse            

#train models and evaluate them on the validation set

ranks = [30]
lambdas = [0.05]
numIters = [200]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1
training=training.na.drop()
test=test.na.drop() 
val=val.na.drop() 
for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    als = ALS(rank=rank, maxIter=numIter, regParam=lmbda, numUserBlocks=24, numItemBlocks=24, implicitPrefs=True, 
              alpha=30.0, 
              userCol="userId", itemCol="itemCol", seed=11, ratingCol="ranking", nonnegative=True, 
              checkpointInterval=10, intermediateStorageLevel="MEMORY_AND_DISK", finalStorageLevel="MEMORY_AND_DISK")
    model=als.fit(train)

    validationRmse = computeRmse(model, val)
    # NOTE: %.1f rounds lambda = 0.05 up to 0.1 in the printed message.
    print("RMSE (validation) = %f for the model trained with " % validationRmse + \
            "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter))
    if (validationRmse < bestValidationRmse):
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter

model=bestModel
Root-mean-square error = 0.225247236858
RMSE (validation) = 0.225247 for the model trained with rank = 30, lambda = 0.1, and numIter = 200.

The leaderboard score is very disappointing: only 0.003522! 🙁

 

Useful links

Here is a list of blog posts I found very useful for understanding recommendation and collaborative filtering.