Data Science - kaggle

PySpark first approaches. Tips and tricks

First approaches to Apache Spark and PySpark.

While participating in the recent Kaggle competition Bosch Production Line Performance, I decided to try Apache Spark, and in particular PySpark.
When you need to analyze really big data, Pandas sometimes cannot cope with the problem.
While I still consider Pandas a great tool for characterizing data, I realized it was time to move to something more sophisticated for large data files.
So I downloaded the latest version of Spark, 2.0.0, installed it on my Linux PC from source, and began to play with it to gain some confidence with its syntax.
The idea, of course, was to use the ML library developed by the Spark team.
I had several difficulties preparing the features in the format required by the ML algorithms.
Hence, my idea is to share, gradually, the solutions to the small questions I asked myself and for which I hardly found an answer on the net.

Start a Spark session

I started using Spark in standalone mode, not in cluster mode (for the moment 🙂).

First of all, I needed to load a CSV file from disk. In the documentation I read: "As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package."

So it will be easier for me to avoid entering the world of Resilient Distributed Datasets (RDDs).

As a first step we need to start a Spark session. We can do this easily with the following lines, where we can specify some configuration options and, for example, the name of the app. Please refer to the main Spark documentation website.

 

```python
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

spark = SparkSession\
    .builder\
    .appName("example-spark")\
    .config("spark.sql.crossJoin.enabled", "true")\
    .getOrCreate()

# Reuse the context created by the session: calling SparkContext()
# again would fail, since a context already exists.
sc = spark.sparkContext
sqlContext = SQLContext(sc)
```

Load and analyze the data

Now we can load our training data set with this simple line:

```python
train = spark.read.csv("../input/train_numeric.csv", header="true", inferSchema="true")
```


And this was my first success. On my i7 machine with 16 GB of RAM I had not been able to read the numeric train set of the Bosch Kaggle competition into Pandas; here, in a few seconds, I had all the data in a DataFrame: wow!

The ML classification routines require a specific input format: you have to tell your algorithm which column holds the labels you want to predict and which Vector column holds the features. This was the first obstacle. With Scikit-Learn I was used to passing a set of columns as features, and now I had to find a way to assemble almost 1000 columns into a single one.
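To visualize what this assembly means, here is a minimal sketch in plain pandas/NumPy, outside Spark and with toy column names I made up, of collapsing many feature columns into one array-valued column — conceptually what Spark expects:

```python
import numpy as np
import pandas as pd

# Toy frame standing in for the Bosch data: an Id, two feature
# columns, and the Response label (names are illustrative only).
df = pd.DataFrame({
    "Id": [1, 2, 3],
    "L0_S0_F0": [0.1, 0.2, 0.3],
    "L0_S0_F2": [1.0, 0.0, 1.0],
    "Response": [0, 1, 0],
})

# Keep every column except the identifier and the label ...
ignore = ["Id", "Response"]
feature_cols = [c for c in df.columns if c not in ignore]

# ... and pack them row-wise into a single "features" column,
# mirroring the shape of data that Spark's ML routines consume.
df["features"] = df[feature_cols].to_numpy().tolist()
print(df[["Response", "features"]])
```

In Spark this packing is done for you by VectorAssembler, as shown next.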

After reading the documentation a bit, the solution for handling and assembling the numeric features turned out to be easy:

```python
from pyspark.ml.feature import VectorAssembler

# Fill missing values, as is done below for the test set:
# VectorAssembler cannot handle nulls.
train = train.na.fill(0)

ignore = ['Id', 'Response']
lista = [x for x in train.columns if x not in ignore]

assembler = VectorAssembler(
    inputCols=lista,
    outputCol='features')

train = assembler.transform(train).select('Response', 'features')
```

Now I have the training data ready for a fast and simple classification model:

```python
import numpy as np
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier

# Split the data into training and test sets (30% held out for testing).
(trainingData, testData) = train.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="Response", featuresCol="features", maxIter=10)

# Chain the GBT stage in a Pipeline.
pipeline = Pipeline(stages=[gbt])

# Train the model.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)
predsGBT = predictions.select("prediction").rdd.map(lambda r: r[0]).collect()
preds = np.asarray(predsGBT).astype(int)
```

Prepare the submission file

Finally, following the same simple steps for the test data set, we can prepare the submission file:

```python
import numpy as np
import pandas as pd

df_test = spark.read.csv("../input/test_numeric.csv", header="true", inferSchema="true")
df_test = df_test.na.fill(0)
data_test = assembler.transform(df_test).select("features")

# Make predictions.
preds = model.transform(data_test)
predsGBT = preds.select("prediction").rdd.map(lambda r: r[0]).collect()

sub = pd.read_csv('../input/sample_submission.csv')
sub['Response'] = np.asarray(predsGBT).astype(int)
sub.to_csv('../output/bosch-spark.csv', index=False)
```
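Before uploading, a quick sanity check on the generated file can save a rejected submission. A minimal sketch in plain pandas, built here on toy data standing in for the real CSV:

```python
import pandas as pd

# Toy stand-in for the submission frame written above.
sub = pd.DataFrame({"Id": [1, 2, 3], "Response": [0, 1, 0]})

# The sample_submission format used above has exactly these two
# columns, with an integer 0/1 Response for every unique test Id.
assert list(sub.columns) == ["Id", "Response"]
assert sub["Response"].isin([0, 1]).all()
assert sub["Id"].is_unique
print("submission looks well-formed")
```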

As a result, I easily obtained a score of 0.13.
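For context, the Bosch competition is scored with the Matthews correlation coefficient (MCC). A minimal NumPy sketch of the metric, assuming binary 0/1 labels:

```python
import numpy as np

def matthews_corrcoef(y_true, y_pred):
    """Matthews correlation coefficient for binary 0/1 labels."""
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    # Confusion-matrix cells.
    tp = np.sum((y_true == 1) & (y_pred == 1))
    tn = np.sum((y_true == 0) & (y_pred == 0))
    fp = np.sum((y_true == 0) & (y_pred == 1))
    fn = np.sum((y_true == 1) & (y_pred == 0))
    denom = np.sqrt(float((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn)))
    # Conventionally 0 when any marginal is empty.
    return (tp * tn - fp * fn) / denom if denom > 0 else 0.0

# Toy example: one of the two positives found, no false alarms.
print(matthews_corrcoef([1, 1, 0, 0], [1, 0, 0, 0]))  # prints ≈ 0.577
```

Unlike plain accuracy, MCC stays near 0 for trivial all-zero predictions, which matters on a data set as imbalanced as this one.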


Comments
Sergio
April 5, 2017 10:00 am

Elena, help.
cannot import name 'SparkSession'

I'm using Spark 2.0.2 on Windows.

Don
December 25, 2016 9:40 pm

Thanks for the great post. I sure learned a lot, coming from a control engineering background :). Just to add my two cents: I think you don't need to create the SparkContext explicitly to use sqlContext here. The Spark session inherits the sqlContext, and you can use it through the SparkSession. Please read:
http://stackoverflow.com/questions/39521341/pyspark-error-attributeerror-sparksession-object-has-no-attribute-paralleli

Amit Sood
October 24, 2016 12:14 pm

Hi,

Can you please help me with installing Spark? I have downloaded Spark from the link you mentioned, but it's not running when I do the following:

adminisatorsmbp:spark-2.0.1-bin-hadoop2.7 amit$ sudo ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /Users/amit/Documents/Analytics/kaggle/Bosch/spark-2.0.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-adminisatorsmbp.out
failed to launch org.apache.spark.deploy.master.Master:
at java.net.InetAddress.getLocalHost(InetAddress.java:1471)
… 10 more
full log in /Users/amit/Documents/Analytics/kaggle/Bosch/spark-2.0.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-adminisatorsmbp.out
adminisatorsmbp:spark-2.0.1-bin-hadoop2.7 amit$

Prashant
September 28, 2016 4:01 pm

Stuck trying to use spark.read.csv. I am on Windows.
An error occurred while calling o29.csv. Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient