Distributed Matrix Multiplication in Spark

Background

Over the past year, I've been using Apache Spark for data pipelines, exploring large-scale machine learning applications, and tuning Spark clusters for peak performance for analysts. Spark is a huge draw for the data science community because of its pandas-like API, low learning curve, and accessibility from several languages.

Summary

Many Spark tutorials do not dive into the internals of Spark. The goal of this post is to walk through the Spark source: we'll start at Spark's high-level Python API and end at the compiled libraries LAPACK and BLAS.

Setup

I'll write a future post on installing an environment step by step, but for now I'll assume that you have an environment set up, so we can dive into the code.

Code

Below is an example of using Alternating Least Squares as a recommendation engine for movies. It's not required that you understand the most intricate details of ALS; I'll be using ALS only as a vehicle for exploring Spark's documentation and source code down to the underlying linear algebra library calls. If you are very interested in this topic, this post is available and translates well. This code can be run interactively line by line in the pyspark shell or in a Jupyter notebook.

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# `spark` is the SparkSession provided by the pyspark shell / notebook kernel
lines = spark.read.text("sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="userId",
          itemCol="movieId", ratingCol="rating")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

als.fit(training)

If you are using an interactive environment like a Jupyter notebook or the pyspark shell with IPython, you can get some initial information about what the als.fit(training) line of code does. If you are familiar with the estimator-style API found in scikit-learn this will look very familiar, but we will be looking at its actual implementation. In order to get a brief description we can simply type ?als.fit(training) in Jupyter or the pyspark shell (with IPython) and we will be presented with documentation in our environment.

The docstring states that given a DataFrame, fit returns a fitted model. That might be enough for someone who isn't looking for implementation details, but we press on to the source code.
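
For anyone who wants to poke further from the same session, the standard library's inspect module will point at the Python wrapper on disk. This is a small aside of mine rather than part of the original walkthrough:

import inspect
from pyspark.ml.recommendation import ALS

# roughly equivalent to typing `als.fit?` / `als.fit??` in IPython or Jupyter
print(inspect.getdoc(ALS.fit))         # the docstring shown by `?`
print(inspect.getsourcefile(ALS.fit))  # the file containing the Python wrapper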

At this point, I move to the pyspark documentation. Since the docs are built with Sphinx, a really useful tool for building documentation that links to source code, they are easy to reference. The pyspark documentation is hosted here. I frequently use the quick search functionality in Sphinx, because it is much better than the search tools bundled with most documentation. After searching for als.fit in quick search, we are directed to the documentation for the fit method, and we can click through to the source.

Unfortunately, we can't navigate to the source of the fit method directly, because ALS is actually a subclass of JavaEstimator. Here is where things in the Spark API get interesting. In order to reap the performance benefits of the JVM in Spark, Java or Scala code is typically wrapped in Python code. The reason I chose to start with Python is that I wanted to spend some time investigating this wrapping.

Wrapping Java Objects

The Python ALS class that we used earlier performs the Java wrapping in its constructor, via the call to its superclass's constructor. After calling the constructor for JavaWrapper, ALS sets up its wrapping of the underlying JVM class org.apache.spark.ml.recommendation.ALS. This class can be found in the documentation here. It's a Scala class that a Scala or Java developer would interact with directly if they were writing Spark code in either of those languages. Initial parameters from the constructor of the Python ALS object are passed down to the underlying Scala object. As an example, we can pass the parameter regParam to the Python ALS class, which will in turn pass it down to the Scala ALS implementation.
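
For a sense of the mechanics, here is a condensed sketch of what JavaWrapper and the ALS constructor do, paraphrased from pyspark.ml.wrapper and pyspark.ml.recommendation rather than quoted verbatim:

from pyspark import SparkContext

# condensed sketch of the wrapping machinery; the real code lives in
# pyspark.ml.wrapper (JavaWrapper) and pyspark.ml.recommendation (ALS)
class JavaWrapperSketch(object):
    @staticmethod
    def _new_java_obj(java_class, *args):
        # walk the JVM package path through py4j, e.g.
        # org -> apache -> spark -> ml -> recommendation -> ALS
        sc = SparkContext._active_spark_context
        java_obj = sc._jvm
        for name in java_class.split("."):
            java_obj = getattr(java_obj, name)
        return java_obj(*args)

class ALSSketch(JavaWrapperSketch):
    def __init__(self, regParam=0.1):
        # instantiate the Scala org.apache.spark.ml.recommendation.ALS object
        self._java_obj = self._new_java_obj(
            "org.apache.spark.ml.recommendation.ALS")
        # parameters such as regParam are later transferred to the Scala object
        self.regParam = regParam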

At this point we are interacting with the underlying Scala implementation through the wrapper that has been developed in pyspark. When we invoke fit on our Python ALS object, Spark calls down to _fit_java in order to run on the JVM. _fit_java then calls the Scala object's fit method.
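
The hand-off itself is only a couple of lines; roughly, paraphrasing pyspark.ml.wrapper.JavaEstimator:

# paraphrased from pyspark.ml.wrapper.JavaEstimator
def _fit_java(self, dataset):
    # push the Python-side parameters (maxIter, regParam, ...) to the Scala object
    self._transfer_params_to_java()
    # dataset._jdf is the py4j handle to the underlying Scala DataFrame; this
    # call runs org.apache.spark.ml.recommendation.ALS.fit on the JVM
    return self._java_obj.fit(dataset._jdf)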

Java Virtual Machine Layer

The Java Virtual Machine (JVM) lets programmers run compiled Java and Scala code portably across different system architectures. The Python code that we initially authored now calls pre-compiled Scala code that runs on the JVM. The Scala ALS.fit method performs various sanity checks on the dataset for fitting and then converts it into an RDD of instances of the Rating case class. For those not familiar with case classes or RDDs, seek answers in the Spark documentation.
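
If case classes are new to you, a rough Python analogue is a typed, immutable record. The snippet below is purely illustrative and is not Spark code:

from typing import NamedTuple

# illustrative analogue of the Scala Rating case class that ALS.fit builds;
# the field names mirror the Scala definition, but this is not Spark code
class Rating(NamedTuple):
    user: int
    item: int
    rating: float

print(Rating(user=0, item=2, rating=3.0))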

ALS Implementation

Now ALS.fit finally calls out to the DeveloperApi method that implements the ALS factorization. We finally made it to the actual implementation of the algorithm after all of that unwrapping! The implementation also performs various sanity checks and then determines whether to use the NNLSSolver (when nonnegative factors are requested) or the CholeskySolver otherwise.

Solvers

The two solvers are found in the same file as the ALS Scala class.

Both of these classes extend the trait LeastSquaresNESolver, so either can be used with the function computeFactors (think duck typing if you are coming from a Python background).

val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
...
computeFactors(..., solver)
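
For readers coming from Python, the duck-typing remark amounts to two classes exposing the same solve method. A hypothetical sketch, not Spark's code:

# hypothetical Python analogue of the LeastSquaresNESolver trait: computeFactors
# only cares that whatever it receives responds to `solve`
class CholeskySolverSketch(object):
    def solve(self, ne, reg):
        ...  # solve the regularized normal equations via Cholesky

class NNLSSolverSketch(object):
    def solve(self, ne, reg):
        ...  # solve the same system subject to x >= 0

def compute_factors(solver, ne, reg):
    return solver.solve(ne, reg)

nonnegative = False  # mirrors the Scala flag of the same name
solver = NNLSSolverSketch() if nonnegative else CholeskySolverSketch()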

computeFactors

The private function computeFactors is where our solvers are actually used. At this line the solvers are invoked to create dstFactors, the factors used to make recommendations. The CholeskySolver solves the least squares problem with L2 regularization using …, while the NNLSSolver solves a nonnegative least squares problem with L2 regularization using …, subject to x >= 0. CholeskyDecomposition.solve is used by the CholeskySolver and NNLS.solve is used by the NNLSSolver.
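
Conceptually, the CholeskySolver's job looks like the following NumPy sketch of L2-regularized least squares solved through the normal equations. This is an illustration of the math only, not Spark's implementation:

import numpy as np

# illustration only: solve min ||A x - b||^2 + lam * ||x||^2 by forming the
# normal equations (A^T A + lam * I) x = A^T b and factorizing with Cholesky
rng = np.random.default_rng(0)
A = rng.normal(size=(100, 5))
b = rng.normal(size=100)
lam = 0.01

ata = A.T @ A + lam * np.eye(A.shape[1])
atb = A.T @ b

L = np.linalg.cholesky(ata)   # ata = L @ L.T
y = np.linalg.solve(L, atb)   # solve L y = A^T b
x = np.linalg.solve(L.T, y)   # solve L.T x = y
print(x)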

LAPACK and BLAS

As we covered in our lectures @USF, many linear algebra libraries use LAPACK and BLAS under the hood. LAPACK and BLAS are wrapped by the netlib-java project, which Spark uses in CholeskyDecomposition.solve and NNLS.
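
The same layering is visible from Python: SciPy's dense routines dispatch to LAPACK much as netlib does for Spark, so the solve above collapses to a factor-and-solve pair. A sketch, redefining the same shapes as the previous example:

import numpy as np
from scipy.linalg import cho_factor, cho_solve

# cho_factor / cho_solve dispatch to LAPACK's potrf / potrs, the same family of
# routines that netlib-java exposes to Spark on the JVM
rng = np.random.default_rng(0)
A = rng.normal(size=(100, 5))
b = rng.normal(size=100)
ata = A.T @ A + 0.01 * np.eye(5)
atb = A.T @ b

x = cho_solve(cho_factor(ata), atb)
print(x)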

This is where we end our journey. We've made it quite far in our exploration of Spark's internals, and the next stage would be exploring LAPACK and BLAS themselves. If you continue, feel free to let me know where your journey takes you.

My Spark & Jupyter Notebook EDA Workflow

I've been interning @mozilla for the past 6 months, and at last I've discovered my ideal setup for productive exploratory data analysis on our clusters! I'll work towards presenting my solution by addressing the various problems I have come across when performing analysis with Spark.

Problems With Other Solutions

“Programming” in Notebooks

Whenever I have limited myself to working only in a Jupyter or Zeppelin notebook, I begin to feel claustrophobic as the notebook cells fill up. I end up jumping from the bottom of a notebook to the top in order to redefine a map operation or a query. After a while, it's difficult to program effectively.

Submitting Spark Jobs With Python Files or Scala Jars

On previous projects where I haven’t had to create any visualizations, I’ve worked using spark-submit. These projects were great to work on, because I was no longer restricted by a single notebook file, spark-shell, etc.

However, notebooks are fantastic for sharing knowledge through the generation of reports. In those previous projects our output was JSON for consumption by a web front-end, rather than graphics for other analysts to view. When performing exploratory data analysis, inline plots from matplotlib & seaborn are easy to take advantage of.

I also wanted to take advantage of caching/check-pointing dataframes in Spark when exploring, so that I don't have to re-run my entire application when I discover an error in my logic. When using spark-submit, I don't have this benefit.

Zeppelin Notebooks

I've been experimenting with Zeppelin notebooks, as our Spark cluster now has support for them. In Zeppelin, you can add dependencies, which is a great feature when using Scala.

However, Zeppelin notebooks are currently not rendered by gist.github.com, and Zeppelin is somewhat difficult to get running locally. This makes it a little difficult to pass notebooks around for another analyst to review. Let me know if you have had any success with Zeppelin notebooks!

Spark's sc.addPyFile Method

sc.addPyFile has quickly become one of my favourite functions in standard Spark. At @mozilla, our cluster launches pyspark with a Jupyter server during the bootstrapping process, so adding Python files with the --py-files argument would require launching another Spark application. sc.addPyFile solves this problem by distributing dependencies programmatically. I've had success in the past by specifying all my source files in a cell at the top of a notebook.
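
A minimal sketch of that top-of-notebook cell; mymodule.py and some_function are hypothetical names, and sc is the SparkContext the bootstrapped session provides:

# sc is the SparkContext provided by the pyspark/Jupyter session;
# mymodule.py and mymodule.some_function are hypothetical names
sc.addPyFile("mymodule.py")

import mymodule  # importable on the driver and shipped to every executor
rdd = sc.parallelize(range(10)).map(mymodule.some_function)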

However, this doesn't allow me to "hot-swap" code in the manner that I have been able to outside of Spark notebooks, with a workflow something along the lines of…

import mymodule
# in Python 3, reload lives in importlib instead of being a builtin
from importlib import reload
reload(mymodule)

As an example, I might have made a mistake in mymodule, fixed the mistake, and wanted to re-import the module. Unfortunately, from what I have found, sc.addPyFile doesn't redistribute the updated Python module. This leads me to my somewhat hacky solution.

Solution: Temporarily Using execfile or exec With Later Replacement Using sc.addPyFile

# python 2
execfile('mymodule.py')

# python 3
exec(open('./mymodule.py').read())

I didn't promise that this solution would be pretty, but I've had the best results with it. By having a code cell that runs execfile on each of my source files, I get the benefit of being able to structure my code the way I want while still interactively exploring data in a notebook. Since the files are executed in the pyspark session, all of the functions they define are available to all of the executors. I can also "hot-swap" updates to those files by updating them, re-executing them, and continuing my analysis. With this solution, I am able to separate ETL logic from analysis logic, resulting in shorter and easier-to-follow notebooks.
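
Concretely, the top cell in this workflow looks something like the sketch below; etl.py, clean_row, and ratings.txt are hypothetical names used only to illustrate the pattern:

# hypothetical file and function names, shown only to illustrate the workflow
exec(open("./etl.py").read())          # defines clean_row(...) in the driver session

cleaned = (sc.textFile("ratings.txt")  # hypothetical input path
             .map(clean_row))          # executors pick up clean_row via closure serialization
cleaned.cache()                        # keep it around while iterating in the notebook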

After finalising a notebook/application, I recommend replacing execfile or exec statements with sc.addPyFile or the --py-files argument and performing imports where necessary.

Please Post Other Workflows/Setups

I'm eager to explore other ways of structuring code with Spark, for both exploratory and production code!

mg-select: Allowing Selections For Metrics Graphics Lines

I've built out an addon for metrics-graphics, the popular visualization library created @mozilla. Below is a snapshot of the current state of mg-select, an addon built to dynamically update charts. I'll be working towards adding more features in the future, and I am looking for collaborators who might be interested in working on it.

mg-select is an addon built for Mozilla's metrics-graphics that allows users to dynamically select y_accessors for line charts. While working on crash report projects, we wanted to be able to directly compare two of the potentially numerous y_accessors that we were graphing.

mg-select.gif

Usage

As long as mg-select's JavaScript & CSS have been included, enable the addon with mg_select: true.

MG.data_graphic({
  // ...
  mg_select: true
});

Dependencies

mg-select requires the following libraries to be installed…

  • bootstrap-select, the jQuery / Bootstrap plugin that powers the overlays
  • bootstrap & jquery, required by bootstrap-select

Issues

The addon currently requires the fields y_accessors & legend to contain the same array of variables. This is an issue, because MG will initialize args.y_accessors to "multiline_…". If there is any way to reference the old y_accessors, I'll update the addon to remove this requirement.

Setting Up EMR with Spark & Jupyter Notebook

I recently created a repository documenting how I provision clusters on Amazon's infrastructure. The repository is found at github.com/cameres/emr-spark-jupyter; if you access it, you can follow along with the remainder of the post.

EMR, Spark, & Jupyter

In this tutorial, I'm going to set up a data environment with Amazon EMR, Apache Spark, and Jupyter Notebook. Apache Spark has become extremely popular for big data processing and machine learning, and EMR makes it incredibly simple to provision a Spark cluster in minutes! At Mozilla we frequently spin up Spark clusters to perform data analysis, and we have a repository of scripts for provisioning our clusters. The scripts in my repository extract the functionality that is specific to creating a simple Spark cluster and installing Jupyter Notebook on the main node of the cluster.

Assumptions

The major assumption that I make in the following tutorial is that your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are accessible to awscli. This can be handled by placing the following environment variables in the environment file of your respective shell. There might be other solutions to this problem, but this is the one I personally use.

export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...

Setting Up Key Pair Using EC2

In order to access the cluster via the command line later, you need to generate a key pair to ssh into the main node. I haven't been able to figure out a way to create a key pair using the awscli and have it work with the remainder of the script, so I recommend setting up a key pair using the EC2 User Guide. Make sure to place the private key in this directory in order to run the script.

Configuring the Script

The following two variables need to be altered based on your use case. They are found in install-jupyter-notebook and script.sh.

# bucket should be created and used on s3
SPARK_BUCKET="bucket-to-create-on-s3"
# name of the key pair ex: MyKeyPair
SPARK_KEY_PAIR="key-pair-created-in-first-step"

Configuring Jupyter Notebook

jupyter_notebook_config.py is used to configure Jupyter Notebook on the main node. As an example, this file can be altered to set a password for access to notebooks. Below is the code for this example.

from notebook.auth import passwd
# get a hashed password
passwd('password')
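
The hash that passwd prints is what goes into jupyter_notebook_config.py. A sketch with a placeholder value; the real string comes from the call above:

# in jupyter_notebook_config.py on the main node; the sha1 value below is a
# placeholder for the output of passwd('password')
c = get_config()
c.NotebookApp.password = u'sha1:<output of passwd>'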

Running the Script

After following the above steps, you can run the script to provision the cluster using bash script.sh. Each command can also be run separately in your shell if that is preferred.

Accessing Jupyter

In order to forward the notebook server and access Jupyter, we invoke the command below. Make sure that the private key being used has the proper permissions before running the command (if you followed the AWS guide it should)!

ssh -L 8888:localhost:8888 hadoop@ec2-**-***-***-*.us-west-1.compute.amazonaws.com -i <key pair>.pem  

Now we can open localhost:8888 in a web browser and access our Spark context as if it were running locally on our computer.

Install Hadoop & HDFS on OS X 10.11

Below is a simple guide on setting up Hadoop and HDFS on OS X El Capitan. I've taken bits and pieces of other guides (cited) in an attempt to demonstrate that this setup works with El Capitan. I haven't yet upgraded to Sierra to check whether the prescribed methods still work, so proceed at your own risk. Make sure to comment or message me if I have left any details out!

Install Brew and Hadoop

Homebrew makes installing and managing Hadoop files simple for OS X. If you haven’t installed or used it before, I highly recommend it. I use brew for installing a variety of other tools regularly! In order to proceed to the next step, the following tasks must be completed and are outlined in the gist below.

  • install brew
  • install hadoop with brew
  • note hadoop version number

Modify Hadoop Files

Brew installs these libraries in the /usr/local/Cellar directory of your file system. The files that we need to modify to enable Hadoop to run in pseudo-distributed mode are found in /usr/local/Cellar/hadoop/<version>/libexec/etc/hadoop/. The <version> string should be replaced with the version of Hadoop you installed from brew; I installed Hadoop 2.7.3, so my path to the files is /usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop/. In this directory, core-site.xml and hdfs-site.xml need to be edited, while mapred-site.xml needs to be created. The modified and added files are described in the gists listed below.

See Hadoop: Setting up a Single Node Cluster for more information on the setup of these files.

Starting Hadoop And HDFS

Lastly, the following information needs to be appended to the environment file of your respective shell. I’m using zsh, so I append this code to my .zshrc file.

After this code has been appended to the environment file, we simply need to source the file to load the aliases in our running shell.

# ex: $ source ~/.zshrc
$ source <environment file>

Continuing w/ Guide

The Single Cluster Guide can be followed for further instructions that are not OS X or Homebrew specific. Note that after installing Hadoop via brew, you will be able to access the hadoop and hdfs commands from your shell w/o prefixing them with bin/.

Testing With Spark

In the near future I'm going to write a post about setting up Jupyter Notebook and Spark locally. This section is entirely optional, as I simply use reading from HDFS as a quick test of my installation. For simplicity, I downloaded the linkage dataset from UCI (available here).

After continuing the setup of HDFS and Hadoop and unzipping each of the blocks, we can push the csv files to HDFS using the following commands.

Lastly, I run pyspark code to read the data from HDFS and print out 10 results.
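
A sketch of that last step, under the assumption that the unzipped blocks were pushed to a /linkage directory on the single-node HDFS described above; the path, port, and CSV options are assumptions rather than the exact code I ran:

# assumes fs.defaultFS is hdfs://localhost:9000, as in the single-node setup,
# and that block_1.csv was pushed to /linkage in HDFS
df = spark.read.csv("hdfs://localhost:9000/linkage/block_1.csv",
                    header=True, inferSchema=True)
df.show(10)  # print the first 10 rows as a quick smoke test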