Running Apache Spark in 3 Easy Steps

Apache Spark is a fast and general-purpose (mostly in-memory) cluster computing system. It provides convenient APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs and fault tolerance through lineage tracking.

For a beginner, running a local version of Spark might seem a daunting task: a production deployment of Apache Spark involves many installation steps. In this post I show how to install and experiment with Spark in 3 easy commands on Ubuntu 14.04. Here they are, starting from a bash prompt:

1) Install Java

 sudo apt-add-repository ppa:webupd8team/java && sudo apt-get update && sudo apt-get install oracle-java7-installer 

2) Download Spark and extract

wget http://d3kbcqa49mib13.cloudfront.net/spark-1.2.0-bin-hadoop2.4.tgz && tar -xvf spark-1.2.0-bin-hadoop2.4.tgz

3) Start the pyspark shell/repl:

spark-1.2.0-bin-hadoop2.4/bin/pyspark

This should be enough to get a fully functioning shell in which to experiment with the Spark Python API:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Python version 2.7.6 (default, Mar 22 2014 22:59:56)
SparkContext available as sc.

A couple of commands should work at this point. Here we create an RDD from the README.md file and count its lines:

textFile = sc.textFile("spark-1.2.0-bin-hadoop2.4/README.md")
15/01/31 10:16:02 INFO MemoryStore: ensureFreeSpace(163705) called with curMem=0, maxMem=280248975
15/01/31 10:16:02 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 159.9 KB, free 267.1 MB)
15/01/31 10:16:02 INFO MemoryStore: ensureFreeSpace(22692) called with curMem=163705, maxMem=280248975
15/01/31 10:16:02 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 22.2 KB, free 267.1 MB)
15/01/31 10:16:02 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40146 (size: 22.2 KB, free: 267.2 MB)
15/01/31 10:16:02 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/01/31 10:16:02 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
textFile.count()
15/01/31 10:16:11 INFO FileInputFormat: Total input paths to process : 1
15/01/31 10:16:12 INFO SparkContext: Starting job: count at <stdin>:1
15/01/31 10:16:12 INFO DAGScheduler: Got job 0 (count at <stdin>:1) with 1 output partitions (allowLocal=false)
15/01/31 10:16:12 INFO DAGScheduler: Final stage: Stage 0(count at <stdin>:1)
15/01/31 10:16:12 INFO DAGScheduler: Parents of final stage: List()
15/01/31 10:16:12 INFO DAGScheduler: Missing parents: List()
15/01/31 10:16:12 INFO DAGScheduler: Submitting Stage 0 (PythonRDD[2] at count at <stdin>:1), which has no missing parents
15/01/31 10:16:12 INFO MemoryStore: ensureFreeSpace(5464) called with curMem=186397, maxMem=280248975
15/01/31 10:16:12 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.3 KB, free 267.1 MB)
15/01/31 10:16:12 INFO MemoryStore: ensureFreeSpace(4003) called with curMem=191861, maxMem=280248975
15/01/31 10:16:12 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.9 KB, free 267.1 MB)
15/01/31 10:16:12 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:40146 (size: 3.9 KB, free: 267.2 MB)
15/01/31 10:16:12 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/01/31 10:16:12 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/01/31 10:16:12 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/01/31 10:16:12 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1317 bytes)
15/01/31 10:16:12 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/01/31 10:16:12 INFO HadoopRDD: Input split: file:/home/ubuntu/spark-1.2.0-bin-hadoop2.4/README.md:0+3645
15/01/31 10:16:12 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/01/31 10:16:12 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/01/31 10:16:12 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/01/31 10:16:12 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/01/31 10:16:12 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/01/31 10:16:12 INFO PythonRDD: Times: total = 700, boot = 603, init = 95, finish = 2
15/01/31 10:16:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1797 bytes result sent to driver
15/01/31 10:16:13 INFO DAGScheduler: Stage 0 (count at <stdin>:1) finished in 0.859 s
15/01/31 10:16:13 INFO DAGScheduler: Job 0 finished: count at <stdin>:1, took 0.973213 s
15/01/31 10:16:13 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 843 ms on localhost (1/1)
15/01/31 10:16:13 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
98

The pyspark shell also starts a local web server on http://localhost:4040, where we can look at the jobs submitted to the local Spark engine. We can see the count() action call, the date it was submitted, its status, the number of tasks it ran and other details. We will explore more of these options in future posts.

[Screenshot: the Spark jobs web UI at localhost:4040]

This was an intro to installing and experimenting with Apache Spark in 3 easy commands!

How to plot a ROC Curve in scikit-learn?

ROC stands for Receiver Operating Characteristic; a ROC curve is used to visualize the performance of a classifier. When evaluating a new model, accuracy can be very sensitive to unbalanced class proportions. The ROC curve is insensitive to this lack of balance in the data set.

When using precision and recall, on the other hand, we pick a single discrimination threshold to compute the confusion matrix. The ROC curve instead shows the performance of the model across all possible thresholds. To understand the ROC curve we need to understand its two axes: on the x axis we have the false positive rate (FPR, or fall-out), and on the y axis the true positive rate (TPR, or recall).
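To make the two axes concrete, here is a small sketch in plain Python (the arrays and the 0.5 threshold are my own illustrative choices) that computes TPR and FPR at a single fixed threshold, i.e. one point on the ROC curve:

```python
# One point on the ROC curve: TPR and FPR at a single fixed threshold.
actual = [1, 1, 1, 0, 0, 0]
scores = [0.9, 0.8, 0.4, 0.6, 0.2, 0.1]  # hypothetical probability estimates
threshold = 0.5
predicted = [1 if s >= threshold else 0 for s in scores]

# Tally the four cells of the confusion matrix at this threshold
tp = sum(1 for a, p in zip(actual, predicted) if a == 1 and p == 1)
fn = sum(1 for a, p in zip(actual, predicted) if a == 1 and p == 0)
fp = sum(1 for a, p in zip(actual, predicted) if a == 0 and p == 1)
tn = sum(1 for a, p in zip(actual, predicted) if a == 0 and p == 0)

tpr = tp / (tp + fn)  # true positive rate (recall): the y axis
fpr = fp / (fp + tn)  # false positive rate (fall-out): the x axis
print(tpr, fpr)
```

Sweeping the threshold from high to low and recording one such (fpr, tpr) pair per threshold traces out the full curve.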

To test the scikit-learn calls that build this curve for us, we use a small array of actual labels and a prediction array of the same size. The first thing to notice about the ROC curve is that we need to define which value counts as a positive prediction; since our example is binary, class "1" will be the positive class. Second, the prediction array must contain probability estimates of the positive class, or confidence values. This is very important, because the roc_curve call will repeatedly set a threshold to decide in which class to place each predicted probability. Let's see the code that does this.

1) Import needed modules

from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt

2) Generate actual and predicted values. First let's use a good array of prediction probabilities:

actual = [1,1,1,0,0,0]
predictions = [0.9,0.9,0.9,0.1,0.1,0.1]

3) Then we calculate the fpr and tpr for all thresholds of the classification. This is where the roc_curve call comes into play. In addition we calculate the auc, or area under the curve: a single summary value in [0,1] that is easier to report and to use for other purposes. You usually want a high auc value from your classifier.

false_positive_rate, true_positive_rate, thresholds = roc_curve(actual, predictions)
roc_auc = auc(false_positive_rate, true_positive_rate)
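It can help to inspect what roc_curve actually returns: one (fpr, tpr) point per candidate threshold, from the highest threshold down. A quick sketch with the same arrays as above (the exact contents of the thresholds array vary by scikit-learn version, so I only comment on the rates):

```python
from sklearn.metrics import roc_curve, auc

actual = [1, 1, 1, 0, 0, 0]
predictions = [0.9, 0.9, 0.9, 0.1, 0.1, 0.1]

false_positive_rate, true_positive_rate, thresholds = roc_curve(actual, predictions)
roc_auc = auc(false_positive_rate, true_positive_rate)

# The rates should trace (0,0) -> (0,1) -> (1,1): every positive is ranked
# above every negative, so the curve hugs the top-left corner.
print(false_positive_rate)
print(true_positive_rate)
print(roc_auc)  # 1.0
```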

4) Finally we plot the fpr vs. tpr, along with the auc, for our very good classifier.

plt.title('Receiver Operating Characteristic')
plt.plot(false_positive_rate, true_positive_rate, 'b',
label='AUC = %0.2f'% roc_auc)
plt.legend(loc='lower right')
plt.plot([0,1],[0,1],'r--')
plt.xlim([-0.1,1.2])
plt.ylim([-0.1,1.2])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.show()

The figure shows what a perfect classifier's ROC curve looks like:

[Figure: ROC curve of the perfect classifier, AUC = 1.00]

Here the classifier did not make a single error; the AUC is maximal at 1.00. Let's see what happens when we introduce some errors into the predictions.

actual = [1,1,1,0,0,0]
predictions = [0.9,0.9,0.1,0.1,0.1,0.1]
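Running the same roc_curve and auc calls on these arrays shows the drop: one of the three positives now scores like a negative, so the curve only reaches tpr = 2/3 before the fpr starts rising, and by the trapezoid rule the area comes out to 5/6, about 0.83:

```python
from sklearn.metrics import roc_curve, auc

actual = [1, 1, 1, 0, 0, 0]
predictions = [0.9, 0.9, 0.1, 0.1, 0.1, 0.1]

false_positive_rate, true_positive_rate, thresholds = roc_curve(actual, predictions)
roc_auc = auc(false_positive_rate, true_positive_rate)
print(roc_auc)  # ~0.83: the misclassified positive costs area under the curve
```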

[Figure: ROC curve after introducing prediction errors, with a lower AUC]

As we introduce more errors, the AUC value goes down. There are a couple of things to remember about the ROC curve:

  1. There is a tradeoff between the TPR and FPR as we move the threshold of the classifier.
  2. The more accurate the classifier, the closer its ROC curve is to the top-left corner.
  3. A useless classifier is one whose ROC curve lies exactly on the diagonal. What does that look like? Say we have a classifier that always outputs 0.5 as the classification probability.
actual = [1,1,1,0,0,0]
predictions = [0.5,0.5,0.5,0.5,0.5,0.5]
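With a single constant score there is only one distinct threshold, so the only points roc_curve can produce are (0, 0) and (1, 1); the curve collapses onto the diagonal and the AUC is exactly 0.5:

```python
from sklearn.metrics import roc_curve, auc

actual = [1, 1, 1, 0, 0, 0]
predictions = [0.5, 0.5, 0.5, 0.5, 0.5, 0.5]

false_positive_rate, true_positive_rate, thresholds = roc_curve(actual, predictions)
roc_auc = auc(false_positive_rate, true_positive_rate)
print(roc_auc)  # 0.5: no better than flipping a coin
```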

The ROC curve would look like this:

[Figure: diagonal ROC curve of a useless classifier, AUC = 0.50]
Concerning the AUC, a simple rule of thumb to evaluate a classifier based on this summary value is the following:

  • .90-1 = very good (A)
  • .80-.90 = good (B)
  • .70-.80 = not so good (C)
  • .60-.70 = poor (D)
  • .50-.60 = fail (F)

This example dealt with a two-class problem (0, 1). For a multi-class example, check out the scikit-learn documentation.

This was an intro to the ideas behind the ROC curve and the scikit-learn functions that produce it.