Practical Tips for Distributing Computation with PySpark

Pierce Freeman
6 min read · Jan 27, 2020


Spark is incredibly powerful, but getting started can feel like too heavy a time sink. A few simple tips can get you off the ground without a DevOps certification.

PySpark is one of the core tools in a data scientist’s toolbelt. It unlocks massively parallel computation, taking tasks that run for days in a conventional single-threaded program and churning through them in under an hour. Once you get everything configured, it’s easy to explore and iterate with logic in native Python. But the infrastructure headache of correctly setting up a spark cluster can sometimes dissuade engineers from getting started.

There’s no shortage of articles walking through how to get started with spark, but there’s a lack of practical tips and tricks that might make the difference between a long night of StackOverflow and a day of productivity. Here we start from scratch and tackle some best practices that I’ve found helpful over the last year of dealing with these clusters:

  • Booting up a spark cluster with an attached jupyter notebook on AWS
  • How to use third-party spark packages when in the cloud
  • How to avoid bootstrapping and install pypi packages on the fly
  • Data formatting and size of inputs

Starting an EMR Cluster with Jupyter

Starting an EMR cluster of machines is as simple as calling create-cluster and specifying the right dependencies to include on the instances. Amazon’s Console also features a launch wizard that asks for the same configuration parameters if you prefer to use that interface.

aws emr create-cluster \
--name "distributed-spark-cluster" \
--release-label emr-5.29.0 \
--use-default-roles \
--ec2-attributes KeyName={SSH KEY} \
--applications Name=Ganglia Name=Hadoop Name=Spark \
--instance-count 5 \
--instance-type m5.xlarge \
--log-uri s3://spark-clusters/logs/ \
--enable-debugging

This launches a cluster with one master node and four worker nodes, for a total of the five specified instances. EMR handles the appropriate networking so the boxes can communicate with one another, with the Master node acting as the entrypoint that distributes tasks to the workers. You can optionally enable debugging, which provides helpful logs if you’re looking to diagnose runtime issues with your distributed code.
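
If you’d rather check on the cluster from a script than from the Console, here’s a minimal sketch using boto3 (assuming your AWS credentials and region are already configured; the cluster ID is a placeholder):

import boto3

# Ask EMR for the cluster's state; "WAITING" means it has finished
# bootstrapping and is idle, ready to accept work.
emr = boto3.client("emr")
response = emr.describe_cluster(ClusterId="j-XXXXXXXXXXXXX")

print(response["Cluster"]["Status"]["State"])        # e.g. "WAITING"
print(response["Cluster"].get("MasterPublicDnsName"))  # useful for the ssh step below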

Now, we’ll start up a Jupyter notebook, which is where we’ll actually do our main spark development work.

Amazon provides a hosted notebook feature called EMR Notebooks, which spins up a jupyterlab instance that connects directly to your cluster. It’s a great way to get started, but it comes with a slight drawback: the notebook runs in its own managed environment where you’re not an admin, so you’re constrained from making some useful configuration changes. Instead, let’s host the notebook straight within the EMR cluster itself.

The Master node issues jobs to the rest of the cluster, so we choose this box to host our jupyter instance. Grab the IP of the master node from EC2 and ssh into this box.

ssh hadoop@{MASTER IP}

We want our jupyter notebook host to stay alive even if we disconnect from ssh, so we’ll install tmux and create a new window.

sudo yum install tmux
tmux new -s jupyter_notebook

Then, create a new python3 virtualenv where we can install some packages that we’ll need for the notebook and spark communication.

python3 -m venv master_env
source master_env/bin/activate
pip install jupyterlab
pip install findspark

Launch jupyter. This step should give you a URL formatted like http://localhost:8888/lab?token={TOKEN}. Save the generated token for use in a second.

jupyter lab

Now, on your local machine, ssh port tunnel into this instance so you’re able to mount the jupyter instance locally.

ssh -L 8890:localhost:8888 hadoop@{MASTER IP}

When you access localhost:8890 in your browser, you should be prompted to log in. Use the token that you saved before.

Once you create a new notebook from the web IDE, it’s easy to start using your new spark cluster. We leverage the findspark python package that sniffs for the system-installed version of spark and injects pyspark into the notebook. We can now set up our main spark session.

import findspark
findspark.init()
# pyspark is now available because of findspark's injection
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Ops").getOrCreate()
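
As a quick sanity check that the session can actually distribute work, you can run a trivial action (a minimal example, not part of the original setup):

# Build a small DataFrame and run an action so the executors have to respond.
df = spark.range(1000000)
print(df.count())  # 1000000

# The default parallelism reflects the cores available across the workers.
print(spark.sparkContext.defaultParallelism)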

Customizing Spark Packages

Spark has an ecosystem of third-party packages that extend its functionality past what’s shipped with the standard library. These connectors provide additional file-reading formats (like avro) and data sources (like S3).

When using standard spark, we can submit a job via spark-submit with the additional --packages keyword parameter. When executing python code in an interactive environment, however, we have to use a slightly different setup. We can inject an environment variable that customizes the pyspark shell used to process spark requests.

import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-avro_2.11:2.4.4,"
    "org.apache.hadoop:hadoop-aws:2.7.7 pyspark-shell"
)
spark = SparkSession.builder.appName("Ops").getOrCreate()

When the spark session is initialized after this environment injection, it will download the requested packages.
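
With those packages loaded, the reads that depend on them become available. A hedged sketch of what that looks like (the bucket and path are placeholders; on EMR the instance profile normally supplies the S3 credentials):

# spark-avro provides the "avro" format; hadoop-aws provides the s3a://
# filesystem used to reach the bucket.
df = spark.read.format("avro").load("s3a://my-bucket/events/")
df.printSchema()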

Be aware of the general naming conventions within these third-party spark packages, since they often correlate to the spark/hadoop versions that they were compiled against. Spark-Avro in the above case is formatted like spark-avro_{scala_version}:{spark_release_version}. If you run into some cryptic Java errors when either downloading the package or trying to access member functions, it might be worth checking that you’re using the right package versions.
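
A quick way to confirm what you’re running against before picking a coordinate (emr-5.29.0 ships Spark 2.4.4 built against Scala 2.11, which is why the package above is spark-avro_2.11:2.4.4):

print(spark.version)  # e.g. '2.4.4'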

Installing Python Dependencies

By default, worker nodes execute our pyspark code using the system version of python. We therefore won’t have access to any of the pypi packages that are commonly used within data science. And since workers operate in completely separate environments, on different hardware, a simple pip install on the master instance isn’t going to cut it.

The traditional advice is to install these packages when your cluster launches by leveraging a bootstrap script. This script is a collection of bash commands that are hosted in S3 and run by AWS when it launches your EMR cluster. Something like this:

# s3://my-bucket/bootstrap-scripts/bootstrap.sh
pip install sklearn
pip install numpy

This is all well and good for system-level configurations or if you know exactly what python packages you’re going to need up-front. But when you’re constantly iterating with new approaches in jupyter it becomes a headache to spin down your cluster and restart it just to install one new package.

Instead, we can leverage a newer addition to spark on EMR that allows us to dynamically install pip packages on the worker nodes. As part of this, we instruct pyspark to use a custom environment when executing on the workers, which creates a remote virtualenv using the utility specified in the configuration. Create the spark session by passing this new configuration.

from pyspark import SparkConf

conf = SparkConf().setAll(
    [
        ("spark.pyspark.virtualenv.enabled", "true"),
        ("spark.pyspark.virtualenv.bin.path", "/usr/bin/virtualenv"),
        ("spark.pyspark.python", "python3"),
    ]
)
spark = (
    SparkSession
    .builder
    .config(conf=conf)
    .appName("Ops")
    .getOrCreate()
)

After that, we can install our new python packages through the spark context. This routes the installation to all workers within our cluster so they can execute follow-up code.

spark.sparkContext.install_pypi_package("numpy==1.13.3")
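
Once installed, the package can be imported in the notebook and used inside functions that execute on the workers. A minimal sketch (the UDF, column name, and toy data are made up for illustration):

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# A toy UDF that runs on the executors using the freshly installed numpy.
@udf(returnType=DoubleType())
def l2_norm(values):
    return float(np.linalg.norm(values))

df = spark.createDataFrame([([3.0, 4.0],)], ["vec"])
df.select(l2_norm("vec")).show()  # 5.0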

Input Data Formatting

When reading from disk-based sources, it’s important to think about the particular format of the input data. One of the ways that spark achieves such a significant performance gain is by streaming a large dataset in parallel through many worker nodes. However, spark can’t read a single file in parallel. For instance, if you have one giant csv file, it will stream through it linearly and you won’t benefit from adding more workers to the task.

You therefore benefit from splitting your input data into smaller chunks, where spark can distribute each file to a separate worker. So, should you just split your input data into exactly as many chunks as there are workers? Not exactly. There’s a dark art to this, but my general rule of thumb is to chunk the input data in the following ways:

  • Minimum Quantity: Segment the data into at least as many chunks as there are workers.
  • Maximum File Size: Limit file sizes to roughly the amount of data that can fit in memory on each worker node.

With a combination of these two guidelines, you’ll usually reach full utilization of your worker nodes and churn through the data as fast as your code allows.
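
If your data already lives in one monolithic file, you can also let spark re-chunk it once and write the result back out for future runs. A rough sketch under those assumptions (the paths are placeholders, and 32 is just a stand-in for the total cores across your workers):

# Read the monolithic input once...
df = spark.read.csv("s3a://my-bucket/raw/giant.csv", header=True)

# ...then write it back out as many smaller parquet files that can be
# picked up in parallel on the next job.
df.repartition(32).write.mode("overwrite").parquet("s3a://my-bucket/chunked/")

# Confirm how many partitions a source produces when read back.
print(spark.read.parquet("s3a://my-bucket/chunked/").rdd.getNumPartitions())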

Concluding Thoughts

Spark is incredibly performant and adaptable to a wide variety of data processing tasks. Simultaneously, if you’re coming from a data science background, you likely already deal with python when working with your data processing scripts. Marrying the two with pyspark can give the best of both worlds.

I learned the above tips through a lot of trial and error, and they’ve subsequently been pivotal in every spark project I’ve worked on. With them in your back pocket, I hope you turn to pyspark for more data science challenges. Much better to finish processing over a cup of coffee than over an entire business day.

If you’re interested in doing data processing or machine learning on problems that can only be done at Spark scale, get in touch.
