Using PySpark for the Red Hat Kaggle competition
The Red Hat Kaggle competition is not particularly demanding, either computationally or in terms of data management: pandas with xgboost, or R, is enough to get a good score. So the idea here is to use Apache Spark mainly as a learning exercise.
First of all, merging several data frames in PySpark is not as efficient as in pandas, and I don't fully trust the code I wrote to join two DataFrames that share column names.
So, as a first implementation, I merged the data frames using pandas, wrote the result to disk, and then analysed the data in Spark, reading the merged CSV files back from disk.
Once the merged files were on disk, this was the Spark code:
```python
"""
redhat-pandas-spark.py
@author Elena Cuoco
"""

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import atexit
import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF

#### Init Spark
conf = SparkConf()
conf.set("spark.executor.memory", "4G")
conf.set("spark.driver.memory", "18G")
conf.set("spark.executor.cores", "7")
conf.set("spark.python.worker.memory", "4G")
conf.set("spark.driver.maxResultSize", "0")
conf.set("spark.sql.crossJoin.enabled", "true")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.default.parallelism", "2")
sc = SparkContext(conf=conf.setMaster('local[*]'))
sc.setLogLevel("WARN")
sqlContext = SQLContext(sc)
atexit.register(lambda: sc.stop())

spark = SparkSession \
    .builder.config(conf=conf) \
    .appName("spark-xgb").getOrCreate()

# Read the train/test CSV files merged beforehand with pandas
data = spark.read.csv("../input/trainF.csv", header="true", inferSchema="true", mode="DROPMALFORMED")
datatest = spark.read.csv("../input/testF.csv", header="true", inferSchema="true", mode="DROPMALFORMED")

# Keep the test activity_id's aside for the submission file
pred_spark = pd.DataFrame()
pred_spark['activity_id'] = datatest.select("activity_id").rdd.map(lambda r: r[0]).collect()
data = data.na.fill(-1)
datatest = datatest.na.fill(-1)

# Pack the (non-numeric) feature columns into an array of strings
ignore = ['outcome', 'activity_id', 'people_id']
lista = [x for x in data.columns if x not in ignore]
schema = StructType([
    StructField("outcome", DoubleType(), True),
    StructField("features", ArrayType(StringType()), True)])

xtrain = sqlContext.createDataFrame(data.rdd.map(lambda l: (float(l['outcome']), [l[x] for x in lista])), schema)
xtrain = xtrain.select('features', 'outcome')

# Same for the test set, with a dummy 0.0 label
# (here all test columns, IDs included, end up hashed as string features)
schema = StructType([
    StructField("label", DoubleType(), True),
    StructField("features", ArrayType(StringType()), True)])
lista = [x for x in datatest.columns]
xtest = sqlContext.createDataFrame(datatest.rdd.map(lambda l: (0.0, [l[x] for x in lista])), schema)
xtest = xtest.select('features')
xtrain.cache()
xtest.cache()

# Hash the string features into a sparse count vector, then apply IDF weighting
hashingTF = HashingTF(inputCol="features", outputCol="rawFeatures")
featurizedData = hashingTF.transform(xtrain)
idf = IDF(inputCol='rawFeatures', outputCol='idfFeatures')
modelidf = idf.fit(featurizedData)
xx = modelidf.transform(featurizedData)
xxtr = xx.select('outcome', 'idfFeatures')
xxtr.cache()

fh2 = hashingTF.transform(xtest)
xt = modelidf.transform(fh2)
xtt = xt.select('idfFeatures')
xtt.cache()

# Hold out 15% of the training data for validation
(trainingData, testData) = xxtr.randomSplit([0.85, 0.15], 147)

clf = LogisticRegression(featuresCol="idfFeatures", labelCol="outcome", maxIter=1000, regParam=0.0, elasticNetParam=0.0)

model = clf.fit(trainingData)

# Make predictions on the validation split
predictions = model.transform(testData)
predictions.select("probability", "outcome").show(5)

# Compute the accuracy on the validation split
evaluator = MulticlassClassificationEvaluator(
    labelCol="outcome", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)

# Predict on the test set, keeping the probability of the positive class
preds = model.transform(xtt)
preds.printSchema()
predsGBT = preds.select("probability").rdd.map(lambda r: r[0][1]).collect()
pred_spark['outcome'] = predsGBT
pred_spark.to_csv("../output/pandas-spark.csv", index=False)
```
Note in particular the block where the training DataFrame is built with a custom schema; I repeat it below. Since I had many difficulties understanding how to prepare the data in the ("label", "features") format that Spark ML expects when the features are not numeric, I want to show how I solved the problem: I defined a new schema for the DataFrame and packed the selected feature columns into an ArrayType(StringType()) column.
```python
ignore = ['outcome', 'activity_id', 'people_id']
lista = [x for x in data.columns if x not in ignore]
schema = StructType([
    StructField("outcome", DoubleType(), True),
    StructField("features", ArrayType(StringType()), True)])

xtrain = sqlContext.createDataFrame(data.rdd.map(lambda l: (float(l['outcome']), [l[x] for x in lista])), schema)
```
This step is necessary before applying the HashingTF feature extractor from Spark's ML library.
This code achieved a score of ~0.95 on the public leaderboard and is definitely not optimized, so there is probably still room to reach a better score.
Have fun
#spark #pyspark