Using PySpark for RedHat Kaggle competition
The RedHat Kaggle competition is not particularly demanding from a computational or data-management point of view.
Pandas with xgboost, or R, is enough to get good scores, so the idea here is to use Apache Spark mainly as a tutorial exercise.
First of all, merging multiple DataFrames in PySpark is not as efficient as in pandas, and I don't fully trust the code I wrote to merge two different DataFrames that share column names.
So a first implementation was to merge the DataFrames with the pandas utilities, write the result to disk, and then analyse the data in Spark by reading the merged csv files back.
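I am not showing the exact pandas script here, but the merging step can be sketched roughly as below. The input file names (people.csv, act_train.csv, act_test.csv), the people_id join key and the column suffixes are assumptions based on the competition data; trainF.csv and testF.csv are the merged files that the Spark script reads back from disk.
"""
merge-pandas.py (hypothetical pre-processing sketch, not the original script)
"""
import pandas as pd
# assumed competition files: the activity tables plus the shared people table
people = pd.read_csv("../input/people.csv")
act_train = pd.read_csv("../input/act_train.csv")
act_test = pd.read_csv("../input/act_test.csv")
# join each activity table with the people table on people_id;
# suffixes disambiguate the columns that appear in both tables
train = act_train.merge(people, how="left", on="people_id", suffixes=("_act", "_ppl"))
test = act_test.merge(people, how="left", on="people_id", suffixes=("_act", "_ppl"))
# write the merged files that the Spark script reads back from disk
train.to_csv("../input/trainF.csv", index=False)
test.to_csv("../input/testF.csv", index=False)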
This was the Spark code:
"""
redhat-pandas-spark.py
@author Elena Cuoco
"""
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import atexit
import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF
#### Init Spark
conf = SparkConf()
conf.set("spark.executor.memory", "4G")
conf.set("spark.driver.memory","18G")
conf.set("spark.executor.cores","7")
conf.set("spark.python.worker.memory","4G")
conf.set("spark.driver.maxResultSize","0")
conf.set("spark.sql.crossJoin.enabled","true")
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
conf.set("spark.default.parallelism","2")
conf.set("spark.sql.crossJoin.enabled", "true")
sc = SparkContext(conf=conf.setMaster('local[*]'))
sc.setLogLevel("WARN")
sqlContext = SQLContext(sc)
atexit.register(lambda: sc.stop())
spark = SparkSession \
.builder.config(conf=conf) \
.appName("spark-xgb").getOrCreate()
# read the train/test csv files that were merged and saved with pandas
data= spark.read.csv("../input/trainF.csv", header="true", inferSchema="true",mode="DROPMALFORMED")
datatest = spark.read.csv("../input/testF.csv", header="true", inferSchema="true",mode="DROPMALFORMED")
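# keep the test activity_id values in a pandas DataFrame for the submission file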
pred_spark = pd.DataFrame()
pred_spark['activity_id'] = datatest.select("activity_id").rdd.map(lambda r: r[0]).collect()
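# replace missing values with -1 in both DataFrames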
data=data.na.fill(-1)
datatest=datatest.na.fill(-1)
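# build the list of feature columns for training, dropping the target and the id columns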
ignore = ['outcome','activity_id','people_id']
lista = [x for x in data.columns if x not in ignore]
schema = StructType([
StructField("outcome", DoubleType(), True),
StructField("features", ArrayType(StringType()), True)])
xtrain = sqlContext.createDataFrame(data.rdd.map(lambda l: (float(l['outcome']), [l[x] for x in lista])),schema)
xtrain=xtrain.select('features','outcome')
ignore = [ 'activity_id','people_id']
lista = [x for x in datatest.columns if x not in ignore]
schema = StructType([
StructField("label", DoubleType(), True),
StructField("features", ArrayType(StringType()), True)])
xtest = sqlContext.createDataFrame(datatest.rdd.map(lambda l: (0.0,[l[x] for x in lista])) , schema)
xtest=xtest.select('features')
xtrain.cache()
xtest.cache()
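# hash each row's array of string features into a term-frequency vector, then re-weight it with IDF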
hashingTF = HashingTF(inputCol="features", outputCol="rawFeatures")
featurizedData = hashingTF.transform(xtrain)
idf=IDF(inputCol='rawFeatures',outputCol='idfFeatures')
modelidf=idf.fit(featurizedData)
xx=modelidf.transform(featurizedData)
xxtr = xx.select('outcome', 'idfFeatures')
xxtr.cache()
fh2 = hashingTF.transform(xtest)
xt = modelidf.transform(fh2)
xtt = xt.select('idfFeatures')
xtt.cache()
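# hold out 15% of the training rows for a quick validation split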
(trainingData, testData) = xxtr.randomSplit([0.85, 0.15], 147)
clf = LogisticRegression(featuresCol="idfFeatures", labelCol="outcome", maxIter=1000, regParam=0.0, elasticNetParam=0.0)
model = clf.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("probability", "outcome").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
labelCol="outcome", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
preds=model.transform(xtt)
preds.printSchema()
predsGBT=preds.select("probability").rdd.map(lambda r: r[0][1]).collect()
pred_spark['outcome']=predsGBT
pred_spark.to_csv("../output/pandas-spark.csv", index=False)
redhat-pandas-spark-hash.py
Note the lines below, where the training DataFrame is built.
Since I had many difficulties understanding how to prepare the data in the
"label", "features"
format when dealing with non-numeric features, I want to show how I solved the problem: I defined a new schema for the DataFrame and packed the feature columns I selected in a list into an Array of StringType.
ignore = ['outcome','activity_id','people_id']
lista = [x for x in data.columns if x not in ignore]
schema = StructType([
StructField("outcome", DoubleType(), True),
StructField("features", ArrayType(StringType()), True)])
xtrain = sqlContext.createDataFrame(data.rdd.map(lambda l: (float(l['outcome']), [l[x] for x in lista])),schema)
This is necessary before using the hashing feature extraction from the Spark ML library.
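To make the pattern easier to see in isolation, here is a tiny self-contained sketch of the same idea on invented data (the column values below are made up for illustration, not taken from the competition):
"""
toy-hashing.py (illustrative sketch on invented data)
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, ArrayType, StringType
from pyspark.ml.feature import HashingTF
spark = SparkSession.builder.master("local[*]").appName("toy-hashing").getOrCreate()
# each row is (label, list of categorical feature values kept as strings)
rows = [(1.0, ["type_1", "group_17", "char_2_B"]),
        (0.0, ["type_3", "group_42", "char_2_A"])]
schema = StructType([
    StructField("outcome", DoubleType(), True),
    StructField("features", ArrayType(StringType()), True)])
df = spark.createDataFrame(rows, schema)
# HashingTF accepts the array of strings directly and maps it to a sparse numeric vector
hashed = HashingTF(inputCol="features", outputCol="rawFeatures").transform(df)
hashed.select("outcome", "rawFeatures").show(truncate=False)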
So, this code achieved a score of ~0.95 on the Public Leaderboard and is definitely not optimized, so there is probably still room for a better score.
Have fun
#spark #pyspark