Spark and XGBoost using Scala


Recently the XGBoost project released a package on GitHub that includes interfaces to Scala, Java and Spark (more info at this link).


I wanted to run XGBoost on a large data set. Unfortunately, the integration of XGBoost with PySpark has not been released yet, so I had to write this integration in Scala.


In this post I report the Scala code that is useful to run Spark and XGBoost together.

In a later post I will show the software setup and the integration of this project into the IntelliJ IDEA Community Edition IDE.

Now, let's code!

First, we need to open a Spark session:

import java.util.Calendar
import ml.dmlc.xgboost4j.scala.spark.XGBoost
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit

object SimpleXGBoost {

def main(args: Array[String]): Unit = {
val inputPath = "yourinputpath"

// create SparkSession
val spark = SparkSession
  .builder()
  .appName("SimpleXGBoost Application")
  .config("spark.executor.memory", "2G")
  .config("spark.executor.cores", "4")
  .config("spark.driver.memory", "1G")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.default.parallelism", "4")
  .getOrCreate()

We need to prepare the directory where we will write our results; these will be saved in Parquet format.

I use the following snippet to build a new, time-stamped directory name for the results:

val now = Calendar.getInstance()
val date = now.get(Calendar.DATE) // day of month, used in the directory name
val currentHour = now.get(Calendar.HOUR_OF_DAY)
val currentMinute = now.get(Calendar.MINUTE)
val direct = "./results/" + date + "-" + currentHour + "-" + currentMinute + "/"
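Note that the snippet above only builds the path string (Spark's Parquet writer creates missing directories when it saves). If you prefer to create the directory up front, a minimal sketch using only the Java standard library, with a hypothetical `MakeResultsDir` wrapper, could be:

```scala
import java.io.File
import java.text.SimpleDateFormat
import java.util.Calendar

object MakeResultsDir {
  def main(args: Array[String]): Unit = {
    // build a time-stamped results path, e.g. ./results/04-14-30/
    val stamp = new SimpleDateFormat("dd-HH-mm").format(Calendar.getInstance().getTime)
    val direct = new File("./results/" + stamp + "/")
    direct.mkdirs() // also creates the parent ./results/ if it is missing
    println(direct.getPath)
  }
}
```

`SimpleDateFormat` also removes the need to concatenate the calendar fields by hand.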

The data I use for testing come from the Kaggle Bosch competition.

We read the data into Spark DataFrame format:

//read data from disk
val dataset ="header", "true").option("inferSchema", true).csv(inputPath + "/input/train_numeric.csv")
val datatest ="header", "true").option("inferSchema", true).csv(inputPath + "/input/test_numeric.csv")


Now we can fill the missing values and, if we want, subsample the training data:

//fill NA with 0 and subsample (fraction 0.7, seed 10)
val df = dataset.na.fill(0).sample(false, 0.7, 10)
val df_test = datatest.na.fill(0)

We are now ready to assemble the features:

//prepare data for ML: every column except Id and Response is a feature
val header = df.columns.filter(!_.contains("Id")).filter(!_.contains("Response"))
val assembler = new VectorAssembler()

val train_DF0 = assembler.transform(df)
val test_DF0 = assembler.transform(df_test)
println("VectorAssembler Done!")

and prepare the train and test sets for XGBoost:

val train = train_DF0.withColumn("label", df("Response").cast("double")).select("label", "features")
val test = test_DF0.withColumn("label", lit(1.0)).withColumnRenamed("Id", "id").select("id", "label", "features")

// Split the data into training and validation sets (30% held out for validation).
val Array(trainingData, testData) = train.randomSplit(Array(0.7, 0.3), seed = 0)

Now we are ready to prepare the parameter map for XGBoost and run it on our data sets:

// number of iterations
val numRound = 10
val numWorkers = 4
// training parameters
val paramMap = List(
      "eta" -> 0.023f,
      "max_depth" -> 10,
      "min_child_weight" -> 3.0,
      "subsample" -> 1.0,
      "colsample_bytree" -> 0.82,
      "colsample_bylevel" -> 0.9,
      "base_score" -> 0.005,
      "eval_metric" -> "auc",
      "seed" -> 49,
      "silent" -> 1,
      "objective" -> "binary:logistic").toMap
println("Starting XGBoost")
val xgBoostModelWithDF = XGBoost.trainWithDataFrame(trainingData, paramMap, round = numRound, nWorkers = numWorkers, useExternalMemory = true)

val predictions = xgBoostModelWithDF.setExternalMemory(true).transform(testData).select("label", "probabilities")

We store the predictions on the validation set, since we will use them to tune the decision threshold for this competition's metric.

// DataFrames can be saved as Parquet files, maintaining the schema information + "preds.parquet")
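These saved probabilities are what we tune against later: the Bosch competition is scored with the Matthews correlation coefficient (MCC), so a decision threshold has to be chosen on the validation set. A minimal, Spark-free sketch of such a threshold sweep (hypothetical `MccSweep` helper, toy data) might look like:

```scala
object MccSweep {
  // Matthews correlation coefficient from confusion-matrix counts
  def mcc(tp: Long, tn: Long, fp: Long, fn: Long): Double = {
    val denom = math.sqrt((tp + fp).toDouble * (tp + fn) * (tn + fp) * (tn + fn))
    if (denom == 0.0) 0.0 else (tp * tn - fp * fn) / denom
  }

  // try each candidate threshold on (probability, label) pairs
  // and return the (threshold, mcc) pair with the highest MCC
  def bestThreshold(preds: Seq[(Double, Int)], thresholds: Seq[Double]): (Double, Double) = { t =>
      val tp = preds.count { case (p, y) => p >= t && y == 1 }
      val fp = preds.count { case (p, y) => p >= t && y == 0 }
      val fn = preds.count { case (p, y) => p < t && y == 1 }
      val tn = preds.count { case (p, y) => p < t && y == 0 }
      (t, mcc(tp, tn, fp, fn))
    }.maxBy(_._2)
  }

  def main(args: Array[String]): Unit = {
    // toy validation predictions: (predicted probability, true label)
    val preds = Seq((0.9, 1), (0.8, 1), (0.7, 1), (0.6, 0), (0.3, 0), (0.2, 0))
    val (t, m) = bestThreshold(preds, (1 to 9).map(_ / 10.0))
    println(f"best threshold $t%.1f, MCC $m%.3f")
  }
}
```

In practice you would load the saved Parquet predictions, collect the (probability, label) pairs, and sweep a finer grid of thresholds.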

Finally, we are ready to compute our predictions on the test data set and save them to disk:

//prediction on test set for submission file
val submission = xgBoostModelWithDF.setExternalMemory(true).transform(test).select("id", "probabilities") + "submission.parquet")

spark.stop()
}
}


Later, offline, we can read the data from disk and convert it to pandas format (I used a Python script for this):

comp = spark.read.parquet("preds.parquet").toPandas()

and prepare the submission file.

Good sparking!
