First approaches to Apache Spark and PySpark.
When you need to analyze really big data, Pandas sometimes cannot handle the problem.
The idea, of course, was to utilize the ML library developed by the Spark Team.
Start a Spark session
I started using Spark in standalone mode, not in cluster mode (for the moment 🙂).
First of all I need to load a CSV file from disk. In the documentation I read: "As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package."
So it will be easier for me to avoid entering the Resilient Distributed Datasets (RDDs) world.
As a first step we need to start a Spark session. We can do this easily with the following lines, where we can specify, for example, some configuration options and the name of the app. Please refer to the main Spark documentation website.
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

spark = SparkSession\
    .builder\
    .appName("example-spark")\
    .config("spark.sql.crossJoin.enabled", "true")\
    .getOrCreate()
sc = SparkContext()
sqlContext = SQLContext(sc)
Load and analyze the data
Now we can load our training data set with this simple line
train = spark.read.csv("../input/train_numeric.csv", header="true", inferSchema="true")
And this was the first success for me. While on my i7 machine with 16 GB of RAM I had not been able to read the numeric train set of the Bosch Kaggle competition, in a few seconds I had all the data in a DataFrame: wow!
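Just to check that everything is really there, a quick inspection of the DataFrame can be done like this (a minimal sketch; Id and Response are columns of the Bosch numeric set):

# Inferred schema (column names and types picked up from the CSV header)
train.printSchema()

# Number of rows and columns in the training set
print(train.count(), len(train.columns))

# Peek at a couple of columns
train.select("Id", "Response").show(5)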
The ML classification routines expect their input in a specific format: you have to tell your algorithm which column holds the labels you want to predict and which column holds the vector of features. This was the first obstacle. With scikit-learn I was used to passing a set of columns as features, and now I had to find a way to assemble almost 1000 columns into a single one.
After reading a bit of the documentation, the solution for handling and assembling the numeric features was easy:
from pyspark.ml.feature import VectorAssembler

ignore = ['Id', 'Response']
lista = [x for x in train.columns if x not in ignore]

assembler = VectorAssembler(
    inputCols=lista,
    outputCol='features')

# Fill missing values with 0 (mirroring the test-set handling below),
# since VectorAssembler cannot assemble null values
train = train.na.fill(0)
train = (assembler.transform(train).select('Response', "features"))
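A quick look at the result shows what the assembler did; each row now carries a single vector column (just a sanity check):

# The 'features' column is now a single Vector holding all the numeric columns
train.printSchema()
train.select("features").show(3, truncate=False)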
Now I have the training data ready for a fast and simple classification model:
import numpy as np

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = train.randomSplit([0.7, 0.3])

# Train a GBT model
gbt = GBTClassifier(labelCol="Response", featuresCol="features", maxIter=10)

# Chain the stages in a Pipeline (here only the classifier)
pipeline = Pipeline(stages=[gbt])

# Train the model
model = pipeline.fit(trainingData)

# Make predictions on the held-out split
predictions = model.transform(testData)
predsGBT = predictions.select("prediction").rdd.map(lambda r: r[0]).collect()
preds = np.asarray(predsGBT).astype(int)
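The competition itself is scored with the Matthews correlation coefficient, which Spark does not offer as a built-in metric, but a rough sanity check on the 30% hold-out can be done with the standard evaluator (a minimal sketch, not part of the pipeline above):

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Plain accuracy on the held-out 30%; only a rough check, since the
# competition metric (MCC) is not available as a built-in Spark evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="Response", predictionCol="prediction", metricName="accuracy")
print("Hold-out accuracy: %g" % evaluator.evaluate(predictions))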
Prepare the submission file
Finally, following the same simple steps for the test data set, we can prepare the submission file:
import pandas as pd

df_test = spark.read.csv("../input/test_numeric.csv", header="true", inferSchema="true")
df_test = df_test.na.fill(0)
data_test = (assembler.transform(df_test).select("features"))

# Make predictions on the test set
preds = model.transform(data_test)
predsGBT = preds.select("prediction").rdd.map(lambda r: r[0]).collect()

# Build the submission file
sub = pd.read_csv('../input/sample_submission.csv')
sub['Response'] = np.asarray(predsGBT).astype(int)
sub.to_csv('../output/bosch-spark.csv', index=False)
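The submission could also be written without Pandas at all; a Spark-only variant could look like this (only a sketch, which I did not actually run, where the Id column is carried through the pipeline):

from pyspark.sql.functions import col

# Keep the Id column next to the assembled features so the submission
# can be built and written entirely in Spark
data_test_id = assembler.transform(df_test).select("Id", "features")
preds_id = model.transform(data_test_id) \
               .select("Id", col("prediction").cast("int").alias("Response"))

# Write a single CSV part file (Spark creates a directory with the part file inside)
preds_id.coalesce(1).write.csv("../output/bosch-spark-csv", header=True, mode="overwrite")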
As a result I easily obtained a score of 0.13…
Hi Elena:
Did you write the code in a Jupyter notebook or Canopy? Thanks.
Jenny
Hi Jenny,
I used Jupyter, but that makes no difference.
cheers
e.
Elena, have you tried running this on an Amazon EMR instance? I get errors when I try to use Pandas to read the sample-submission file.
Hi Huey, I didn't try running on Amazon. It sounds strange that you have problems with Pandas… maybe it is only a matter of data location.
At the moment I'm still trying to understand Spark better and to use the recent XGBoost for Spark.
Hey Elena, the issue has something to do with Pandas using boto to access files on s3. If the sample-submission.csv is on the master node, then it’s fine.
Although, it seems like we shouldn’t have to use Pandas just to read and write a CSV, correct?
Yes, I used Spark to read the data.
To write the results I used Pandas, but you can do it in a different way, I think also using spark.saveAsTextFile(outputDirectory), but I didn't try it.
I believe these lines are not necessary since you already created a SparkSession:
sc = SparkContext()
sqlContext = SQLContext(sc)
If you create a SparkSession you don't need to create a SparkContext; it's already present via the session in Spark 2.0.
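For example (just a sketch, assuming Spark 2.x), the existing context can be taken from the session:

from pyspark.sql import SQLContext

# In Spark 2.x the context already lives on the session object
sc = spark.sparkContext
sqlContext = SQLContext(sc)  # only if you still need the old SQLContext API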