
Spark and XGBoost using Scala


Recently the XGBoost project released a package on GitHub that includes interfaces for Scala, Java and Spark (more info at this link).

I wanted to run XGBoost on a large data set. Unfortunately, the integration of XGBoost and PySpark has not been released yet, so I had to do this integration in Scala.

 

In this post I just report the Scala code needed to run Spark and XGBoost together.

In a further post I'm going to show the software setup and the integration of this project in the IntelliJ IDEA Community Edition IDE.
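In the meantime, for those who want to try the code right away, a minimal build.sbt along these lines should work. The versions below are my assumptions and must be adapted to your environment; depending on the XGBoost release, the xgboost4j-spark jar may need to be built from the XGBoost repository and published locally.

// build.sbt — minimal sketch; the versions are assumptions, adapt them to your setup
name := "SimpleXGBoost"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "2.0.2",
  "org.apache.spark" %% "spark-sql"   % "2.0.2",
  "org.apache.spark" %% "spark-mllib" % "2.0.2",
  // group/artifact as used by the XGBoost project; the version is an assumption
  "ml.dmlc" % "xgboost4j-spark" % "0.7"
)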

Now, let's code!

First, we need to open a SparkSession:



import java.util.Calendar
import org.apache.log4j.{Level, Logger}
import ml.dmlc.xgboost4j.scala.spark.XGBoost
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit

object SimpleXGBoost {

  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {

    val inputPath = "yourinputpath"

    // create SparkSession
    val spark = SparkSession
      .builder()
      .appName("SimpleXGBoost Application")
      .config("spark.executor.memory", "2G")
      .config("spark.executor.cores", "4")
      .config("spark.driver.memory", "1G")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", "4")
      .master("local[*]")
      .getOrCreate()

We need to prepare the directory where we will write our results, which will be saved in Parquet format.

I use the following snippet to automatically build a new, time-stamped directory name for each run:

val now = Calendar.getInstance()
val date = java.time.LocalDate.now
val currentHour = now.get(Calendar.HOUR_OF_DAY)
val currentMinute = now.get(Calendar.MINUTE)
val direct = "./results/" + date + "-" + currentHour + "-" + currentMinute + "/"
println(direct)
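Note that Spark will create this directory itself when it saves the Parquet files; if you prefer to create it up front (for example to drop other logs in it), a minimal sketch using java.nio would be:

// optionally create the results directory up front (Spark also creates it on save)
import java.nio.file.{Files, Paths}
Files.createDirectories(Paths.get(direct))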

The data I'm using for this test come from the Kaggle Bosch competition.

So, we read the data into Spark DataFrames:

// read data from disk
val dataset = spark.read.option("header", "true").option("inferSchema", true).csv(inputPath + "/input/train_numeric.csv")
val datatest = spark.read.option("header", "true").option("inferSchema", true).csv(inputPath + "/input/test_numeric.csv")

dataset.cache()
datatest.cache()

Now we can fill the NaN values with zeros and, if we want, subsample the training data:

// fill NA with 0 and subsample
val df = dataset.na.fill(0).sample(true,0.7,10)
val df_test = datatest.na.fill(0)

We are now ready to assemble the features:

// prepare data for ML
val header = df.columns.filter(!_.contains("Id")).filter(!_.contains("Response"))
val assembler = new VectorAssembler()
      .setInputCols(header)
      .setOutputCol("features")

val train_DF0 = assembler.transform(df)
val test_DF0 = assembler.transform(df_test)
println("VectorAssembler Done!")

and prepare the train and test sets for XGBoost:

val train = train_DF0.withColumn("label", df("Response").cast("double")).select("label", "features")
val test = test_DF0.withColumn("label", lit(1.0)).withColumnRenamed("Id", "id").select("id", "label", "features")

 
// split the data into training and validation sets (30% held out for validation)
val Array(trainingData, testData) = train.randomSplit(Array(0.7, 0.3), seed = 0)

Now we are ready to prepare the parameter map for XGBoost and train it on our data:

// number of iterations
val numRound = 10
val numWorkers = 4
// training parameters
val paramMap = List(
      "eta" -> 0.023f,
      "max_depth" -> 10,
      "min_child_weight" -> 3.0,
      "subsample" -> 1.0,
      "colsample_bytree" -> 0.82,
      "colsample_bylevel" -> 0.9,
      "base_score" -> 0.005,
      "eval_metric" -> "auc",
      "seed" -> 49,
      "silent" -> 1,
      "objective" -> "binary:logistic").toMap
println("Starting Xgboost ")
val xgBoostModelWithDF = XGBoost.trainWithDataFrame(trainingData, paramMap, round = numRound, nWorkers = numWorkers, useExternalMemory = true)

val predictions = xgBoostModelWithDF.setExternalMemory(true).transform(testData).select("label", "probabilities")

We store the predictions on the validation set, since we will use them to tune the metric for this competition:

// DataFrames can be saved as Parquet files, maintaining the schema information
predictions.write.save(direct+"preds.parquet")
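Before moving on to the test set, one optional sanity check (not in the original code) is to compute the AUC of these validation predictions with Spark ML's BinaryClassificationEvaluator, pointing it at the probabilities column produced by the model:

// optional: AUC on the validation predictions
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("probabilities")
  .setMetricName("areaUnderROC")

println("Validation AUC = " + evaluator.evaluate(predictions))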

Finally, we are ready to compute our predictions on the test data set and save them to disk:

// predictions on the test set for the submission file
val submission = xgBoostModelWithDF.setExternalMemory(true).transform(test).select("id", "probabilities")
submission.show(10)
submission.write.save(direct+"submission.parquet")
spark.stop()
  }
}

 

Later, offline, we can read the results from disk and convert them to a Pandas DataFrame (I used a Python script for this):

comp = spark.read.parquet(latest + "preds.parquet").toPandas()

and prepare the submission file.

Good sparking!

Comments
Dharmaraj:

Hi Team,

I'm a beginner in big data… can you please help me with how to start and what I should refer to?
If possible, please share GitHub repos and any links, videos, etc.

Regards,
Dharmaraj

Anonymous:

great post, thanks.

Sam Jia:

When preparing the training data, I can make data like this:

| label | features              |
|-------|-----------------------|
| 1.0   | [0.0, 0.1, 29.0, 0.1] |

The features are basically dense vectors. Do you think that will work?

Kruise:

Hi,

This is a great article. Have you got the blog post about integrating this project in the IntelliJ IDEA IDE? I have been trying this code for a few days without any luck building a jar. It would be great if you could point me in the right direction to create an sbt build file. Thanks.

Regards,
Kruise