How to Deploy an IPython Cluster Using Mesos and Docker

John Dennison

April 19th, 2016

The members of the Analytics Services team here at Activision are heavy users of Mesos and Marathon to deploy and manage services on our clusters. We are also huge fans of Python and the Jupyter project.

The Jupyter project was recently reorganized out of IPython in a move referred to as "the split": IPython.parallel, originally part of IPython core, was split off into a separate project, ipyparallel. This powerful component of the IPython ecosystem is generally overlooked.

In this post I will give a quick introduction to the ipyparallel project and then introduce a new launcher we have open sourced to deploy IPython clusters into Mesos clusters. While we have published this notebook in HTML, please feel free to download the original to follow along.

Introduction to ipyparallel

The ipyparallel project is the new home of the IPython.parallel module that was hosted within IPython core before 2015. The focus of the project is interactive cluster computing. This focus on interactive computing, along with first-class integration with the IPython project, is a distinguishing feature. For a more complete dive into the internals of ipyparallel, please visit the docs. I aim to give the bare minimum to get you started.

At the most basic level an IPython cluster is a set of Python interpreters that can be accessed over TCP. Under the hood, it works similarly to how Jupyter/IPython work today. When you open a new notebook in the browser, a Python process (called a kernel) will be started to run the code you submit. ipyparallel does the same thing except instead of a single Python kernel, you can start many distributed kernels over many machines.

There are three main components to the stack.

  • Client: A Python process which submits work. Usually this is an IPython session or a Jupyter notebook.
  • Controller: The central coordinator, which accepts work from the client, passes it to engines, collects results, and sends them back to the client.
  • Engine: A Python interpreter that communicates with the controller to accept work and submit results. Roughly equivalent to an IPython kernel.

Starting your first cluster

The easiest way to get your hands dirty is to spin up a cluster locally. That is, you will run the Client, Controller, and Engines all on your local machine. The hardest part of provisioning distributed clusters is making sure all the pieces can talk to each other (as usual, the easiest solution to a distributed problem is to make it local).

Getting your environment started

Our team uses conda to help manage our computational environments (Python and beyond). Here is a quick run-through to get set up (our public conda recipes are here). A combination of pip and virtualenv will also work, but once you start installing packages from the SciPy stack, we find conda the easiest to use.

First, find your version of Miniconda here.

If you're using Linux, these commands will work:

wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh # follow prompts
conda update --all
# make a new python 3 env named py3
conda create -n py3 python=3 ipython ipyparallel ipython-notebook
source activate py3

While there are lower-level commands to start and configure Controllers and Engines, the primary command you will use is ipcluster. This is a helpful utility that starts all the components and configures your local client. By default, it uses the LocalControllerLauncher and the LocalEngineSetLauncher, which are exactly what we want to start with.
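For reference, ipcluster is roughly a wrapper around the lower-level ipcontroller and ipengine commands; here is a minimal local sketch (consult the ipyparallel docs for the full options):

ipcontroller &   # start the central coordinator
ipengine &       # start one engine; repeat for each additional engine
ipengine &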

Open a terminal, install ipyparallel if you haven't already, and start a cluster.

(py3)➜ ipcluster start --n=4
2016-04-11 22:24:15.514 [IPClusterStart] Starting ipcluster with [daemon=False]
2016-04-11 22:24:15.515 [IPClusterStart] Creating pid file: /home/vagrant/.ipython/profile_default/pid/ipcluster.pid
2016-04-11 22:24:15.515 [IPClusterStart] Starting Controller with LocalControllerLauncher
2016-04-11 22:24:16.519 [IPClusterStart] Starting 4 Engines with LocalEngineSetLauncher
2016-04-11 22:24:46.633 [IPClusterStart] Engines appear to have started successfully
# You can also use IPython's shell-command syntax, but errors are harder to see and stopping the cluster can be janky.
!ipcluster start -n 4 --daemon
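When you are finished, ipcluster can also tear everything down:

!ipcluster stop  # or run `ipcluster stop` from a plain terminal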

If everything started correctly, we should now have four engines running on our local machine. Now to actually interact with them: first we need to import the client.

import ipyparallel as ipp
rc = ipp.Client()
rc.ids # list the ids of the engine the client can communicate with
[0, 1, 2, 3]

The client has two primary ways to farm out work to the engines. The first is a DirectView, which is used to apply the same work to all engines. To create a DirectView, just slice the client.

The second way is a LoadBalancedView which we will cover later in the post.

dv = rc[:]
dv
<DirectView [0, 1, 2, 3]>

With a direct view you can issue a function to execute within the context of that engine's Python process.

def get_engine_pid():
    import os
    return os.getpid()
    
dv.apply_sync(get_engine_pid)
[31183, 31184, 31186, 31188]

This pattern is so common that ipyparallel provides an IPython magic function to execute a code cell on all engines: %%px

%%px
import os
os.getpid()
Out[0:9]: 31183
Out[1:9]: 31184
Out[2:9]: 31186
Out[3:9]: 31188

It is key to notice that the engines are fully stateful, long-running Python interpreters. If you set a variable within a %%px code block, it will remain there.

%%px
foo = 'bar on pid {}'.format(os.getpid())
%%px
foo
Out[0:11]: 'bar on pid 31183'
Out[1:11]: 'bar on pid 31184'
Out[2:11]: 'bar on pid 31186'
Out[3:11]: 'bar on pid 31188'

The DirectView object provides some syntactic sugar to help distribute data to each engine. The first is dictionary-style retrieval and assignment. Let's retrieve the value of foo from each engine.

dv['foo']
['bar on pid 31183',
 'bar on pid 31184',
 'bar on pid 31186',
 'bar on pid 31188']

Now we can overwrite its value.

dv['foo'] = 'bar'
dv['foo']
['bar', 'bar', 'bar', 'bar']
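If I recall the API correctly, this dictionary-style access is thin sugar over the DirectView's push and pull methods, which you can also call directly (the non-blocking calls return an AsyncResult you can .get()):

dv.push({'foo': 'bar'}).get()  # roughly equivalent to dv['foo'] = 'bar'
dv.pull('foo').get()           # roughly equivalent to dv['foo']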

There are many cases where you don't want the same data on each machine, but rather want to chunk a list and distribute each chunk to an engine. The DirectView provides the .scatter and .gather methods for this.

# start with a list of ids to work on
user_ids = list(range(1000))
dv.scatter('user_id_chunk', user_ids)
<AsyncResult: scatter>

Notice that this method completed almost immediately and returned an AsyncResult. All the methods we have used up to now have been blocking and synchronous. The scatter method is async. To turn this scatter into a blocking call, we can chain a .get() onto the call.

dv.scatter('user_id_chunk', user_ids).get()
[None, None, None, None]
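You can also hold on to the AsyncResult and poll or wait on it yourself; its methods roughly mirror multiprocessing's AsyncResult API:

ar = dv.scatter('user_id_chunk', user_ids)
ar.ready()   # has the work finished yet?
ar.wait()    # block until it has
ar.get()     # fetch the result (raises if the remote call failed)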

Now we have a variable on each engine that holds an equal amount of the original list.

%%px
print("Len", len(user_id_chunk))
print("Max", max(user_id_chunk))
[stdout:0] 
Len 250
Max 249
[stdout:1] 
Len 250
Max 499
[stdout:2] 
Len 250
Max 749
[stdout:3] 
Len 250
Max 999

Let's apply a simple function to each list. First, declare a function within each engine. The --local flag also executes the code block in your local client. This is very useful to help debug your code.

%%px --local
def the_most_interesting_transformation_ever(user_id):
    """
    This function is really interesting
    """
    return "ID:{}".format(user_id * 3)
the_most_interesting_transformation_ever(1)
'ID:3'
%%px
transformed_user_ids = list(map(the_most_interesting_transformation_ever, user_id_chunk))

Now we have four separate lists of transformed ids. We want to stitch the disparate lists into one list in our local notebook; gather is used for that.

all_transformed_user_ids = dv.gather('transformed_user_ids').get()
print(len(all_transformed_user_ids))
print(all_transformed_user_ids[0:10])
1000
['ID:0', 'ID:3', 'ID:6', 'ID:9', 'ID:12', 'ID:15', 'ID:18', 'ID:21', 'ID:24', 'ID:27']

Obviously, this example is contrived. The serialization cost of shipping Python objects over the wire to each engine is more expensive than the calculation we performed. This tradeoff between serialization/transport vs computation cost is central to any decision to use distributed processing. However, there are many highly parallelizable problems where this project can be extremely useful. Some of the main use cases we use ipyparallel for are hyperparameter searches and bulk loading/writing from storage systems.

LoadBalancedView

The previous example where you scatter a list, perform a calculation, and then gather a result works for lots of problems. One issue with this approach is that each engine does an identical amount of work. If the complexity of the process each engine is performing is variable, this naive scheduling approach can waste processing power and time. Take for example this function:

%%px --local
import random
import time
def fake_external_io(url):
    # Simulate variable complexity/latency
    time.sleep(random.random())
    return "HTML for URL: {}".format(url)
%time fake_external_io(1)
CPU times: user 795 µs, sys: 312 µs, total: 1.11 ms
Wall time: 479 ms
'HTML for URL: 1'
%time fake_external_io(1)
CPU times: user 2.58 ms, sys: 0 ns, total: 2.58 ms
Wall time: 373 ms
'HTML for URL: 1'

If you had a list of urls to scrape and gave each worker an equal share, some workers would finish early and have to sit around doing nothing. A better approach is to assign work to each engine as it finishes. This way the work will be load balanced over the cluster and you will complete your process earlier. ipyparallel provides the LoadBalancedView for this exact use case. For this specific problem, threading or an async event loop would likely be a better approach to speeding up or scaling out, but suspend your disbelief for this exercise.

lview = rc.load_balanced_view()
lview
<LoadBalancedView None>
@lview.parallel()
@ipp.require('time', 'random')
def p_fake_external_io(url):
    # Simulate variable complexity/latency
    time.sleep(random.random())
    return "HTML for URL: {}".format(url)

Here we used two ipyparallel decorators. First, we used lview.parallel() to declare this a parallel function. Second, we declared that this function depends on the modules time and random. Now that we have a load-balanced function, we can compare timings with our naive approach.

urls = ['foo{}.com'.format(i) for i in range(100)]
# Naive single threaded
%time res = list(map(fake_external_io, urls))
CPU times: user 3.14 ms, sys: 17.7 ms, total: 20.8 ms
Wall time: 49 s
dv.scatter('urls', urls).get()
[None, None, None, None]
# seed for some semblance of reproducibility
%px random.seed(99)
# Naive assignment
%time %px results = list(map(fake_external_io, urls))
CPU times: user 15.5 ms, sys: 8.35 ms, total: 23.8 ms
Wall time: 13.2 s
# Load balanced version
%time res = p_fake_external_io.map(urls).get()

This isn't a perfect example, but you get the idea. The larger the number of inputs to your parallel problem, and the more variable the run time of each component process, the more time you save by switching to a load-balanced view.

This is only scratching the surface of the ipyparallel project. I would highly recommend taking a look at the docs. Here is a list of further topics I would look into if you are interested:

  • Support for numpy memmap to allow engines located on a single node to share large arrays
  • Complex dependencies and more specialized scheduling
  • Retry and recovery logic
  • Multiple clients working on the same cluster, allowing remote collaborators to share an environment

Non-Trivial Use Cases

Our team at Activision largely uses IPython clusters for distributed model training. This project has been vital for hyperparameter searches for our machine learning models: parallelizing these searches beyond one machine, utilizing hundreds of cores, has sped up training by orders of magnitude.
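As a rough illustration of that pattern (not our actual training code), a load-balanced hyperparameter search can look something like the sketch below; score_params and the parameter grid are hypothetical stand-ins for a real model and its validation score:

from itertools import product

# hypothetical parameter grid
param_grid = [{'alpha': a, 'depth': d}
              for a, d in product([0.01, 0.1, 1.0], [3, 5, 7])]

@lview.parallel()
def score_params(params):
    # stand-in for training a model with these params and
    # returning a validation score
    import random
    return params, random.random()

results = score_params.map(param_grid).get()
best_params, best_score = max(results, key=lambda r: r[1])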

Ok this is cool but I want more cores!!!

The examples so far are a useful introduction to the API and some features of ipyparallel. Hopefully you are convinced to try out the library. However, deploying a working cluster beyond a single machine introduces some issues.

ipyparallel provides support for a range of cluster and batch job management systems, such as PBS and Windows HPC; the full list is provided in the documentation. ipyparallel also provides an SSH-based launcher: given passwordless SSH onto a machine, you can easily deploy engines and connect them to your controller and client. There is also the wonderful StarCluster project, which helps spin up machines from cloud providers.
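Switching launchers is mostly a matter of profile configuration. As a hedged sketch, an SSH-based setup in ipcluster_config.py might look roughly like this; the class and option names here are from memory, so double-check them against the ipyparallel docs:

# create a dedicated profile first:
#   ipython profile create --parallel --profile=ssh
c.IPClusterEngines.engine_launcher_class = 'SSHEngineSetLauncher'
c.SSHEngineSetLauncher.engines = {'host1.example.com': 4,
                                  'host2.example.com': 4}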

These tools are great. If you have access to existing HPC clusters, or are planning on deploying dedicated clusters either on your own cold iron (the 2016 version of bare metal) or in the cloud, then they will likely meet your needs.

However, we are big users of Mesos, Docker, and Marathon to manage our clusters and services. Furthermore, even with the existing launchers, managing complex dependencies within the engines is a pain. Using Docker to package all dependencies makes deploying heterogeneous clusters easier. Targeting our existing cluster management system and simplifying dependencies is a big win for us.

With this in mind, we are open sourcing a new ipyparallel launcher that deploys IPython clusters into Mesos using Docker and Marathon. The code lives here and on pypi/conda as ipyparallel_mesos.

We have two pre-built, stripped-down Docker images for the Controller and Engine. Internally we use conda for almost all our dependencies, even inside our Docker containers; please visit our public conda recipes and channel. However, extending from the ipyparallel-marathon-engine image will allow you to easily install your custom dependencies, with or without conda.

The project is young, but hopefully you will find it useful. Please note that it currently targets Python 3; PRs are welcome to support older versions of Python (it's 2016, we can now refer to 2.7 as old). Please open any issues on the GitHub page, and read the project's README for more details.