First approaches to Apache Spark and PySpark.
When you need to analyze really big data , the use of Pandas, sometime, cannot fit the problems.
The idea, of course, was to utilize the ML library developed by the Spark Team.
Start a Spark session
I started using Spark in standalone mode, not in cluster mode ( for the moment 🙂 ).
First of all I need to load a CSV file from disk in csv format. In the documentation I read: As of Spark 2.0, the RDD-based APIs in the spark.mllib
package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml
package.
So, it will be easier for me to avoid entering in the Resilient Distributed Datasets (RDDs) world.
As first step we need to start a Spark session. We can do easily with the following lines, where we can specify some options configuration and the name of the App for example. Please, refer to the main Spark documentation website .
[pastacode lang=”python” manual=”from%20pyspark.sql%20import%20SparkSession%0Afrom%20pyspark%20import%20SparkContext%0Afrom%20pyspark%20import%20SparkConf%0Afrom%20pyspark.sql%20import%20SQLContext%0A%0Aspark%20%3D%20SparkSession%5C%0A%20%20%20%20.builder%5C%0A%20%20%20%20.appName(%22example-spark%22)%5C%0A%20%20%20%20.config(%22spark.sql.crossJoin.enabled%22%2C%22true%22)%5C%0A%20%20%20%20.getOrCreate()%0Asc%20%3D%20SparkContext()%0AsqlContext%20%3D%20SQLContext(sc)” message=”” highlight=”” provider=”manual”/]
Load and analyze the data
Now we can load our training data set with this simple line
[pastacode lang=”python” manual=”train%20%3D%20spark.read.csv(%22..%2Finput%2Ftrain_numeric.csv%22%2C%20header%3D%22true%22%2CinferSchema%3D%22true%22)” message=”” highlight=”” provider=”manual”/]
And this was the first success for me. While on my I7 core with 16Gb RAM, I was not able to read the numeric train set of the Bosch Kaggle competition, in few seconds I had all the data in a DataFrame: Wow!
The ML classification routines used as input a specific format: you have to say to your algorithm what are the labels you want to predict and which are the Vector of features. This was the first obstacle. I was used to use a set of columns as features with Scikit-Learn and now I have to find the way to assemble almost 1000 columns in one single one.
After reading a bit the documentation, the solution to handle and assemble numeric features was easy:
[pastacode lang=”python” manual=”%0Afrom%20pyspark.ml.feature%20import%20VectorAssembler%0Aignore%20%3D%20%5B’Id’%2C%20’Response’%5D%0Alista%3D%5Bx%20for%20x%20in%20train.columns%20if%20x%20not%20in%20ignore%5D%0A%20%0Aassembler%20%3D%20VectorAssembler(%0A%20%20%20%20inputCols%3Dlista%2C%0A%20%20%20%20outputCol%3D’features’)%0A%0Atrain%20%3D%20(assembler.transform(train).select(‘Response’%2C%22features%22))%0A” message=”” highlight=”” provider=”manual”/]
Now I have the train data ready for a fast and simple classification model
[pastacode lang=”python” manual=”%0A%0Afrom%20pyspark.ml%20import%20Pipeline%0Afrom%20pyspark.ml.classification%20import%20GBTClassifier%0A%23%23%20Split%20the%20data%20into%20training%20and%20test%20sets%20(30%25%20held%20out%20for%20testing)%0A(trainingData%2C%20testData)%20%3D%20datatrain.randomSplit(%5B0.7%2C%200.3%5D)%0A%0A%23%20Train%20a%20GBT%20model.%0Agbt%20%3D%20GBTClassifier(labelCol%3D%22Response%22%2C%20featuresCol%3D%22features%22%2C%20maxIter%3D10)%0A%0A%23%20Chain%20indexers%20and%20GBT%20in%20a%20Pipeline%0Apipeline%20%3D%20Pipeline(stages%3D%5Bgbt%5D)%0A%0A%23%20Train%20model.%20%20This%20also%20runs%20the%20indexers.%0Amodel%20%3D%20pipeline.fit(trainingData)%20Make%20predictions.%0Apredictions%20%3D%20model.transform(testData)%0ApredsGBT%3Dpredictions.select(%22prediction%22).rdd.map(lambda%20r%3A%20r%5B0%5D).collect()%0Apreds%3Dnp.asarray(predsGBT).astype(int)%0A” message=”” highlight=”” provider=”manual”/]
Prepare the submission file
Finally, following the simple steps also for test data set, we can prepare the submission file
[pastacode lang=”python” manual=”df_test%20%3D%20spark.read.csv(%22..%2Finput%2Ftest_numeric.csv%22%2C%20header%3D%22true%22%2CinferSchema%3D%22true%22)%0Adf_test%3Ddf_test.na.fill(0)%0Adata_test%20%3D%20(assembler.transform(df_test).select(%22features%22))%0A%23%20Make%20predictions.%0Apreds%20%3D%20model.transform(data_test)%0ApredsGBT%3Dpreds.select(%22prediction%22).rdd.map(lambda%20r%3A%20r%5B0%5D).collect()%0Asub%20%3D%20pd.read_csv(‘..%2Finput%2Fsample_submission.csv’)%0Asub%5B’Response’%5D%20%3D%20np.asarray(predsGBT).astype(int)%0Asub.to_csv(‘..%2Foutput%2Fbosch-spark.csv’%2C%20index%3DFalse)%0A” message=”” highlight=”” provider=”manual”/]
As a result I easily obtained a score of 0.13…
Elena, aiuta.
cannot import name ‘SparkSession’
I’m using spark 2.0.2 on windows
Ciao Sergio,
non sono un utente windows. sempre lavorato su linux, non so quanto io possa aiutarti.
Da qualche parte devi indicare dove può trovare il python package.
In linux funziona così:
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/libpy4j-0.10.4-src.zip:$
PYTHONPATH
Thanks for the great post. I sure learned a lot after coming from the control engineering background :). Just to add my two cents, I think you don’t need to create SparkContext explicitly to use sqlContext here. The spark session inherits sqlContext and you can use it by SparkSession. Please read :
http://stackoverflow.com/questions/39521341/pyspark-error-attributeerror-sparksession-object-has-no-attribute-paralleli
Hi,
Can you please help me with install spark. I have download spark from the link you have mentioned but its not running when i do the following
adminisatorsmbp:spark-2.0.1-bin-hadoop2.7 amit$ sudo ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /Users/amit/Documents/Analytics/kaggle/Bosch/spark-2.0.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-adminisatorsmbp.out
failed to launch org.apache.spark.deploy.master.Master:
at java.net.InetAddress.getLocalHost(InetAddress.java:1471)
… 10 more
full log in /Users/amit/Documents/Analytics/kaggle/Bosch/spark-2.0.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-adminisatorsmbp.out
adminisatorsmbp:spark-2.0.1-bin-hadoop2.7 amit$
Why are you using the version with hadoop? Are you running on a cluster?
Stuck on trying to spark.read.csv. I am on windows
An error occurred while calling o29.csv. Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
Sorry, Cannot help…I never worked under Windows. hope someone else read your comment.
e.