Using PySpark for RedHat Kaggle competition

The Red Hat Kaggle competition is not especially demanding in terms of computation or data management.

Pandas with xgboost, or R, is enough to get good scores. The idea here is to use Apache Spark only as a tutorial example.

First of all, merging several DataFrames in PySpark is not as efficient as in pandas, and I don’t fully trust the code I wrote to merge two DataFrames whose column names are the same.

So in a first implementation I merged the data frames with the pandas utilities first, then analysed the data in Spark, reading the merged CSV file from disk.
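The pandas merge itself is not shown in this post. A minimal sketch of that step, assuming two tables that share a people_id key and both carry a char_1 column, could look like the following. The tiny frames, their values and the suffix names _act and _ppl are made up for illustration; only the output path in the commented line matches what the Spark script reads.

```python
import pandas as pd

# Tiny stand-ins for the activity and people tables (hypothetical values).
activities = pd.DataFrame({
    "people_id": ["ppl_1", "ppl_1", "ppl_2"],
    "activity_id": ["act_1", "act_2", "act_3"],
    "char_1": ["type 3", "type 5", "type 3"],
    "outcome": [0, 1, 0],
})
people = pd.DataFrame({
    "people_id": ["ppl_1", "ppl_2"],
    "char_1": ["type 2", "type 1"],
})

# Both tables have a char_1 column, so give each side an explicit suffix
# instead of relying on the default _x/_y names.
merged = activities.merge(people, on="people_id", how="left",
                          suffixes=("_act", "_ppl"))

# Writing the result to CSV is what lets Spark read one flat file later.
# merged.to_csv("../input/trainF.csv", index=False)
print(merged.columns.tolist())
```

Explicit suffixes keep the origin of each column visible after the merge, which is the part that is harder to get right with identically named columns.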
This was the Spark script:

 

"""
redhat-pandas-spark.py
@author Elena Cuoco
"""

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import atexit
import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF

#### Init Spark
conf = SparkConf()
conf.set("spark.executor.memory", "4G")
conf.set("spark.driver.memory","18G")
conf.set("spark.executor.cores","7")
conf.set("spark.python.worker.memory","4G")
conf.set("spark.driver.maxResultSize","0")
conf.set("spark.sql.crossJoin.enabled","true")
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
conf.set("spark.default.parallelism","2")
sc = SparkContext(conf=conf.setMaster('local[*]'))
sc.setLogLevel("WARN")
sqlContext = SQLContext(sc)
atexit.register(lambda: sc.stop())

spark = SparkSession \
    .builder.config(conf=conf) \
    .appName("spark-xgb").getOrCreate()
# Read the merged CSV files written by the pandas step
data= spark.read.csv("../input/trainF.csv", header="true", inferSchema="true",mode="DROPMALFORMED")
datatest = spark.read.csv("../input/testF.csv", header="true", inferSchema="true",mode="DROPMALFORMED")

pred_spark = pd.DataFrame()
pred_spark['activity_id'] = datatest.select("activity_id").rdd.map(lambda r: r[0]).collect()
data=data.na.fill(-1)
datatest=datatest.na.fill(-1)

ignore = ['outcome','activity_id','people_id']
lista = [x for x in data.columns if x not in ignore]
schema = StructType([
StructField("outcome", DoubleType(), True),
StructField("features", ArrayType(StringType()), True)])

xtrain = sqlContext.createDataFrame(data.rdd.map(lambda l: (float(l['outcome']), [l[x] for x in lista])),schema)
xtrain=xtrain.select('features','outcome')

# The test set has no outcome column, so a dummy label of 0.0 fills the schema.
schema = StructType([
        StructField("label", DoubleType(), True),
        StructField("features", ArrayType(StringType()), True)])
# Note: this list keeps every test column, including activity_id and people_id,
# which the training features above exclude.
lista=[x for x in datatest.columns]
xtest = sqlContext.createDataFrame(datatest.rdd.map(lambda l: (0.0,[l[x] for x in lista])) , schema)
xtest=xtest.select('features')
xtrain.cache()
xtest.cache()

hashingTF = HashingTF(inputCol="features", outputCol="rawFeatures")
featurizedData = hashingTF.transform(xtrain)
idf=IDF(inputCol='rawFeatures',outputCol='idfFeatures')
modelidf=idf.fit(featurizedData)
xx=modelidf.transform(featurizedData)
xxtr = xx.select('outcome', 'idfFeatures')
xxtr.cache()

fh2 = hashingTF.transform(xtest)
xt = modelidf.transform(fh2)
xtt = xt.select('idfFeatures')
xtt.cache()

(trainingData, testData) = xxtr.randomSplit([0.85, 0.15], 147)

clf = LogisticRegression(featuresCol="idfFeatures", labelCol="outcome", maxIter=1000, regParam=0.0, elasticNetParam=0.0)


model = clf.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("probability", "outcome").show(5)

# Compute accuracy on the held-out 15% split
evaluator = MulticlassClassificationEvaluator(
    labelCol="outcome", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
preds=model.transform(xtt)
preds.printSchema()

# Keep the probability of class 1 for each test row (the model is a logistic regression)
predsLR=preds.select("probability").rdd.map(lambda r: r[0][1]).collect()
pred_spark['outcome']=predsLR
pred_spark.to_csv("../output/pandas-spark.csv", index=False)
redhat-pandas-spark-hash.py

Note lines 45 to 51 of the script (highlighted in the original listing), which run from the ignore list to the line that creates xtrain.

I had a lot of trouble understanding how to prepare the data in the format

"label","features"

when the features are not numeric, so I want to show how I solved the problem: I prepared a new schema for the DataFrame and put the features I had selected in a list into an array of StringType.

ignore = ['outcome','activity_id','people_id']
lista = [x for x in data.columns if x not in ignore]
schema = StructType([
StructField("outcome", DoubleType(), True),
StructField("features", ArrayType(StringType()), True)])

xtrain = sqlContext.createDataFrame(data.rdd.map(lambda l: (float(l['outcome']), [l[x] for x in lista])),schema)

 

This step is necessary before using the hashing feature extraction (HashingTF) of the Spark ML library.
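To make these two steps more concrete, here is a plain-Python sketch of what happens to each row: it becomes a list of string tokens, the tokens are hashed into a fixed number of buckets, and the bucket counts are then reweighted by IDF. This is an illustration under stated assumptions, not Spark's implementation: md5 stands in for the MurmurHash3 function Spark uses, the bucket count is tiny (HashingTF defaults to 2**18), the str() conversion is an addition of this sketch, and the rows and helper names are hypothetical.

```python
import hashlib
import math
from collections import Counter

NUM_BUCKETS = 16  # tiny on purpose; Spark's HashingTF defaults to 2**18


def row_to_tokens(row, columns):
    """Mirror of the lambda in the script: one string token per selected column.
    str() is an assumption added here so non-string values are accepted too."""
    return [str(row[c]) for c in columns]


def bucket(token, num_buckets=NUM_BUCKETS):
    """Map a token to a bucket index. md5 is a stand-in for Spark's MurmurHash3."""
    digest = hashlib.md5(token.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_buckets


def hashed_counts(tokens, num_buckets=NUM_BUCKETS):
    """Sparse term-frequency vector as {bucket index: count}."""
    return dict(Counter(bucket(t, num_buckets) for t in tokens))


def idf_weights(docs):
    """Smoothed IDF per bucket: log((m + 1) / (df + 1)), with m documents."""
    m = len(docs)
    # Iterating over a dict yields each bucket once, so this counts documents.
    df = Counter(b for doc in docs for b in doc)
    return {b: math.log((m + 1) / (df[b] + 1)) for b in df}


# Hypothetical rows with categorical string values.
rows = [
    {"char_1": "type 3", "char_2": "type 3", "group_1": "group 17304"},
    {"char_1": "type 5", "char_2": "type 3", "group_1": "group 8688"},
]
columns = ["char_1", "char_2", "group_1"]

docs = [hashed_counts(row_to_tokens(r, columns)) for r in rows]
weights = idf_weights(docs)
print(docs)
print(weights)
```

One side effect visible in the first row: the same string appearing in two different columns lands in the same bucket, because the token carries no column name.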

So, this code achieved a score of ~0.95 on the Public Leaderboard and is definitely not optimized. There may still be room to reach a better score.
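A note on comparing numbers: the script prints accuracy on its 15% hold-out, while the leaderboard for this competition ranked submissions by area under the ROC curve, so the two are not directly comparable. As a rough illustration of what that metric measures, here is a plain-Python sketch. The function name roc_auc and the toy labels and scores are hypothetical, and the pairwise loop is quadratic, so it is only meant for small examples.

```python
def roc_auc(labels, scores):
    """Fraction of (positive, negative) pairs in which the positive example
    gets the higher score; ties count as half a win."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(roc_auc(labels, scores))  # 0.75: three of the four pairs are ordered correctly
```

Inside Spark, BinaryClassificationEvaluator with metricName="areaUnderROC" and labelCol="outcome" computes this quantity on a predictions DataFrame.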

Have fun

#spark #pyspark

 
