First steps with Apache Spark and PySpark

While taking part in the recent Kaggle competition Bosch Production Line Performance, I decided to try Apache Spark, and PySpark in particular.
When you need to analyze really big data, Pandas is sometimes not up to the task.
I still consider Pandas a great tool for exploring data, but I realized it was time to move to something more sophisticated for large data files.
So I downloaded the latest version of Spark, 2.0.0, installed it from source on my Linux PC, and began to play with it to get some confidence with its syntax.
The idea, of course, was to use the ML library developed by the Spark team.
I had several difficulties preparing the features in the format required by the ML algorithms.
Hence my idea is to share, step by step, the solutions to the small questions I asked myself and for which I could hardly find an answer on the net.

Start a Spark session

I started using Spark in standalone mode, not in cluster mode (for the moment 🙂).

First of all I need to load a CSV file from disk. In the documentation I read: "As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package."

So it will be easier for me to stay with DataFrames and avoid entering the Resilient Distributed Datasets (RDDs) world.

As a first step we need to start a Spark session. We can do this easily with the following lines, where we can also specify some configuration options and, for example, the name of the app. For the details, please refer to the main Spark documentation website.

```python
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Build (or reuse) the session; the app name and the config option are examples.
spark = (
    SparkSession.builder
    .appName("example-spark")
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()
)

# Reuse the context owned by the session: calling SparkContext() again would
# fail, because only one SparkContext can be active at a time.
sc = spark.sparkContext
sqlContext = SQLContext(sc)  # only needed for code written against the pre-2.0 API
```

Load and analyze the data

Now we can load our training data set with this simple line:

```python
train = spark.read.csv("../input/train_numeric.csv", header="true", inferSchema="true")
```

And this was the first success for me. On my Core i7 with 16 GB of RAM I had not been able to read the numeric train set of the Bosch Kaggle competition with Pandas, while here I had all the data in a DataFrame in a few seconds: wow!

The ML classification routines take their input in a specific format: you have to tell the algorithm which column holds the labels you want to predict and which column holds the Vector of features. This was the first obstacle. With Scikit-Learn I was used to passing a set of columns as features, and now I had to find a way to assemble almost 1000 columns into a single one.

After reading the documentation a bit, the solution for assembling numeric features turned out to be easy:

```python
from pyspark.ml.feature import VectorAssembler

# Every column except the row id and the label is a feature.
ignore = ['Id', 'Response']
lista = [x for x in train.columns if x not in ignore]

# Assumption: missing values are replaced with 0, as is done for the test set
# later on; VectorAssembler raises an error on null values.
train = train.na.fill(0)

assembler = VectorAssembler(
    inputCols=lista,
    outputCol='features')

# Keep only the label and the assembled feature vector.
train = assembler.transform(train).select('Response', 'features')
```
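To make the target format concrete, here is a minimal plain-Python sketch of the idea behind the assembling step: each row with many feature columns becomes a pair of one label and one feature vector. This only illustrates the shape of the data and is not how Spark implements it. The `assemble` helper and the two sample rows are hypothetical, and replacing missing values with 0.0 is an assumption that mirrors the `na.fill(0)` applied to the test set.

```python
def assemble(row, ignore=("Id", "Response"), fill=0.0):
    """Return (label, features) for one row given as a dict of column -> value."""
    features = [
        fill if value is None else float(value)
        for name, value in row.items()
        if name not in ignore
    ]
    return row["Response"], features


# Hypothetical two-row sample; the real file has almost 1000 feature columns.
rows = [
    {"Id": 4, "L0_S0_F0": 0.03, "L0_S0_F2": None, "Response": 0},
    {"Id": 6, "L0_S0_F0": None, "L0_S0_F2": -0.12, "Response": 1},
]

assembled = [assemble(r) for r in rows]
for label, features in assembled:
    print(label, features)
```

The column order inside each vector follows the order of the input columns, which is why the same list of feature columns has to be used for both the train and the test data.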

Now I have the training data ready for a fast and simple classification model:

```python
import numpy as np
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier

# Split the data into training and test sets (30% held out for testing).
(trainingData, testData) = train.randomSplit([0.7, 0.3])

# Configure a gradient-boosted trees classifier.
gbt = GBTClassifier(labelCol="Response", featuresCol="features", maxIter=10)

# Wrap the classifier in a Pipeline (a single stage here).
pipeline = Pipeline(stages=[gbt])

# Train the model.
model = pipeline.fit(trainingData)

# Make predictions on the held-out data and collect them as an integer array.
predictions = model.transform(testData)
predsGBT = predictions.select("prediction").rdd.map(lambda r: r[0]).collect()
preds = np.asarray(predsGBT).astype(int)
```
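The predictions on the held-out 30% are collected above but never scored. Below is a minimal sketch of one way to score them in plain Python, assuming the metric of interest is the Matthews correlation coefficient (to my knowledge the metric used by the Bosch competition, so treat that as an assumption). The function and the small label lists are hypothetical. In practice the true labels and the predictions would be collected together from the `predictions` DataFrame so that they stay aligned row by row.

```python
import math


def matthews_corrcoef(labels, preds):
    """Matthews correlation coefficient for binary labels encoded as 0/1."""
    pairs = list(zip(labels, preds))
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    tn = sum(1 for y, p in pairs if y == 0 and p == 0)
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)
    denom = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    # By convention the score is 0 when any marginal count is zero.
    return (tp * tn - fp * fn) / denom if denom else 0.0


# Hypothetical held-out labels and predictions, for illustration only.
labels_demo = [0, 0, 0, 1, 1, 0, 1, 0]
preds_demo = [0, 0, 1, 1, 0, 0, 1, 0]

score = matthews_corrcoef(labels_demo, preds_demo)
print(round(score, 4))
```

A score of 1 means perfect agreement, while 0 is what a constant or random predictor gets, which makes this metric more informative than plain accuracy when one class is rare.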

Prepare the submission file

Finally, following the same simple steps for the test data set, we can prepare the submission file:

```python
import numpy as np
import pandas as pd

df_test = spark.read.csv("../input/test_numeric.csv", header="true", inferSchema="true")
df_test = df_test.na.fill(0)  # replace missing values with 0
data_test = assembler.transform(df_test).select("features")

# Make predictions.
preds = model.transform(data_test)
predsGBT = preds.select("prediction").rdd.map(lambda r: r[0]).collect()

# Fill the sample submission with the predictions; this assumes the rows come
# back in the same order as in sample_submission.csv.
sub = pd.read_csv('../input/sample_submission.csv')
sub['Response'] = np.asarray(predsGBT).astype(int)
sub.to_csv('../output/bosch-spark.csv', index=False)
```

As a result, I easily obtained a score of 0.13.