
Spark and XGBoost using Scala


Recently the XGBoost project released a package on GitHub that includes interfaces for Scala, Java, and Spark (more info at this link).

I wanted to run XGBoost on a large dataset. Unfortunately, the integration of XGBoost with PySpark has not yet been released, so I had to do this integration in Scala.

 

In this post I report the Scala code needed to run Spark and XGBoost together.

In a further post I'm going to show the software setup and the integration of this project in the IntelliJ IDEA Community Edition IDE.

Now, let's code!

First, we need to open a Spark session:



import java.util.Calendar
import org.apache.log4j.{Level, Logger}
import ml.dmlc.xgboost4j.scala.spark.XGBoost
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit

object SimpleXGBoost {
Logger.getLogger("org").setLevel(Level.WARN)

def main(args: Array[String]): Unit = {
val inputPath = "yourinputpath"

// create SparkSession
val spark = SparkSession
.builder()
.appName("SimpleXGBoost Application")
.config("spark.executor.memory", "2G")
.config("spark.executor.cores", "4")
.config("spark.driver.memory", "1G")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.default.parallelism", "4")
.master("local[*]")
.getOrCreate()

We need to prepare the directory where we will write our results, which will be saved in Parquet format. The following snippet automatically builds a new, timestamped directory name for each run:

val now = Calendar.getInstance()
val date = java.time.LocalDate.now
val currentHour = now.get(Calendar.HOUR_OF_DAY)
val currentMinute = now.get(Calendar.MINUTE)
val direct = "./results/" + date + "-" + currentHour + "-" + currentMinute + "/"
println(direct)
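The same kind of timestamped path can also be built with java.time alone; here is a compact alternative sketch (zero-padded hours and minutes, otherwise equivalent to the snippet above):

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// alternative to the Calendar-based snippet: one formatted timestamp
val stamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm"))
val direct = "./results/" + stamp + "/"
println(direct)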

The data I'm using for this test comes from the Kaggle Bosch competition, so we read it into Spark DataFrames:

// read data from disk
val dataset = spark.read.option("header", "true").option("inferSchema", true).csv(inputPath + "/input/train_numeric.csv")
val datatest = spark.read.option("header", "true").option("inferSchema", true).csv(inputPath + "/input/test_numeric.csv")

dataset.cache()
datatest.cache()
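To make sure the CSVs were read correctly, a quick sanity check helps; note that count() scans the full files and also materializes the cache:

// quick sanity check: number of columns and rows in the cached DataFrames
println("train columns: " + dataset.columns.length)
println("train rows: " + dataset.count() + ", test rows: " + datatest.count())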

Now we can fill the missing values with zero and, if we want, subsample the training data (here: sampling with replacement, a 0.7 fraction, and seed 10):

// fill NA with 0 and subsample
val df = dataset.na.fill(0).sample(true,0.7,10)
val df_test = datatest.na.fill(0)

We are now ready to assemble the features

// prepare data for ML
val header = df.columns.filter(!_.contains("Id")).filter(!_.contains("Response"))
val assembler = new VectorAssembler()
      .setInputCols(header)
      .setOutputCol("features")

val train_DF0 = assembler.transform(df)
val test_DF0 = assembler.transform(df_test)
println("VectorAssembler Done!")

and prepare the train and test sets for XGBoost

val train = train_DF0.withColumn("label", df("Response").cast("double")).select("label", "features")
val test = test_DF0.withColumn("label", lit(1.0)).withColumnRenamed("Id", "id").select("id", "label", "features")

 
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = train.randomSplit(Array(0.7, 0.3), seed = 0)
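It is also worth checking the class balance at this point: the Bosch labels are heavily imbalanced (the positive rate is well below 1%), which is also why base_score in the parameter map below is set to a small value (0.005), close to the raw fraction of positives:

// inspect the label distribution of the training split
trainingData.groupBy("label").count().show()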

Now we are ready to prepare the parameter map for XGBoost and run it on our datasets:

// number of iterations
val numRound = 10
val numWorkers = 4
// training parameters
val paramMap = List(
      "eta" -> 0.023f,
      "max_depth" -> 10,
      "min_child_weight" -> 3.0,
      "subsample" -> 1.0,
      "colsample_bytree" -> 0.82,
      "colsample_bylevel" -> 0.9,
      "base_score" -> 0.005,
      "eval_metric" -> "auc",
      "seed" -> 49,
      "silent" -> 1,
      "objective" -> "binary:logistic").toMap
println("Starting Xgboost ")
val xgBoostModelWithDF = XGBoost.trainWithDataFrame(trainingData, paramMap, round = numRound, nWorkers = numWorkers, useExternalMemory = true)

val predictions = xgBoostModelWithDF.setExternalMemory(true).transform(testData).select("label", "probabilities")
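The probabilities column produced by the model is a vector, so for downstream work it helps to extract the positive-class probability as a plain double column. Here is a minimal sketch, assuming the column is a Spark ML Vector with the probability of class 1 at index 1 (this detail can differ between xgboost4j-spark versions):

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

// extract the probability of the positive class (assumed to sit at index 1)
val positiveProb = udf { (probs: Vector) => probs(1) }
val scored = predictions.withColumn("p1", positiveProb(predictions("probabilities")))
scored.select("label", "p1").show(10)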

We store the predictions on the validation set, since we will use them to tune the decision threshold for this competition's metric.

// DataFrames can be saved as Parquet files, maintaining the schema information
predictions.write.save(direct+"preds.parquet")
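The Bosch competition is scored with the Matthews correlation coefficient (MCC), so these stored predictions can be used to search for the probability threshold that maximizes MCC. Here is a minimal sketch of such a threshold search, assuming the scored DataFrame with the p1 column from the previous snippet and that the validation predictions fit in driver memory:

// Matthews correlation coefficient from a confusion matrix
def mcc(tp: Long, tn: Long, fp: Long, fn: Long): Double = {
  val denom = math.sqrt((tp + fp).toDouble * (tp + fn) * (tn + fp) * (tn + fn))
  if (denom == 0.0) 0.0 else (tp * tn - fp * fn) / denom
}

// collect (label, probability) pairs locally and scan a grid of thresholds
val pairs = scored.select("label", "p1").collect().map(r => (r.getDouble(0), r.getDouble(1)))
val (bestThr, bestMcc) = (1 to 19).map(_ * 0.05).map { t =>
  val tp = pairs.count { case (y, p) => y == 1.0 && p >= t }.toLong
  val fp = pairs.count { case (y, p) => y == 0.0 && p >= t }.toLong
  val tn = pairs.count { case (y, p) => y == 0.0 && p < t }.toLong
  val fn = pairs.count { case (y, p) => y == 1.0 && p < t }.toLong
  (t, mcc(tp, tn, fp, fn))
}.maxBy(_._2)
println("best threshold = " + bestThr + " with MCC = " + bestMcc)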

Finally, we are ready to compute our predictions on the test set and save them to disk

// predictions on the test set for the submission file
val submission = xgBoostModelWithDF.setExternalMemory(true).transform(test).select("id", "probabilities")
submission.show(10)
submission.write.save(direct+"submission.parquet")
spark.stop()
  }
}

 

Later, offline, we can read the data from disk and convert it to a Pandas DataFrame (I used a Python script for this)

comp = spark.read.parquet(latest+"preds.parquet").toPandas()

and prepare the submission file.
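The submission can also be built without leaving Scala/Spark. Here is a sketch, assuming the saved probabilities column reads back as a Spark ML Vector, reusing the same latest path variable as the Python snippet, and with a placeholder threshold to be replaced by the MCC-optimal one found on the validation set:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// placeholder threshold; replace with the value tuned on the validation set
val threshold = 0.2
val toResponse = udf { (probs: Vector) => if (probs(1) >= threshold) 1 else 0 }

// read the saved test-set probabilities and write a Kaggle-style Id,Response CSV
spark.read.parquet(latest + "submission.parquet")
  .withColumn("Response", toResponse(col("probabilities")))
  .select(col("id").alias("Id"), col("Response"))
  .coalesce(1) // single CSV part file for easier upload
  .write.option("header", "true").csv(latest + "submission_csv")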

Good sparking!

Comments
Dharmaraj (Guest)

Hi Team,

I'm a beginner to big data… can you please help me with how to start and what I should refer to?
If possible, please share GitHub repos, links, videos, etc.

Regards,
Dharmaraj

Anonymous (Guest)

great post, thanks.

Sam Jia (Guest)

When preparing the training data, I can make data like this:

| label | features              |
|-------|-----------------------|
| 1.0   | [0.0, 0.1, 29.0, 0.1] |

The features are basically dense vectors. Do you think that will work?

Kruise (Guest)

Hi,

This is a great article. Have you published the blog post on integrating this project in the IntelliJ IDEA IDE? I have been trying this code for a few days without any luck building a jar. It would be great if you could point me in the right direction for creating an sbt build file. Thanks.

Regards,
Kruise