8.4 Distributed Processing
Sometimes you need more power than your local machine, even with all its cores, can offer. Luckily, GNU Parallel can also leverage the power of remote machines, which allows us to speed up our pipelines even further.
What’s great is that GNU Parallel does not have to be installed on the remote machine. All that’s required is that you can connect to the remote machine via SSH, which is also what GNU Parallel uses to distribute our pipeline. (Having GNU Parallel installed is helpful because it can then determine how many cores to employ on each remote machine; more on this later.)
First, we're going to obtain a list of running AWS EC2 instances. Don't worry if you don't have any remote machines; you can replace any occurrence of --slf hostnames, which tells GNU Parallel which remote machines to use, with --sshlogin :. This way, you can still follow along with the examples in this section.
Once we know which remote machines to take over, we’re going to consider three flavors of distributed processing:
- Simply running ordinary commands on remote machines.
- Distributing local data directly among remote machines.
- Sending files to remote machines, processing them, and retrieving the results.
8.4.1 Get List of Running AWS EC2 Instances
In this section we’re creating a file named hostnames that will contain one hostname of a remote machine per line. We’re using Amazon Web Services as an example. If you’re using a different cloud computing service, or have your own servers, please make sure that you create a hostnames file yourself.
We can obtain a list of running AWS EC2 instances from the command line using aws, the command-line interface to the AWS API (Services 2014). If you're not using the Data Science Toolbox, install awscli using pip (PyPA 2014) as follows:
$ pip install awscli
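Before aws can talk to the AWS API, it needs your credentials. If you haven't set these up yet, you can do so with aws configure, which prompts for your access key, secret key, default region, and output format:
$ aws configure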
With aws, you can do virtually everything you can do with the online AWS Management Console. We use this command to obtain a list of running EC2 instances from AWS, but it can do a lot more. We assume that you know how to launch instances, either through the online Management Console or through the aws command-line tool.
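For reference, launching two small instances from the command line could look something like the following sketch (the AMI ID, instance type, and key name are placeholders; substitute your own):
$ aws ec2 run-instances --image-id ami-12345678 --count 2 \
> --instance-type t1.micro --key-name MyKeyFile # placeholder AMI ID and key name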
The command aws ec2 describe-instances returns a lot of information about all your EC2 instances in JSON format (see http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html). We extract the relevant fields using jq:
$ aws ec2 describe-instances | jq '.Reservations[].Instances[] | '\
> '{public_dns: .PublicDnsName, state: .State.Name}'
{
"state": "running",
"public_dns": "ec2-54-88-122-140.compute-1.amazonaws.com"
}
{
"state": "stopped",
"public_dns": null
}
The possible states of an EC2 instance are: pending, running, shutting-down, terminated, stopping, and stopped. Since we can only distribute our pipeline to running instances, we filter out the non-running instances:
$ aws ec2 describe-instances | jq -r '.Reservations[].Instances[] | '\
> 'select(.State.Name=="running") | .PublicDnsName' > hostnames
$ cat hostnames
ec2-54-88-122-140.compute-1.amazonaws.com
ec2-54-88-89-208.compute-1.amazonaws.com
(If we had left out -r, which stands for raw, the hostnames would have been surrounded by double quotes.) We save the output to the file hostnames so that we can pass it to parallel later.
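For example, without -r the same jq filter would have produced JSON strings, and the hostnames file would have looked like this:
"ec2-54-88-122-140.compute-1.amazonaws.com"
"ec2-54-88-89-208.compute-1.amazonaws.com"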
As mentioned, parallel employs ssh to connect to the EC2 instances. Add the following to ~/.ssh/config so that ssh knows how to connect to them:
Host *.amazonaws.com
IdentityFile ~/.ssh/MyKeyFile.pem
User ubuntu
Depending on which distribution your instances are running, your username may be different from ubuntu.
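To verify that this configuration works, you could connect to the first machine listed in the hostnames file and run a simple command; it should print something like the internal hostname of that instance:
$ ssh $(head -n 1 hostnames) hostname
ip-172-31-23-204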
8.4.2 Running Commands on Remote Machines
The first flavor of distributed processing is to simply run ordinary commands on remote machines. Let's first double check that parallel is working by running the command-line tool hostname on our list of hosts:
$ parallel --nonall --slf hostnames hostname
ip-172-31-23-204
ip-172-31-23-205
Here, --slf is short for --sshloginfile and --nonall instructs parallel to execute the same command on every remote machine in the hostnames file without using any parameters. Remember, if you don't have any remote machines to utilize, you can replace --slf hostnames with --sshlogin : so that the command is run on your local machine:
$ parallel --nonall --sshlogin : hostname
data-science-toolbox
Running the same command once on every remote machine requires only one core per machine. If we instead distribute a list of arguments passed to parallel, it could potentially use more than one core per machine. If the number of cores is not specified explicitly, parallel will try to determine it:
$ seq 2 | parallel --slf hostnames echo 2>&1 | fold
bash: parallel: command not found
parallel: Warning: Could not figure out number of cpus on ec2-54-88-122-140.comp
ute-1.amazonaws.com (). Using 1.
1
2
In this case, we have parallel installed on only one of the two remote machines. We're getting a warning message indicating that parallel is not found on the other one. As a result, parallel cannot determine the number of cores on that machine and will default to using one core. When you receive this warning message, you can do one of the following four things:
- Don't worry, and be happy with using one core per machine.
- Specify the number of jobs per machine via -j.
- Specify the number of cores to use per machine by putting, for example, 2/ in front of each hostname in the hostnames file if you want two cores (see the example after this list).
- Install GNU Parallel using a package manager. For example, on Ubuntu:
$ parallel --nonall --slf hostnames "sudo apt-get install -y parallel"
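If, say, you knew that each of these machines had two cores, the hostnames file could look like this (an illustrative sketch using the same hostnames as before):
$ cat hostnames
2/ec2-54-88-122-140.compute-1.amazonaws.com
2/ec2-54-88-89-208.compute-1.amazonaws.com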
8.4.3 Distributing Local Data among Remote Machines
The second flavor of distributed processing is to distribute local data directly among remote machines. Imagine you have one very large data set that you want to process it using multiple remote machines. For simplicity, we’re going to sum all integers from 1 to 1000. First, let’s double check that our input is actually being distributed by printing the hostname of the remote machine and the length of the input it received using wc
:
$ seq 1000 | parallel -N100 --pipe --slf hostnames "(hostname; wc -l) | paste -sd:"
ip-172-31-23-204:100
ip-172-31-23-205:100
ip-172-31-23-205:100
ip-172-31-23-204:100
ip-172-31-23-205:100
ip-172-31-23-204:100
ip-172-31-23-205:100
ip-172-31-23-204:100
ip-172-31-23-205:100
ip-172-31-23-204:100
We can verify that our 1000 numbers get distributed evenly in subsets of 100 (as specified by -N100). Now, we're ready to sum all those numbers:
$ seq 1000 | parallel -N100 --pipe --slf hostnames "paste -sd+ | bc" | paste -sd+ | bc
500500
Here, we also immediately sum the ten sums we get back from the remote machines. Let's double check that the answer is correct:
$ seq 1000 | paste -sd+ | bc
500500
Good, that works. If you have a larger command that you want to execute on the remote machines, you can also put it in a separate script and upload that script with parallel.
Let’s create a very simple command-line tool called sum:
#!/usr/bin/env bash
paste -sd+ | bc
Don't forget to make it executable, as discussed in Chapter 4. The following command first transfers the file sum to the remote machines (via --basefile) and then runs it:
$ seq 1000 | parallel -N100 --basefile sum --pipe --slf hostnames './sum' | ./sum
500500
Of course, summing 1000 numbers is only a toy example. It would have been much faster to do this locally. However, we hope it’s clear from this that GNU Parallel can be incredibly powerful.
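As an aside, instead of fixing the number of records per job with -N100, you could let parallel split the input into blocks of a given size using --block. A sketch under the same hostnames setup (the 1K block size here is arbitrary):
$ seq 1000 | parallel --pipe --block 1K --slf hostnames "paste -sd+ | bc" | paste -sd+ | bc
500500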
8.4.4 Processing Files on Remote Machines
The third flavor of distributed processing is to send files to remote machines, process them, and retrieve the results. Imagine that we want to count, for each borough of New York City, how often 311 service calls are received. We don't have that data on our local machine yet, so let's first obtain it from https://data.cityofnewyork.us/ using their great API:
$ seq 0 100 900 | parallel "curl -sL 'http://data.cityofnewyork.us/resource'"\
> "'/erm2-nwe9.json?\$limit=100&\$offset={}' | jq -c '.[]' | gzip > {#}.json.gz"
Note that jq -c '.[]' is used to flatten the array of JSON objects so that there's one object per line. We now have 10 files containing compressed JSON data. Let's see what one line of JSON looks like:
$ zcat 1.json.gz | head -n 1 | fold
{"school_region":"Unspecified","park_facility_name":"Unspecified","x_coordinate_
state_plane":"945974","agency_name":"Department of Health and Mental Hygiene","u
nique_key":"147","facility_type":"N/A","status":"Assigned","school_address":"Uns
pecified","created_date":"2006-08-29T21:25:23","community_board":"01 STATEN ISLA
ND","incident_zip":"10302","school_name":"Unspecified","location":{"latitude":"4
0.62745427115626","longitude":"-74.13789056665027","needs_recoding":false},"comp
laint_type":"Food Establishment","city":"STATEN ISLAND","park_borough":"STATEN I
SLAND","school_state":"Unspecified","longitude":"-74.13789056665027","intersecti
on_street_1":"DECKER AVENUE","y_coordinate_state_plane":"167905","due_date":"200
6-10-05T21:25:23","latitude":"40.62745427115626","school_code":"Unspecified","sc
hool_city":"Unspecified","address_type":"INTERSECTION","intersection_street_2":"
BARRETT AVENUE","school_number":"Unspecified","resolution_action_updated_date":"
2006-10-06T00:00:17","descriptor":"Handwashing","school_zip":"Unspecified","loca
tion_type":"Restaurant/Bar/Deli/Bakery","agency":"DOHMH","borough":"STATEN ISLAN
D","school_phone_number":"Unspecified"}
If we were to get the total number of service calls per borough on our local machine, we would run the following command:
$ zcat *.json.gz |
> ./jq -r '.borough' |
> tr '[A-Z] ' '[a-z]_' |
> sort | uniq -c |
> awk '{print $2","$1}' |
> header -a borough,count |
> csvsort -rc count | csvlook
|----------------+--------|
| borough | count |
|----------------+--------|
| unspecified | 467 |
| manhattan | 274 |
| brooklyn | 103 |
| queens | 77 |
| bronx | 44 |
| staten_island | 35 |
|----------------+--------|
Because this is quite a long pipeline, and because we're using it again in a moment with parallel, it's worth going over it:
- Expand all compressed files using zcat.
- For each call, extract the name of the borough using jq.
- Convert borough names to lowercase and replace spaces with underscores using tr (because awk splits on whitespace by default; see the small example after this list).
- Count the occurrences of each borough using sort and uniq.
- Reverse the count and borough and make it comma-delimited using awk.
- Add a header using header.
- Sort by count and print the table using csvsort (Groskopf 2014j).
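To make the tr step concrete, here's what it does to a single borough name:
$ echo 'STATEN ISLAND' | tr '[A-Z] ' '[a-z]_'
staten_island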
Imagine, for a moment, that our own machine is so slow that we simply cannot perform this pipeline locally. We can use GNU Parallel to distribute the local files among the remote machines, let them do the processing, and retrieve the results:
$ ls *.json.gz |
> parallel -v --basefile jq \
> --trc {.}.csv \
> --slf hostnames \
> "zcat {} | ./jq -r '.borough' | tr '[A-Z] ' '[a-z]_' | sort | uniq -c |"\
> " awk '{print \$2\",\"\$1}' > {.}.csv"
zcat 10.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
zcat 2.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
zcat 1.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
zcat 3.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
zcat 4.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
zcat 5.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
zcat 6.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
zcat 7.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
zcat 8.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
zcat 9.json.gz | ./jq -r '.borough' | sort | uniq -c | awk '{print $2","$1}'
This long command breaks down as follows:
- Print the list of files and pipe it into parallel.
- Transmit the jq binary to each remote machine. Luckily, jq has no dependencies. This file will be removed from the remote machine at the end because we specified --trc (which implies the --cleanup command-line argument).
- The command-line argument --trc {.}.csv is short for --transfer --return {.}.csv --cleanup. (The replacement string {.} gets replaced with the input filename without its last extension; see the example after this list.) Here, this means that the compressed JSON file gets transferred to the remote machine, the CSV file gets returned to the local machine, and both files are removed from the remote machine after each job.
- Specify the list of hostnames. Remember, if you want to try this out locally, you can specify --sshlogin : instead of --slf hostnames.
- Note the escaping in the awk expression. Quoting can sometimes be tricky. Here, the dollar signs and the double quotes are escaped. If quoting ever gets too confusing, remember that you can put the pipeline into a separate command-line tool, just as we did with sum.
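To see what the replacement string {.} expands to, you could run parallel locally with one of the filenames (a quick illustration):
$ parallel echo {.}.csv ::: 1.json.gz
1.json.csv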
If, at some point during this command, we were to run ls on one of the remote machines, we would see that parallel indeed transfers (and cleans up) the jq binary, the JSON files, and the CSV files:
$ ssh $(head -n 1 hostnames) ls
1.json.csv
1.json.gz
jq
Each CSV file looks like this:
$ cat 1.json.csv
bronx,3
brooklyn,5
manhattan,24
queens,3
staten_island,2
unspecified,63
We can sum the counts in each CSV file using Rio and the aggregate function in R:
$ cat *.csv | header -a borough,count |
> Rio -e 'aggregate(count ~ borough, df, sum)' |
> csvsort -rc count | csvlook
|----------------+--------|
| borough | count |
|----------------+--------|
| unspecified | 467 |
| manhattan | 274 |
| brooklyn | 103 |
| queens | 77 |
| bronx | 44 |
| staten_island | 35 |
|----------------+--------|
Or, if you prefer to use SQL to aggregate the results, you can use csvsql as discussed in Chapter 5:
$ cat *.csv | header -a borough,count |
> csvsql --query 'SELECT borough, SUM(count) AS count FROM stdin '\
> 'GROUP BY borough ORDER BY count DESC' | csvlook
|----------------+--------|
| borough | count |
|----------------+--------|
| unspecified | 467 |
| manhattan | 274 |
| brooklyn | 103 |
| queens | 77 |
| bronx | 44 |
| staten_island | 35 |
|----------------+--------|
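And if neither Rio nor csvsql is available, plain awk can perform the same aggregation (a minimal sketch; it prints an unsorted two-column CSV, which you can pipe into header and csvsort as above):
$ cat *.csv | awk -F, '{total[$1] += $2} END {for (b in total) print b","total[b]}'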