
PySpark first approaches. Tips and tricks

First approaches to Apache Spark and PySpark.

While participating in the recent Kaggle competition, Bosch Production Line Performance, I decided to try Apache Spark and, in particular, PySpark.
When you need to analyze really big data, Pandas sometimes cannot handle the problem.
While I still consider Pandas a great tool for characterizing data, I realized it was time to move to something more sophisticated for large data files.
So I downloaded the latest version of Spark, namely Spark 2.0.0, installed it from source on my Linux PC, and began to play with it to get some confidence with its syntax.
The idea, of course, was to use the ML library developed by the Spark team.
I had several difficulties preparing the features in the format required by the ML algorithms.
Hence, my idea is to share, gradually, the solutions to the small questions I asked myself and for which I hardly found answers on the net.

Start a Spark session

I started using Spark in standalone mode, not in cluster mode (for the moment 🙂).

First of all, I need to load a CSV file from disk. In the documentation I read: "As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package."

So it will be easier for me to avoid entering the Resilient Distributed Datasets (RDDs) world.

As a first step we need to start a Spark session. We can do this easily with the following lines, where we can specify, for example, some configuration options and the name of the app. Please refer to the main Spark documentation website.

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("example-spark")\
    .config("spark.sql.crossJoin.enabled", "true")\
    .getOrCreate()

# The SparkSession already wraps a SparkContext (and the functionality of the
# old SQLContext), so reuse it instead of calling SparkContext() again,
# which would fail because a context is already running.
sc = spark.sparkContext
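As a side note (my addition, not part of the original post): the builder also lets you choose where the session runs. The call below is an alternative to the builder call above, not an extra step; "local[*]" uses all the cores of the machine, while a standalone cluster started with sbin/start-master.sh would be addressed through its spark:// URL.

# Alternative builder call (my assumption): pick where the session runs.
spark = SparkSession\
    .builder\
    .appName("example-spark")\
    .master("local[*]")\
    .getOrCreate()
# For a standalone cluster, point to the master instead, e.g.:
# .master("spark://<master-host>:7077")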

Load and analyze the data

Now we can load our training data set with this simple line:

train = spark.read.csv("../input/train_numeric.csv", header="true", inferSchema="true")

And this was the first success for me. With Pandas, on my i7 machine with 16 GB of RAM, I was not able to read the numeric train set of the Bosch Kaggle competition; here, in a few seconds, I had all the data in a DataFrame: wow!
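Before going further, a few quick checks help confirm what was loaded (a minimal sketch I am adding here, not in the original post):

# Quick sanity checks on the loaded DataFrame (my addition)
train.printSchema()                     # column names and inferred types
print(train.count())                    # number of rows
print(len(train.columns))               # number of columns
train.select("Id", "Response").show(5)  # peek at a few rows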

The ML classification routines require their input in a specific format: you have to tell the algorithm which column holds the labels you want to predict and which column holds the vector of features. This was the first obstacle. With Scikit-Learn I was used to passing a set of columns as features, and now I had to find a way to assemble almost 1000 columns into a single one.

After reading a bit of the documentation, the solution for handling and assembling the numeric features turned out to be easy:


from pyspark.ml.feature import VectorAssembler

# VectorAssembler cannot handle null values, so fill the missing entries
# with 0 first (the same is done for the test set below)
train = train.na.fill(0)

ignore = ['Id', 'Response']
lista = [x for x in train.columns if x not in ignore]

assembler = VectorAssembler(
    inputCols=lista,
    outputCol='features')

train = assembler.transform(train).select('Response', 'features')
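To check that the assembly worked as expected, you can inspect the new schema and a single row (again, a small sketch of mine, not in the original post):

# After the VectorAssembler step only two columns should remain (my addition)
train.printSchema()               # Response plus a single 'features' vector column
train.select("features").first()  # one assembled (typically sparse) feature vector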

Now I have the training data ready for a fast and simple classification model:



import numpy as np

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = train.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="Response", featuresCol="features", maxIter=10)

# Chain the GBT stage in a Pipeline (no indexers are needed here,
# since all the features are already numeric)
pipeline = Pipeline(stages=[gbt])

# Train the model.
model = pipeline.fit(trainingData)

# Make predictions on the held-out split.
predictions = model.transform(testData)
predsGBT = predictions.select("prediction").rdd.map(lambda r: r[0]).collect()
preds = np.asarray(predsGBT).astype(int)
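Since the Bosch competition is scored with the Matthews correlation coefficient (MCC), it is natural to check the model on the 30% held-out split before submitting. This is my own sketch, not part of the original post; it simply computes MCC with NumPy from the collected predictions and labels:

# Evaluate the held-out split with the Matthews correlation coefficient (my addition)
pred_and_label = predictions.select("prediction", "Response").collect()
y_pred = np.array([int(r[0]) for r in pred_and_label])
y_true = np.array([int(r[1]) for r in pred_and_label])

tp = float(np.sum((y_pred == 1) & (y_true == 1)))
tn = float(np.sum((y_pred == 0) & (y_true == 0)))
fp = float(np.sum((y_pred == 1) & (y_true == 0)))
fn = float(np.sum((y_pred == 0) & (y_true == 1)))

denom = np.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
mcc = ((tp * tn) - (fp * fn)) / denom if denom > 0 else 0.0
print("MCC on the held-out split: %.4f" % mcc)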

Prepare the submission file

Finally, following the same simple steps for the test data set, we can prepare the submission file:

import numpy as np
import pandas as pd

df_test = spark.read.csv("../input/test_numeric.csv", header="true", inferSchema="true")
df_test = df_test.na.fill(0)
data_test = assembler.transform(df_test).select("features")

# Make predictions.
preds = model.transform(data_test)
predsGBT = preds.select("prediction").rdd.map(lambda r: r[0]).collect()

# Write the predictions into the sample submission file
sub = pd.read_csv('../input/sample_submission.csv')
sub['Response'] = np.asarray(predsGBT).astype(int)
sub.to_csv('../output/bosch-spark.csv', index=False)
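One thing to keep in mind (my own remark, not from the original post): the row order of a Spark DataFrame is not guaranteed in general, so matching collected predictions to the sample submission by position can be fragile. A safer variant, sketched below under that assumption (the output filename is hypothetical), carries the Id column through the prediction step so each prediction is written out together with its Id:

# Variant sketch (my addition): keep the Id column next to the features
data_test_id = assembler.transform(df_test).select("Id", "features")
preds_id = model.transform(data_test_id)

# Bring Id and prediction back to pandas and write the submission directly
sub_df = preds_id.select("Id", "prediction").toPandas()
sub_df["Response"] = sub_df["prediction"].astype(int)
sub_df[["Id", "Response"]].to_csv('../output/bosch-spark-ids.csv', index=False)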

As a result, I easily obtained a score of 0.13.

38 Comments on "PySpark first approaches. Tips and tricks"

Sergio

Elena, help.
cannot import name 'SparkSession'

I'm using Spark 2.0.2 on Windows.

Don

Thanks for the great post. I sure learned a lot, coming from a control engineering background :). Just to add my two cents: I think you don't need to create a SparkContext explicitly to use the sqlContext here. The SparkSession already inherits the sqlContext, and you can use it through the SparkSession. Please read:
http://stackoverflow.com/questions/39521341/pyspark-error-attributeerror-sparksession-object-has-no-attribute-paralleli

Amit Sood

Hi,

Can you please help me with installing Spark? I have downloaded Spark from the link you mentioned, but it does not run when I do the following:

adminisatorsmbp:spark-2.0.1-bin-hadoop2.7 amit$ sudo ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /Users/amit/Documents/Analytics/kaggle/Bosch/spark-2.0.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-adminisatorsmbp.out
failed to launch org.apache.spark.deploy.master.Master:
at java.net.InetAddress.getLocalHost(InetAddress.java:1471)
… 10 more
full log in /Users/amit/Documents/Analytics/kaggle/Bosch/spark-2.0.1-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-adminisatorsmbp.out
adminisatorsmbp:spark-2.0.1-bin-hadoop2.7 amit$

Prashant

Stuck trying spark.read.csv; I am on Windows.
An error occurred while calling o29.csv: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient