
Using Spark

This page is under construction

General information

The supported Spark version (1.6.2) on Hoffman2 is installed at /u/local/apps/spark/1.6.2 and is pre-built for Hadoop 2.6. We recommend following the sections below to run Spark on the Hoffman2 cluster, but if you are an expert user, you can load Spark into your environment (i.e., set the environment variables SPARK_HOME and PATH) with:

$ module load spark

You must not run Spark on the login nodes; either use qrsh to get an interactive session or use qsub to submit a batch job.

The following sections illustrate running Spark examples on the Hoffman2 cluster with the master URL local[*] passed to Spark. This mode is intended for local testing and unit tests: no network I/O is incurred, and Spark runs with as many worker threads as there are logical cores on the machine.
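
The same master URL can also be set programmatically when an application builds its own SparkContext. The snippet below is only a minimal sketch (the application name is arbitrary, and it assumes the script is launched through spark-submit or pyspark so that the pyspark package is importable):

# Minimal sketch: set the master URL in code instead of on the command line.
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("LocalTest").setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc.master)   # should print: local[*]
sc.stop()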

Using Spark in a batch job

On Hoffman2, Spark can be run as a standalone Spark cluster with YARN. Here we use the example JAR file provided in the Spark installation to calculate pi.

Here is an example of the job script:

#!/bin/sh
#$ -cwd
#$ -pe shared 8
#$ -l h_rt=8:00:00,h_data=1G,spark

# configure the spark cluster based on a group of nodes
source /u/local/bin/sparkstart.sh

# Your Spark work goes here.
# Here we run the SparkPi example from the examples JAR with 10 partitions, using 5 executors.
$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master local[*] --executor-memory 2G --num-executors 5 \
$SPARK_HOME/lib/spark-examples-1.6.2-hadoop2.6.0.jar 10

# Clean up the temporary files and Hadoop services on the nodes
source /u/local/bin/sparkstop.sh
# end of job script

To submit the job script (saved as $SCRATCH/pispark.sh):

$ cd $SCRATCH
$ qsub pispark.sh

The scripts sparkstart.sh and sparkstop.sh automatically set up and tear down the Spark cluster; you do not need to run module load spark in this case. All you need to do is replace the part between the "sparkstart.sh" and "sparkstop.sh" lines with your own Spark work.

In this example, the result is written to the batch output file.
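
In place of the SparkPi line, you would normally submit your own application between the sparkstart.sh and sparkstop.sh lines. As an illustration only (the file name and contents below are hypothetical, not part of the Spark distribution), a minimal PySpark script saved as, say, $SCRATCH/sumsquares.py could be submitted there with $SPARK_HOME/bin/spark-submit --master local[*] $SCRATCH/sumsquares.py:

# sumsquares.py -- hypothetical example of "your own Spark work"
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("SumSquares")
sc = SparkContext(conf=conf)

# Distribute the numbers 1..1000, square each one, and add them up.
total = sc.parallelize(range(1, 1001)) \
          .map(lambda x: x * x) \
          .reduce(lambda a, b: a + b)
print("Sum of squares 1..1000 = %d" % total)

sc.stop()

As with the SparkPi example, anything the script prints appears in the batch output file.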

Using Spark interactively

Running Spark in an interactive session follows the same idea as in batch job submission. However, users will use the qrsh command (instead of qsub). For example, to get an interactive session for 4 hours to use Spark, the qrsh command should be:

$ qrsh -l h_rt=4:00:00,spark

You may have to wait a few minutes. When the prompt returns, you are placed on a compute node. Start the Spark cluster using:

$ source /u/local/bin/sparkstart.sh

Use the jps command to check whether the Hadoop services are successfully started. If everything works, the output should look similar to the following:

$ jps
909 ResourceManager
1148 NodeManager
731 SecondaryNameNode
32614 NameNode
383 DataNode
22135 Master
1691 Jps
22354 Worker

Make sure ResourceManager, NodeManager, SecondaryNameNode, NameNode, DataNode, Master, Worker are present in the output. The numbers (process IDs) shown in the first column may be different.

At this point, you can do Spark map-reduce work interactively (until the time limit of the qrsh session is reached). For example, the command below runs the Python Pi example provided with the Spark installation, using 1000 partitions:

$ $SPARK_HOME/bin/spark-submit --master local[*] \
       $SPARK_HOME/examples/src/main/python/pi.py 1000

When you are done, before ending your qrsh session, run the following command to clean up Spark:

$ source /u/local/bin/sparkstop.sh

Spark with Python

Spark 1.6.2 works with every Python version available as a module on Hoffman2. To run Spark applications in Python, you can use the spark-submit script as shown in the pi.py example above. You can also use pyspark to launch an interactive Python shell with the command below:

$ $SPARK_HOME/bin/pyspark --master local[*]

To reproduce the pi.py example result within the interactive Python shell (with the partitions variable set to 100), one can run the following:

>>> from operator import add
>>> from random import random
>>> partitions = 100
>>> n = 100000 * partitions
>>> def f(_):
...     x = random() * 2 - 1
...     y = random() * 2 - 1
...     return 1 if x ** 2 + y ** 2 < 1 else 0
...
>>> count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
>>> print("Pi is roughly %f" % (4.0 * count / n))

Spark's log output lines are not shown above.

If you look at the pi.py file (in the $SPARK_HOME/examples/src/main/python directory), you can see that it first creates a SparkContext object:

 sc = SparkContext(appName="PythonPi")

In the interactive shell, the SparkContext is automatically available as ‘sc’.
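
For instance, in the pyspark shell started above you can confirm this directly (the exact formatting of the output may differ):

>>> sc.master      # sc is created for you when pyspark starts
'local[*]'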

After defining the sample function f, which picks random points in the square from (-1, -1) to (1, 1) and reports whether they fall inside the unit circle, it creates a parallelized collection by calling SparkContext's parallelize method on the range of sample indices:

count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)

Those are the two basic steps of Python programming on Spark: create a SparkContext, then build and transform parallelized collections. A second short example of the same pattern is sketched below; for more programming information, please refer to the Apache Spark Programming Guide page.
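
The sketch below, run in the same pyspark shell (where sc already exists), illustrates the parallelize-and-reduce step with made-up in-memory data; the word list and variable names are purely illustrative and are not part of the Spark examples:

>>> words = ["spark", "hadoop", "spark", "python", "spark", "hadoop"]
>>> pairs = sc.parallelize(words).map(lambda w: (w, 1))
>>> counts = pairs.reduceByKey(lambda a, b: a + b).collect()
>>> for word, n in counts:
...     print("%s: %d" % (word, n))
...

Each distinct word should be printed with its count (the order of the pairs may vary between runs).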

Using SparkR (R on Spark)

SparkR is an R package that provides a light-weight frontend for using Apache Spark from R. As of April 2015, SparkR has been merged into Apache Spark. In Spark 1.6.2, SparkR provides a distributed data frame implementation that supports operations such as selection, filtering, and aggregation on large datasets. SparkR also supports distributed machine learning using MLlib. For details, refer to the official SparkR documentation.

The paragraphs below illustrate how to run SparkR on the Hoffman2 cluster in a qrsh interactive session. Batch jobs can be constructed in a similar way.

Let's assume we have already obtained a qrsh session and started a Spark cluster using the steps above. First, load R:

$ module load R

Starting SparkR means connecting your R program to a Spark cluster. You can do this by running the $SPARK_HOME/bin/sparkR script, which gives you a pre-configured SparkR shell. Here, however, we recommend starting SparkR from a regular R shell and configuring the SparkContext and SQLContext ourselves:

$ R
## R version 3.2.3 (2015-12-10) -- 
...
> if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
+ Sys.setenv(SPARK_HOME = "/u/local/apps/spark/1.6.2")
+ }
> library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
> sc <- sparkR.init(master = "local[*]", sparkEnvir = list(spark.driver.memory="2g"))

The sparkR.init function will print a few lines of Java INFO messages. If none of them contain errors, the SparkContext was created successfully and you are ready to run R programs on Spark in multi-threaded local mode.

To validate the setup, an example of fitting a Gaussian GLM model with SparkR is shown below:

# Create sqlContext to work with DataFrames
> sqlContext <- sparkRSQL.init(sc)

# Create the DataFrame
> df <- createDataFrame(sqlContext, iris)

# Fit a gaussian GLM model over the dataset.
> model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# The model summary is returned in a format similar to R's native glm().
> summary(model)
##...
## Min       Max
## -1.307112 1.412532

## $coefficients
##                    Estimate  Std. Error t value  Pr(>|t|)
## (Intercept)        2.251393  0.3697543  6.08889  9.568102e-09
## Sepal_Width        0.8035609 0.106339   7.556598 4.187317e-12
## Species_versicolor 1.458743  0.1121079  13.01195 0
## Species_virginica  1.946817  0.100015   19.46525 0


# Make predictions based on the model.
> predictions <- predict(model, newData = df)
> head(select(predictions, "Sepal_Length", "prediction"))
##   Sepal_Length prediction
## 1          5.1   5.063856
## 2          4.9   4.662076
## 3          4.7   4.822788
## 4          4.6   4.742432
## 5          5.0   5.144212
## 6          5.4   5.385281

