First approaches to Apache Spark and PySpark.
When you need to analyze really big data, Pandas is sometimes not up to the task.
The idea, of course, was to use the ML library developed by the Spark team.
Start a Spark session
I started using Spark in standalone mode, not in cluster mode (for the moment 🙂).
First of all, I need to load a CSV file from disk. In the documentation I read: "As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package."
So it will be easier for me to skip the world of Resilient Distributed Datasets (RDDs) altogether.
As a first step we need to start a Spark session. This takes only a few lines, in which we can specify, for example, some configuration options and the name of the app. For details, please refer to the main Spark documentation website.
```python
from pyspark.sql import SparkSession, SQLContext

# Create (or reuse) the session; the app name and the config option are examples
spark = SparkSession\
    .builder\
    .appName("example-spark")\
    .config("spark.sql.crossJoin.enabled", "true")\
    .getOrCreate()

# Reuse the context owned by the session: calling SparkContext() again would
# fail, because only one SparkContext can be active at a time
sc = spark.sparkContext
sqlContext = SQLContext(sc)
```
Load and analyze the data
Now we can load our training data set with this simple line:
```python
train = spark.read.csv("../input/train_numeric.csv", header="true", inferSchema="true")
```
And this was my first success. On my Core i7 with 16 GB of RAM I had not been able to read the numeric training set of the Bosch Kaggle competition with Pandas; with Spark, in a few seconds I had all the data in a DataFrame. Wow!
The ML classification routines expect their input in a specific format: you have to tell the algorithm which column holds the labels you want to predict and which column holds the vector of features. This was the first obstacle. With Scikit-Learn I was used to passing a set of columns as features, and now I had to find a way to assemble almost 1000 columns into a single one.
After reading the documentation for a bit, the solution for assembling numeric features turned out to be easy:
```python
from pyspark.ml.feature import VectorAssembler

# Columns that are not features: the row identifier and the label
ignore = ['Id', 'Response']
lista = [x for x in train.columns if x not in ignore]

assembler = VectorAssembler(
    inputCols=lista,
    outputCol='features')

# Assumption: fill missing values with 0 first, as the post does for the test
# set; VectorAssembler raises an error when it meets null values
train = train.na.fill(0)
train = assembler.transform(train).select('Response', 'features')
```
Now I have the training data ready for a fast and simple classification model:
```python
import numpy as np
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = train.randomSplit([0.7, 0.3])

# Define a GBT model
gbt = GBTClassifier(labelCol="Response", featuresCol="features", maxIter=10)

# Wrap the classifier in a Pipeline (a single stage here)
pipeline = Pipeline(stages=[gbt])

# Train the model
model = pipeline.fit(trainingData)

# Make predictions on the held-out data
predictions = model.transform(testData)
predsGBT = predictions.select("prediction").rdd.map(lambda r: r[0]).collect()
preds = np.asarray(predsGBT).astype(int)
```
Prepare the submission file
Finally, following the same simple steps for the test data set, we can prepare the submission file:
```python
import numpy as np
import pandas as pd

df_test = spark.read.csv("../input/test_numeric.csv", header="true", inferSchema="true")
df_test = df_test.na.fill(0)
data_test = assembler.transform(df_test).select("features")

# Make predictions
preds = model.transform(data_test)
predsGBT = preds.select("prediction").rdd.map(lambda r: r[0]).collect()

# Write the predictions into a copy of the sample submission file
sub = pd.read_csv('../input/sample_submission.csv')
sub['Response'] = np.asarray(predsGBT).astype(int)
sub.to_csv('../output/bosch-spark.csv', index=False)
```
As a result I easily obtained a score of 0.13…
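The Bosch competition ranks submissions by the Matthews correlation coefficient (MCC). Assuming the score above is that leaderboard metric, it can also be estimated locally on the held-out split before submitting. The following is a minimal pure-Python sketch; the function name and the toy labels are illustrative and not part of the original code.

```python
import math


def matthews_corrcoef(y_true, y_pred):
    """Matthews correlation coefficient for binary 0/1 labels."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    denom = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    # By convention the score is 0 when any marginal count is zero
    return (tp * tn - fp * fn) / denom if denom else 0.0


# Toy labels standing in for the held-out Response column and the predictions
y_true = [1, 0, 0, 1, 0, 0]
y_pred = [1, 0, 0, 0, 0, 1]
score = matthews_corrcoef(y_true, y_pred)
print(score)
```

With the real data, `y_true` would be the collected `Response` values of the held-out rows and `y_pred` the integer predictions for the same rows, in the same order.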
Hi Elena:
I am new to Spark’s DataFrames, just like you. I have struggled to find a way to use VectorAssembler on columns that contain both numerical and categorical values, so that a method like Random Forest accepts the result as featuresCol. Have you had any luck with it? Thanks for posting!
Hi Jenny,
You can’t: VectorAssembler works only with numeric features. In the post on PySpark for the Red Hat competition, I showed how you can assemble string features and use hashing to transform them into numeric ones.
Hope this helps
e.
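To illustrate the general idea of hashing string features into numbers, here is a minimal pure-Python sketch of the hashing trick. It is not the code from the Red Hat post; the function name, the bucket count and the example row are assumptions made for illustration.

```python
import hashlib


def hash_features(row, num_buckets=16):
    """Map a dict of string features to a fixed-length numeric vector."""
    vec = [0.0] * num_buckets
    for column, value in row.items():
        token = f"{column}={value}".encode("utf-8")
        # md5 is stable across runs, unlike the salted built-in hash()
        index = int(hashlib.md5(token).hexdigest(), 16) % num_buckets
        vec[index] += 1.0
    return vec


# Hypothetical row with two string-valued columns
example = hash_features({"color": "red", "shape": "round"})
print(example)
```

In Spark itself, `HashingTF` and (from Spark 2.3) `FeatureHasher` in `pyspark.ml.feature` apply the same principle and return a vector column.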
Hi Elena:
I figured out what I need to do when working with a DataFrame containing numeric and categorical/string values. I simply need to one-hot-encode the categorical columns one by one before using VectorAssembler. However, that is inconvenient if you have many columns in a large DataFrame. I guess for the time being I can use pandas’ pd.get_dummies to format the data before running PySpark on it.
Thanks!
Jenny
Are you able to work with pandas on such a large data set?
Anyhow, Spark ML also offers the same feature extraction algorithms.
Good luck!
Elena
Elena:
Just want to give you an update, I found a way to work with both numerical and categorical columns in spark ML from this link:
https://vanishingcodes.wordpress.com/2016/06/09/pyspark-tutorial-building-a-random-forest-binary-classifier-on-unbalanced-dataset/
Basically, you one-hot-encode all the categorical columns, collect the encoded columns into a flat list, and then use VectorAssembler on both the numerical and the transformed categorical columns. I tried it and it worked.
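The approach described above could be sketched as follows, assuming Spark 3.x, where `OneHotEncoder` accepts several columns at once (Spark 2.3 and 2.4 call the equivalent class `OneHotEncoderEstimator`). The helper names and the `_idx`/`_vec` suffixes are hypothetical, and this is not the code from the linked tutorial.

```python
def encoded_column_names(categorical_cols):
    """Return (index column names, one-hot column names) for the given columns."""
    index_cols = [c + "_idx" for c in categorical_cols]
    onehot_cols = [c + "_vec" for c in categorical_cols]
    return index_cols, onehot_cols


def build_feature_pipeline(categorical_cols, numeric_cols):
    """Index, one-hot-encode and assemble columns into one 'features' vector."""
    # Imported here so the naming helper above works without Spark installed
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

    index_cols, onehot_cols = encoded_column_names(categorical_cols)
    # One StringIndexer per categorical column: strings become numeric indices
    indexers = [
        StringIndexer(inputCol=c, outputCol=i, handleInvalid="keep")
        for c, i in zip(categorical_cols, index_cols)
    ]
    # A single encoder turns all index columns into one-hot vectors
    encoder = OneHotEncoder(inputCols=index_cols, outputCols=onehot_cols)
    # Numeric columns and one-hot vectors go into the same flat input list
    assembler = VectorAssembler(
        inputCols=list(numeric_cols) + onehot_cols, outputCol="features"
    )
    return Pipeline(stages=indexers + [encoder, assembler])
```

For a DataFrame `df` with a string column `color` and a numeric column `size` (both made-up names), `build_feature_pipeline(["color"], ["size"]).fit(df).transform(df)` would add a `features` column that a classifier can use as `featuresCol`.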
Great Jenny!
Now I’m working more with Scala for the integration of XGBoost, but as soon as I have time, I plan to write a series of short posts about the Spark world. Thanks for your update.
Congrats on getting it running! What’s your file act_train.csv? Is it already a concatenation of the input files?
No, it was the numeric file. Sorry…I misprinted the line of code
it should be
train = spark.read.csv("../input/train_numeric.csv", header="true",inferSchema="true")
Thanx. I’ll correct it
Hello Elena,
your post is very interesting.
What do you mean by “standalone mode”? Do you mean you did not use a cluster? And if so, how can you benefit from using Spark on a single machine?
Thank you in advance.
Charlie
Hi Charlie,
I’m a Spark newbie. My goal is to start using it and eventually run it in cluster mode.
For the moment I’m familiarizing myself with it.
I used the standalone implementation (http://spark.apache.org/docs/latest/spark-standalone.html) on a single PC (with 7 cores). I found that the structures Spark uses to store data in memory are much more efficient than the Pandas DataFrames I had used until now for this kind of problem. Spark was able to read all the data of the Bosch Kaggle competition, which was impossible with Pandas.
What does preds.select("prediction").rdd.map(lambda r: r[0]).collect() do?
Hi Huey,
It selects the ‘prediction’ column of the output in preds and maps it to a list that can be used with pandas.
This is the solution I found to turn the prediction results into a series that can be saved.
Keep in mind that I’m using Spark 2.0.0, where the RDD-based MLlib API has gone into maintenance mode in favor of the DataFrame-based one, so it was difficult for me to find many examples with DataFrames.
Maybe there is a cleverer way to do the same thing.
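One possible simplification, sketched under the assumption that the predictions fit in driver memory: `DataFrame.collect()` already returns `Row` objects that can be indexed by column name, so the detour through `.rdd.map(...)` is not required. The function name below is hypothetical.

```python
def collect_predictions(predictions_df):
    """Return the 'prediction' column of a Spark DataFrame as a list of ints."""
    # collect() brings the selected column to the driver as a list of Row objects
    rows = predictions_df.select("prediction").collect()
    # Each Row can be indexed by column name; GBT predictions are floats (0.0 / 1.0)
    return [int(row["prediction"]) for row in rows]
```

Calling `collect_predictions(preds)` would give the same values as the `rdd.map` version. If pandas is installed, `preds.select("prediction").toPandas()` is another option that returns a pandas DataFrame directly.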