Using PySpark for the RedHat Kaggle competition

The RedHat Kaggle competition is not particularly prohibitive from a computational point of view, nor in terms of data management.

Using pandas with xgboost, or R, is enough to get good scores. So the idea here is to use Apache Spark purely as a learning exercise, in the spirit of a tutorial.

First of all, merging several data frames in PySpark is not as efficient as in pandas, and I do not fully trust the code I wrote to join two DataFrames that have columns with the same names.
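A common workaround, shown here as a minimal sketch with made-up column names rather than the code I actually used, is to rename the clashing columns before the join so that the result is unambiguous:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Two toy DataFrames that share the column name "date" besides the join key.
df_a = spark.createDataFrame([(1, "2016-01-01")], ["people_id", "date"])
df_b = spark.createDataFrame([(1, "2016-02-01")], ["people_id", "date"])

# Rename the clashing column before joining, so both "date" values survive.
df_b = df_b.withColumnRenamed("date", "date_b")
merged = df_a.join(df_b, on="people_id", how="left")
merged.show()
```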

So, as a first implementation, I merged the data frames using the pandas utilities and then analysed the data in Spark, reading the merged CSV files back from disk.
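For reference, the pandas side of that step might look like the following sketch; the file names follow the competition's people/activities layout, and trainF.csv/testF.csv are the merged files the Spark script below reads, so treat the details as assumptions:

```python
import pandas as pd

# Merge the people table into the activity tables on the shared key,
# then write the flattened files that the Spark job reads back in.
people = pd.read_csv('../input/people.csv')
train = pd.read_csv('../input/act_train.csv')
test = pd.read_csv('../input/act_test.csv')

train.merge(people, how='left', on='people_id', suffixes=('_act', '_people')) \
     .to_csv('../input/trainF.csv', index=False)
test.merge(people, how='left', on='people_id', suffixes=('_act', '_people')) \
    .to_csv('../input/testF.csv', index=False)
```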
This was the Spark code:


```python
"""
redhat-pandas-spark.py
@author Elena Cuoco
"""
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, ArrayType, StringType
import atexit
import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF

#### Init Spark
conf = SparkConf()
conf.set("spark.executor.memory", "4G")
conf.set("spark.driver.memory", "18G")
conf.set("spark.executor.cores", "7")
conf.set("spark.python.worker.memory", "4G")
conf.set("spark.driver.maxResultSize", "0")
conf.set("spark.sql.crossJoin.enabled", "true")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.default.parallelism", "2")
sc = SparkContext(conf=conf.setMaster('local[*]'))
sc.setLogLevel("WARN")
sqlContext = SQLContext(sc)
atexit.register(lambda: sc.stop())

spark = SparkSession \
    .builder.config(conf=conf) \
    .appName("spark-xgb").getOrCreate()

# Read the CSV files produced by the pandas merge step.
data = spark.read.csv("../input/trainF.csv", header="true", inferSchema="true", mode="DROPMALFORMED")
datatest = spark.read.csv("../input/testF.csv", header="true", inferSchema="true", mode="DROPMALFORMED")

# Keep the test ids aside for the submission file.
pred_spark = pd.DataFrame()
pred_spark['activity_id'] = datatest.select("activity_id").rdd.map(lambda r: r[0]).collect()

data = data.na.fill(-1)
datatest = datatest.na.fill(-1)

# Pack all feature columns (everything except the ids and the target) into a
# single array-of-strings column, so that HashingTF can consume them.
ignore = ['outcome', 'activity_id', 'people_id']
lista = [x for x in data.columns if x not in ignore]
schema = StructType([
    StructField("outcome", DoubleType(), True),
    StructField("features", ArrayType(StringType()), True)])

xtrain = sqlContext.createDataFrame(
    data.rdd.map(lambda l: (float(l['outcome']), [l[x] for x in lista])), schema)
xtrain = xtrain.select('features', 'outcome')

# Same packing for the test set, with a dummy label.
ignore = ['activity_id', 'people_id']
lista = [x for x in datatest.columns if x not in ignore]
schema = StructType([
    StructField("label", DoubleType(), True),
    StructField("features", ArrayType(StringType()), True)])
xtest = sqlContext.createDataFrame(
    datatest.rdd.map(lambda l: (0.0, [l[x] for x in lista])), schema)
xtest = xtest.select('features')
xtrain.cache()
xtest.cache()

# Hash the string features into a sparse vector and reweight them with IDF.
hashingTF = HashingTF(inputCol="features", outputCol="rawFeatures")
featurizedData = hashingTF.transform(xtrain)
idf = IDF(inputCol='rawFeatures', outputCol='idfFeatures')
modelidf = idf.fit(featurizedData)
xx = modelidf.transform(featurizedData)
xxtr = xx.select('outcome', 'idfFeatures')
xxtr.cache()

fh2 = hashingTF.transform(xtest)
xt = modelidf.transform(fh2)
xtt = xt.select('idfFeatures')
xtt.cache()

# Hold out 15% of the training data for validation.
(trainingData, testData) = xxtr.randomSplit([0.85, 0.15], 147)

clf = LogisticRegression(featuresCol="idfFeatures", labelCol="outcome",
                         maxIter=1000, regParam=0.0, elasticNetParam=0.0)
model = clf.fit(trainingData)

# Make predictions on the held-out split and show a few rows.
predictions = model.transform(testData)
predictions.select("probability", "outcome").show(5)

# Select (prediction, true label) and compute the validation accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="outcome", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)

# Predict on the test set and write the submission file.
preds = model.transform(xtt)
preds.printSchema()
probs = preds.select("probability").rdd.map(lambda r: r[0][1]).collect()
pred_spark['outcome'] = probs
pred_spark.to_csv("../output/pandas-spark.csv", index=False)
```

Note, in particular, the block where the training DataFrame is built with an explicit schema; it is shown again and discussed below.

Since I had many difficulties understanding how to prepare the data in the format

"label","features"

that Spark ML expects when the features are not numeric, I want to show how I solved the problem: I prepared a new schema for the DataFrame and packed the features selected in a list into an Array of StringType.

```python
ignore = ['outcome', 'activity_id', 'people_id']
lista = [x for x in data.columns if x not in ignore]
schema = StructType([
    StructField("outcome", DoubleType(), True),
    StructField("features", ArrayType(StringType()), True)])

xtrain = sqlContext.createDataFrame(
    data.rdd.map(lambda l: (float(l['outcome']), [l[x] for x in lista])), schema)
```


This step is necessary before using the hashing feature extraction from the Spark ML library.
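To make the requirement concrete, here is a tiny self-contained sketch, with toy values rather than competition data, showing HashingTF mapping an array-of-strings column to a sparse feature vector:

```python
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF

spark = SparkSession.builder.master("local[*]").getOrCreate()

# HashingTF expects each row's features as an array of strings.
toy = spark.createDataFrame(
    [(1.0, ["type 1", "char_2", "group 17304"])],
    ["outcome", "features"])

# Each string is hashed to an index in a fixed-size sparse vector.
hashingTF = HashingTF(inputCol="features", outputCol="rawFeatures")
hashingTF.transform(toy).show(truncate=False)
```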

This code achieved a score of ~0.95 on the Public Leaderboard and is definitely not optimized, so there is probably still room for a better score.
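If you want to push further, an obvious first step would be tuning the regularisation, which the script above leaves at regParam=0.0. Here is a minimal sketch using Spark ML's CrossValidator; the grid values are placeholders, not tuned ones, and xxtr is the TF-IDF training DataFrame from the script above:

```python
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

clf = LogisticRegression(featuresCol="idfFeatures", labelCol="outcome", maxIter=100)

# Grid over the regularisation strength and the L1/L2 mix.
grid = (ParamGridBuilder()
        .addGrid(clf.regParam, [0.0, 0.01, 0.1])
        .addGrid(clf.elasticNetParam, [0.0, 0.5])
        .build())

# The competition is scored on area under the ROC curve.
evaluator = BinaryClassificationEvaluator(labelCol="outcome",
                                          metricName="areaUnderROC")
cv = CrossValidator(estimator=clf, estimatorParamMaps=grid,
                    evaluator=evaluator, numFolds=3)
cvModel = cv.fit(xxtr)  # xxtr: the TF-IDF training DataFrame from above
```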

Have fun

#spark #pyspark