scala-xgboost-spark

Spark and XGBoost using the Scala language

Recently the XGBoost project released a package on GitHub that includes interfaces for Scala, Java and Spark (more info at this link).

I would like to run XGBoost on a big data set. Unfortunately, the integration of XGBoost and PySpark is not yet released, so I was forced to do this integration in Scala.

 

In this post I only report the Scala code that can be useful to run Spark and XGBoost together.

In a later post I'm going to show the software setup and the integration of this project in the IntelliJ IDEA Community Edition IDE.

Now, let's code!

First, we need to open a Spark session:

```scala
import java.util.Calendar
import org.apache.log4j.{Level, Logger}
import ml.dmlc.xgboost4j.scala.spark.XGBoost
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit

object SimpleXGBoost {
  // Reduce Spark's log output to warnings and errors
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val inputPath = "yourinputpath"

    // create SparkSession
    val spark = SparkSession
      .builder()
      .appName("SimpleXGBoost Application")
      .config("spark.executor.memory", "2G")
      .config("spark.executor.cores", "4")
      .config("spark.driver.memory", "1G")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", "4")
      .master("local[*]")
      .getOrCreate()
    // main continues in the following snippets
```

Next, we prepare the directory where we will write our results, which will be saved in Parquet format.

I use the following solution to build a new results directory path automatically for each run, based on the current date and time:

```scala
    // Build a results path such as ./results/<date>-<hour>-<minute>/
    val now = Calendar.getInstance()
    val date = java.time.LocalDate.now
    val currentHour = now.get(Calendar.HOUR_OF_DAY)
    val currentMinute = now.get(Calendar.MINUTE)
    val direct = "./results/" + date + "-" + currentHour + "-" + currentMinute + "/"
    println(direct)
```
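
The path built above does not zero-pad the hour and minute, so directory names do not always sort chronologically. As a minimal sketch of an alternative, the snippet below formats the timestamp with `java.time` and creates the directory up front. The names `ResultsDir` and `timestampedDir` are hypothetical and not part of the original program.

```scala
import java.nio.file.{Files, Path, Paths}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object ResultsDir {
  // Zero-padded pattern so that directory names sort chronologically.
  private val stamp = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm")

  // Build "<base>/<timestamp>" for the given moment (hypothetical helper).
  def timestampedDir(base: String, at: LocalDateTime): Path =
    Paths.get(base, at.format(stamp))

  def main(args: Array[String]): Unit = {
    val dir = timestampedDir("./results", LocalDateTime.now())
    Files.createDirectories(dir) // does nothing if the directory already exists
    println(dir)
  }
}
```

With this variant, a file path for Spark could be obtained with `dir.resolve("preds.parquet").toString` instead of string concatenation.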

The data I'm using for this test come from the Kaggle Bosch competition.

So, we will read the data into Spark DataFrames:

```scala
    // read data from disk
    val dataset = spark.read.option("header", "true").option("inferSchema", true).csv(inputPath + "/input/train_numeric.csv")
    val datatest = spark.read.option("header", "true").option("inferSchema", true).csv(inputPath + "/input/test_numeric.csv")

    // keep both DataFrames in memory, since they are reused below
    dataset.cache()
    datatest.cache()
```

Now we can fill the missing values and, if we want, subsample the training data:

```scala
    // fill NA with 0 and subsample (with replacement, fraction 0.7, seed 10)
    val df = dataset.na.fill(0).sample(true, 0.7, 10)
    val df_test = datatest.na.fill(0)
```

We are now ready to assemble the features:

```scala
    // prepare data for ML: every column except Id and Response is a feature
    val header = df.columns.filter(!_.contains("Id")).filter(!_.contains("Response"))
    val assembler = new VectorAssembler()
      .setInputCols(header)
      .setOutputCol("features")

    val train_DF0 = assembler.transform(df)
    val test_DF0 = assembler.transform(df_test)
    println("VectorAssembler Done!")
```
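
One detail of the column filter is worth spelling out: `contains` matches substrings, so any feature whose name happens to include "Id" or "Response" would be dropped as well. The sketch below compares that filter with an exact-name variant on plain arrays, without Spark. The column name `SensorId_1` is invented for illustration, and `FeatureColumns` is a hypothetical helper.

```scala
object FeatureColumns {
  // Same filter as in the post: drop every column whose name contains "Id" or "Response".
  def bySubstring(columns: Array[String]): Array[String] =
    columns.filter(!_.contains("Id")).filter(!_.contains("Response"))

  // Stricter variant: drop only the exact identifier and label columns.
  def byExactName(columns: Array[String]): Array[String] =
    columns.filterNot(c => c == "Id" || c == "Response")

  def main(args: Array[String]): Unit = {
    // Hypothetical column names; "SensorId_1" is invented to show the difference.
    val columns = Array("Id", "L0_S0_F0", "SensorId_1", "Response")
    println(bySubstring(columns).mkString(", "))
    println(byExactName(columns).mkString(", "))
  }
}
```

If no feature name contains either substring, both variants return the same columns.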

and prepare the train and test sets for XGBoost:

```scala
    // training set: the Response column becomes the label
    val train = train_DF0.withColumn("label", train_DF0("Response").cast("double")).select("label", "features")
    // test set: there is no Response column, so add a placeholder label
    val test = test_DF0.withColumn("label", lit(1.0)).withColumnRenamed("Id", "id").select("id", "label", "features")

    // Split the labelled data into training and held-out sets (30% held out for validation).
    val Array(trainingData, testData) = train.randomSplit(Array(0.7, 0.3), seed = 0)
```

Now we are ready to prepare the parameter map for XGBoost and run it on our data:

```scala
    // number of iterations
    val numRound = 10
    val numWorkers = 4
    // training parameters
    val paramMap = List(
      "eta" -> 0.023f,
      "max_depth" -> 10,
      "min_child_weight" -> 3.0,
      "subsample" -> 1.0,
      "colsample_bytree" -> 0.82,
      "colsample_bylevel" -> 0.9,
      "base_score" -> 0.005,
      "eval_metric" -> "auc",
      "seed" -> 49,
      "silent" -> 1,
      "objective" -> "binary:logistic").toMap
    println("Starting Xgboost ")
    val xgBoostModelWithDF = XGBoost.trainWithDataFrame(trainingData, paramMap, round = numRound, nWorkers = numWorkers, useExternalMemory = true)

    // predictions on the held-out 30% of the training data
    val predictions = xgBoostModelWithDF.setExternalMemory(true).transform(testData).select("label", "probabilities")
```

We store the predictions on the validation set, since we will use them to tune the metric for this competition.

```scala
    // DataFrames can be saved as Parquet files, maintaining the schema information
    predictions.write.save(direct + "preds.parquet")
```
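
Tuning the metric usually comes down to choosing the probability threshold that maximizes it on the held-out predictions. The post does not name the metric, so the sketch below assumes it is the Matthews correlation coefficient (MCC), which I believe was used for the Bosch competition. It works on plain `(label, probability)` pairs rather than on a Spark DataFrame, and `ThresholdTuning` is a hypothetical helper.

```scala
object ThresholdTuning {
  // Matthews correlation coefficient for binary labels at a given threshold.
  // `scored` holds (label, probability of the positive class) pairs.
  def mcc(scored: Seq[(Double, Double)], threshold: Double): Double = {
    var tp, tn, fp, fn = 0.0
    scored.foreach { case (label, prob) =>
      val predicted = prob >= threshold
      val actual = label == 1.0
      if (predicted && actual) tp += 1
      else if (predicted && !actual) fp += 1
      else if (!predicted && actual) fn += 1
      else tn += 1
    }
    val denom = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    // By convention, return 0 when the coefficient is undefined.
    if (denom == 0.0) 0.0 else (tp * tn - fp * fn) / denom
  }

  // Try each candidate threshold and keep the one with the highest MCC.
  def bestThreshold(scored: Seq[(Double, Double)], candidates: Seq[Double]): (Double, Double) =
    candidates.map(t => (t, mcc(scored, t))).maxBy(_._2)

  def main(args: Array[String]): Unit = {
    // Made-up validation predictions, for illustration only.
    val scored = Seq((1.0, 0.9), (1.0, 0.6), (0.0, 0.4), (0.0, 0.1))
    val (threshold, score) = bestThreshold(scored, Seq(0.05, 0.5, 0.95))
    println(s"best threshold = $threshold, MCC = $score")
  }
}
```

In practice the pairs would come from the saved predictions, and the candidate thresholds would be a finer grid.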

Finally, we are ready to compute our predictions on the test data set and save them to disk:

```scala
    // prediction on test set for submission file
    val submission = xgBoostModelWithDF.setExternalMemory(true).transform(test).select("id", "probabilities")
    submission.show(10)
    submission.write.save(direct + "submission.parquet")
    spark.stop()
  }
}
```

 

Later, offline, we can read the data from disk and convert it to a pandas DataFrame (I used a Python script to do this):

```python
# `latest` is assumed to hold the path of the results directory written by the Scala job
comp = spark.read.parquet(latest + "preds.parquet").toPandas()
```

and prepare the submission file.
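
As a rough sketch of that last step, written in Scala for consistency with the rest of the post (I did it in Python), the snippet below turns `(id, probability)` pairs into CSV lines with a 0/1 response. The `Id,Response` header and the 0/1 format are assumptions based on the column names of the training data, and `SubmissionFile` is a hypothetical helper.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

object SubmissionFile {
  // Turn (id, probability of the positive class) pairs into CSV lines with a 0/1 response.
  def toCsvLines(scored: Seq[(Long, Double)], threshold: Double): Seq[String] =
    "Id,Response" +: scored.map { case (id, prob) =>
      s"$id,${if (prob >= threshold) 1 else 0}"
    }

  def main(args: Array[String]): Unit = {
    // Made-up values, for illustration only.
    val scored = Seq((1L, 0.02), (2L, 0.81))
    val lines = toCsvLines(scored, threshold = 0.5)
    Files.write(Paths.get("submission.csv"), lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
  }
}
```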

Good sparking!