Spark and XGBoost using Scala language
Recently, the XGBoost project released a package on GitHub that includes interfaces for Scala, Java and Spark (more info at this link).
I wanted to run XGBoost on a large dataset. Unfortunately, the integration of XGBoost with PySpark has not been released yet, so I had to do the integration in Scala.
In this post I simply list the Scala code needed to run XGBoost on Spark.
In a follow-up post I will show the software setup and how to integrate this project into the IntelliJ IDEA Community Edition IDE.
Now, let's code!
First, we need to create a SparkSession:
```scala
import java.util.Calendar
import org.apache.log4j.{Level, Logger}
import ml.dmlc.xgboost4j.scala.spark.XGBoost
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit

object SimpleXGBoost {
  // reduce Spark's console logging to warnings
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val inputPath = "yourinputpath"

    // create SparkSession (local mode, using all available cores)
    val spark = SparkSession
      .builder()
      .appName("SimpleXGBoost Application")
      .config("spark.executor.memory", "2G")
      .config("spark.executor.cores", "4")
      // note: in local/client mode the driver JVM is already running at this point,
      // so driver memory is better set with spark-submit --driver-memory
      .config("spark.driver.memory", "1G")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", "4")
      .master("local[*]")
      .getOrCreate()
```
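The input path above is hard-coded as `"yourinputpath"`. A minimal sketch of an alternative, assuming you pass the path as the first program argument: the helper `InputPathArgs.resolveInputPath` is hypothetical (not part of the original code) and simply falls back to the placeholder when no argument is given.

```scala
// Hypothetical helper (not in the original post): read the input path from the
// first command-line argument, falling back to the placeholder otherwise.
object InputPathArgs {
  val DefaultInputPath = "yourinputpath"

  def resolveInputPath(args: Array[String]): String =
    args.headOption.filter(_.nonEmpty).getOrElse(DefaultInputPath)
}
```

Inside `main` this would replace the literal with `val inputPath = InputPathArgs.resolveInputPath(args)`, so the same jar can be pointed at different data without recompiling.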
Next, we prepare the directory where the results will be written; they will be saved in Parquet format.
The following snippet builds a new, timestamped results path for each run (Spark creates the directory when the DataFrames are written):
```scala
    // build a results path such as ./results/<date>-<hour>-<minute>/ for this run
    val now = Calendar.getInstance()
    val date = java.time.LocalDate.now
    val currentHour = now.get(Calendar.HOUR_OF_DAY)
    val currentMinute = now.get(Calendar.MINUTE)
    val direct = "./results/" + date + "-" + currentHour + "-" + currentMinute + "/"
    println(direct)
```
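The same path can be built with `java.time` alone instead of mixing `Calendar` and `LocalDate`. This is a sketch, not the post's code: the object name `ResultsDir` is hypothetical, and unlike the snippet above it zero-pads hour and minute (e.g. `09-07` rather than `9-7`), so that directory names sort chronologically.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Sketch: per-run results path using java.time only.
// Hour and minute are zero-padded, so "09" sorts before "10".
object ResultsDir {
  private val stamp = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm")

  def resultsDir(at: LocalDateTime, base: String = "./results/"): String =
    base + at.format(stamp) + "/"
}
```

In the job this would read `val direct = ResultsDir.resultsDir(LocalDateTime.now())`.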
As test data I'm using the dataset of the Kaggle Bosch competition.
We read the numeric training and test files into Spark DataFrames:
```scala
    // read data from disk (inferSchema costs one extra pass over each file)
    val dataset = spark.read.option("header", "true").option("inferSchema", true)
      .csv(inputPath + "/input/train_numeric.csv")
    val datatest = spark.read.option("header", "true").option("inferSchema", true)
      .csv(inputPath + "/input/test_numeric.csv")

    dataset.cache()
    datatest.cache()
```
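To make the `inferSchema` option concrete, here is a deliberately simplified illustration of the idea, not Spark's actual implementation (Spark also considers longs, decimals, booleans, timestamps and more): every value of a column is inspected and the narrowest type that fits all non-empty values is chosen. Because every value must be seen, inference needs a pass over the data before the real read. The object `ToySchemaInference` is hypothetical.

```scala
import scala.util.Try

// Simplified illustration (not Spark's code) of CSV schema inference:
// pick the narrowest type that fits every non-empty value in a column.
object ToySchemaInference {
  def inferType(values: Seq[String]): String = {
    val present = values.filter(_.nonEmpty) // empty cells are read as nulls
    if (present.isEmpty) "string"            // all-null column: fall back to string
    else if (present.forall(v => Try(v.toInt).isSuccess)) "integer"
    else if (present.forall(v => Try(v.toDouble).isSuccess)) "double"
    else "string"
  }

  def inferSchema(header: Seq[String], rows: Seq[Seq[String]]): Seq[(String, String)] =
    header.zipWithIndex.map { case (name, i) =>
      name -> inferType(rows.map(r => if (i < r.length) r(i) else ""))
    }
}
```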
Now we can replace the missing values with 0 and, optionally, subsample the training data:
```scala
    // replace null/NaN values in numeric columns with 0, then draw a ~70% sample
    // of the training rows with replacement (withReplacement = true, fraction = 0.7, seed = 10)
    val df = dataset.na.fill(0).sample(true, 0.7, 10)
    val df_test = datatest.na.fill(0)
```
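Note that `sample(true, 0.7, 10)` samples *with replacement*, so the result is not an exact 70% subset: some rows appear several times and others not at all. A sketch of the underlying idea, using plain Scala collections instead of a DataFrame: each row is emitted k times, with k drawn from a Poisson distribution whose mean is the fraction. The object `PoissonSampling` is hypothetical, and the Poisson draw uses Knuth's simple method rather than whatever sampler Spark uses internally.

```scala
import scala.util.Random

// Sketch of fraction-based sampling with replacement: each row is repeated
// k times, k ~ Poisson(fraction). Output size is only approximately fraction * n.
object PoissonSampling {
  // Knuth's method for a Poisson-distributed integer (adequate for small means).
  private def poisson(mean: Double, rng: Random): Int = {
    val limit = math.exp(-mean)
    var k = 0
    var p = rng.nextDouble()
    while (p > limit) {
      k += 1
      p *= rng.nextDouble()
    }
    k
  }

  def sampleWithReplacement[A](rows: Seq[A], fraction: Double, seed: Long): Seq[A] = {
    val rng = new Random(seed)
    rows.flatMap(row => Seq.fill(poisson(fraction, rng))(row))
  }
}
```

If duplicated rows are not wanted, `sample(false, 0.7, 10)` draws each row at most once.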
We are now ready to assemble the feature columns into a single vector column:
```scala
    // prepare data for ML: every column except the id and the label is a feature
    // (exact name match, so no feature column is dropped by accident)
    val header = df.columns.filter(c => c != "Id" && c != "Response")
    val assembler = new VectorAssembler()
      .setInputCols(header)
      .setOutputCol("features")

    val train_DF0 = assembler.transform(df)
    val test_DF0 = assembler.transform(df_test)
    println("VectorAssembler Done!")
```
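What `VectorAssembler` does per row can be sketched without Spark: take the input columns in a fixed order and concatenate their numeric values into one feature vector. In this sketch rows are modelled as `Map[String, Double]` purely for illustration, and `ToyAssembler` is a hypothetical name. The real transformer may store the result as a sparse vector when many entries are zero, which is common after `na.fill(0)`.

```scala
// Sketch of VectorAssembler's per-row behaviour on a Map-based "row".
object ToyAssembler {
  // Feature columns = all columns except the id and label (exact names).
  def featureColumns(columns: Seq[String], exclude: Set[String] = Set("Id", "Response")): Seq[String] =
    columns.filterNot(exclude)

  // Concatenate the values of inputCols, in order, into a single vector.
  def assemble(row: Map[String, Double], inputCols: Seq[String]): Vector[Double] =
    inputCols.map(row).toVector
}
```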
Next, we add the label column, keep only the columns XGBoost needs, and hold out 30% of the training data for validation:
```scala
    // training data: numeric label plus the assembled feature vector
    val train = train_DF0.withColumn("label", train_DF0("Response").cast("double"))
      .select("label", "features")
    // Kaggle test data has no Response column, so a constant placeholder label is added
    val test = test_DF0.withColumn("label", lit(1.0)).withColumnRenamed("Id", "id")
      .select("id", "label", "features")

    // Split the training data into training and validation sets (30% held out for validation).
    val Array(trainingData, testData) = train.randomSplit(Array(0.7, 0.3), seed = 0)
```
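As with `sample`, the 70/30 proportions of `randomSplit` are approximate. A sketch of the idea on plain Scala collections: the weights are normalised into cumulative bounds (0.7, 1.0), and each row draws one uniform number that decides which split it lands in. `ToyRandomSplit` is hypothetical and simplifies what Spark does per partition.

```scala
import scala.util.Random

// Sketch of weight-based random splitting: normalise weights to cumulative
// bounds and assign each row by one uniform draw. Split sizes are approximate.
object ToyRandomSplit {
  def randomSplit[A](rows: Seq[A], weights: Seq[Double], seed: Long): Seq[Seq[A]] = {
    val total = weights.sum
    val bounds = weights.scanLeft(0.0)(_ + _).tail.map(_ / total) // e.g. 0.7, 1.0
    val rng = new Random(seed)
    val assigned = rows.map { row =>
      val u = rng.nextDouble()
      val idx = bounds.indexWhere(u < _) match {
        case -1 => weights.length - 1 // guard against floating-point rounding
        case i  => i
      }
      idx -> row
    }
    weights.indices.map(i => assigned.collect { case (`i`, row) => row })
  }
}
```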
Now we can define the XGBoost parameter map, train the model and score the validation split:
```scala
    // number of boosting rounds
    val numRound = 10
    // number of parallel XGBoost workers (each runs as a Spark task)
    val numWorkers = 4
    // training parameters
    val paramMap = List(
      "eta" -> 0.023f,
      "max_depth" -> 10,
      "min_child_weight" -> 3.0,
      "subsample" -> 1.0,
      "colsample_bytree" -> 0.82,
      "colsample_bylevel" -> 0.9,
      "base_score" -> 0.005,
      "eval_metric" -> "auc",
      "seed" -> 49,
      "silent" -> 1,
      "objective" -> "binary:logistic").toMap
    println("Starting XGBoost")
    val xgBoostModelWithDF = XGBoost.trainWithDataFrame(trainingData, paramMap,
      round = numRound, nWorkers = numWorkers, useExternalMemory = true)

    // score the validation split
    val predictions = xgBoostModelWithDF.setExternalMemory(true).transform(testData)
      .select("label", "probabilities")
```
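The parameter map sets `eval_metric` to `"auc"`. As a reminder of what that number means, here is a small sketch (not XGBoost's implementation) of AUC as the probability that a randomly chosen positive example scores higher than a randomly chosen negative one, with ties counted as one half. The pairwise loop is O(P·N) and only meant for illustration; `ToyAuc` is a hypothetical name.

```scala
// Sketch of ROC AUC: fraction of (positive, negative) pairs ranked correctly,
// ties counting 1/2. Quadratic in the number of examples: illustration only.
object ToyAuc {
  def auc(labels: Seq[Double], scores: Seq[Double]): Double = {
    val pairs = labels.zip(scores)
    val pos = pairs.collect { case (l, s) if l == 1.0 => s }
    val neg = pairs.collect { case (l, s) if l == 0.0 => s }
    require(pos.nonEmpty && neg.nonEmpty, "need both classes")
    val wins = for (p <- pos; n <- neg) yield
      if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    wins.sum / (pos.size.toDouble * neg.size)
  }
}
```

For labels `0, 0, 1, 1` with scores `0.1, 0.4, 0.35, 0.8`, three of the four positive/negative pairs are ordered correctly, giving an AUC of 0.75.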
We store the predictions on the validation split, because we will use them later to tune our submission for this competition's metric.
```scala
    // DataFrames can be saved as Parquet files, keeping the schema information
    predictions.write.parquet(direct + "preds.parquet")
```
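The Bosch competition was scored with the Matthews correlation coefficient (MCC), which needs hard 0/1 predictions, so the saved validation probabilities are typically used to choose a decision threshold. A sketch of that search, assuming the predictions have been collected as `(label, probability of class 1)` pairs (for example the second entry of the `probabilities` vector); `MccThreshold` and its methods are hypothetical names.

```scala
// Sketch: choose the probability threshold that maximises MCC on validation data.
// Input pairs are (label, probability of the positive class).
object MccThreshold {
  def mcc(tp: Long, tn: Long, fp: Long, fn: Long): Double = {
    val denom = math.sqrt((tp + fp).toDouble * (tp + fn) * (tn + fp) * (tn + fn))
    if (denom == 0.0) 0.0 else (tp.toDouble * tn - fp.toDouble * fn) / denom
  }

  /** Returns (threshold, MCC); a row is predicted positive when prob >= threshold. */
  def bestThreshold(preds: Seq[(Double, Double)]): (Double, Double) = {
    val candidates = preds.map(_._2).distinct.sorted
    candidates.map { t =>
      var tp, tn, fp, fn = 0L
      preds.foreach { case (label, prob) =>
        val predicted = prob >= t
        val actual = label == 1.0
        if (predicted && actual) tp += 1
        else if (predicted) fp += 1
        else if (actual) fn += 1
        else tn += 1
      }
      t -> mcc(tp, tn, fp, fn)
    }.maxBy(_._2)
  }
}
```

Trying every distinct probability as a candidate is quadratic; on the full validation set, sorting once and updating the confusion counts incrementally would be the faster design.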
Finally, we score the test set and save the predictions to disk:
```scala
    // predictions on the Kaggle test set for the submission file
    val submission = xgBoostModelWithDF.setExternalMemory(true).transform(test)
      .select("id", "probabilities")
    submission.show(10)
    submission.write.parquet(direct + "submission.parquet")
    spark.stop()
  }
}
```
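Later, offline, we can read the results back from disk and convert them to a pandas DataFrame (I used a Python script for this):
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ReadPredictions").getOrCreate()
# latest: the results directory written by the Scala job (the path printed for `direct`)
comp = spark.read.parquet(latest + "preds.parquet").toPandas()
```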
and prepare the submission file.
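I prepared the submission file in Python, but the same step can be sketched in Scala. Assuming the test predictions have been reduced to `(Id, probability of class 1)` pairs and a threshold has been chosen on the validation split, the Bosch submission only needs an `Id,Response` header followed by one 0/1 prediction per row. `SubmissionCsv` is a hypothetical name, and writing the lines to a file is left out.

```scala
// Sketch: turn (Id, probability) pairs into the "Id,Response" CSV lines of a
// Bosch submission, using a threshold chosen on the validation predictions.
object SubmissionCsv {
  def toCsvLines(preds: Seq[(Long, Double)], threshold: Double): Seq[String] =
    "Id,Response" +: preds.map { case (id, prob) =>
      s"$id,${if (prob >= threshold) 1 else 0}"
    }
}
```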
Good sparking!
Hi Team,
I'm a beginner in big data... Can you please help me with how to start and what I should refer to?
If possible, please share GitHub repositories and any links, videos, etc.
Regards
Dharmaraj
great post, thanks.
When preparing the training data, I can produce data like this:
+-----+---------------------+
|label|features             |
+-----+---------------------+
|1.0  |[0.0,0.1,29.0,0.1]   |
+-----+---------------------+
The features are dense vectors. Do you think that will work?
It should work.
So I used the format above, but it throws an exception like the one below:
Exception in thread "main" ml.dmlc.xgboost4j.java.XGBoostError: XGBoostModel training failed
at ml.dmlc.xgboost4j.scala.spark.XGBoost$.postTrackerReturnProcessing(XGBoost.scala:363)
at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainDistributed(XGBoost.scala:334)
at ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator.train(XGBoostEstimator.scala:139)
at ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator.train(XGBoostEstimator.scala:36)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainWithDataFrame(XGBoost.scala:191)
Have you ever had problems like this?
Try to assemble the features with VectorAssembler. I haven't updated the code in a while, so there may be changes in newer versions of Spark and XGBoost.
Hi,
This is a great article. Have you published the post about integrating this project into IntelliJ IDEA? I have been trying this code for a few days without any luck building a jar. It would be great if you could point me in the right direction for creating an sbt build file. Thanks
Regards,
Kruise
Hi,
Thanks! Here is the link to the post on integration with IntelliJ:
Scala project under IntelliJ IDEA
I just happened to find this great blog post today as I was about to dip into XGBoost and Scala. In addition to your instructions at the Scala project link, I found I needed to do the following to compile and run your code examples. First, install XGBoost4J using the instructions here. I installed it to my local repository using mvn install -DskipTests, because various tests were failing but I could tell the builds were working. Then add the following to your SBT file:
resolvers += Resolver.mavenLocal
libraryDependencies += "ml.dmlc" % "xgboost4j" % "0.7"
libraryDependencies += "ml.dmlc" % "xgboost4j-spark" % "0.7"…