You can run this notebook in a live session or view it on GitHub.
Bag: Parallel Lists for semi-structured data
Dask Bag excels at processing data that can be represented as a sequence of arbitrary inputs. We’ll refer to this as “messy” data, because it can contain complex nested structures, missing fields, mixtures of data types, etc. The functional programming style fits very nicely with standard Python iteration, such as that found in the itertools module.
Messy data is often encountered at the beginning of data processing pipelines, when large volumes of raw data are first consumed. The initial set of data might be JSON, CSV, XML, or any other format that does not enforce strict structure and datatypes. For this reason, the initial data massaging and processing is often done with Python lists, dicts, and sets.
These core data structures are optimized for general-purpose storage and processing. Adding streaming computation with iterators/generator expressions or libraries like itertools or toolz (https://toolz.readthedocs.io/en/latest/) lets us process large volumes of data in a small memory footprint. If we combine this with parallel processing, we can churn through a fair amount of data.
Dask Bag is a high-level Dask collection for automating common workloads of this form. In a nutshell:
- dask.bag = map, filter, toolz + parallel execution
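To make the analogy concrete, here is a minimal sketch (assuming only that toolz is installed, which it is as a Dask dependency) of the same pipeline written with toolz for in-memory, single-threaded streaming and with dask.bag for lazy, parallel execution:
- from toolz.curried import filter, map, pipe
- import dask.bag as db
-
- # in-memory streaming pipeline: keep even numbers and square them
- in_memory = pipe(range(10),
-                  filter(lambda x: x % 2 == 0),
-                  map(lambda x: x ** 2),
-                  list)
-
- # the same pipeline as a Bag: identical logic, but lazy and parallel
- parallel = (db.from_sequence(range(10))
-               .filter(lambda x: x % 2 == 0)
-               .map(lambda x: x ** 2)
-               .compute())
-
- assert in_memory == parallel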
Related Documentation
Create data
- [1]:
- %run prep.py -d accounts
Setup
Again, we’ll use the distributed scheduler. Schedulers will be explained in depth later.
- [2]:
- from dask.distributed import Client
-
- client = Client(n_workers=4)
Creation
You can create a Bag from a Python sequence, from files, from data on S3, etc. We demonstrate using .take() to show elements of the data. (Doing .take(1) results in a tuple with one element.)
Note that the data are partitioned into blocks, and there are many items per block. In the first example, the two partitions contain five elements each, and in the following two, each file is partitioned into one or more blocks of bytes.
- [3]:
- # each element is an integer
- import dask.bag as db
- b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
- b.take(3)
- [3]:
- (1, 2, 3)
- [4]:
- # each element is a text file, where each line is a JSON object
- # note that the compression is handled automatically
- import os
- b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
- b.take(1)
- [4]:
- ('{"id": 0, "name": "Oliver", "transactions": [{"transaction-id": 233, "amount": 137}, {"transaction-id": 459, "amount": 73}, {"transaction-id": 2030, "amount": 112}, {"transaction-id": 2769, "amount": 89}, {"transaction-id": 3027, "amount": 59}, {"transaction-id": 4647, "amount": 40}, {"transaction-id": 4672, "amount": 76}, {"transaction-id": 4850, "amount": 112}, {"transaction-id": 5376, "amount": 109}, {"transaction-id": 5473, "amount": 70}, {"transaction-id": 5677, "amount": 91}, {"transaction-id": 5783, "amount": 82}, {"transaction-id": 5986, "amount": 65}, {"transaction-id": 6583, "amount": 121}, {"transaction-id": 6657, "amount": 61}, {"transaction-id": 6797, "amount": 110}, {"transaction-id": 7660, "amount": 112}, {"transaction-id": 8530, "amount": 128}, {"transaction-id": 8547, "amount": 131}, {"transaction-id": 8657, "amount": 85}, {"transaction-id": 8723, "amount": 38}, {"transaction-id": 8779, "amount": 46}, {"transaction-id": 8955, "amount": 122}, {"transaction-id": 9086, "amount": 78}, {"transaction-id": 9194, "amount": 80}, {"transaction-id": 9859, "amount": 114}, {"transaction-id": 9977, "amount": 95}]}\n',)
- [5]:
- # Requires `s3fs` library
- # each partition is a remote CSV text file
- b = db.read_text('s3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv',
- storage_options={'anon': True})
- b.take(1)
- [5]:
- ('VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount\n',)
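If you want to check how a bag was split up, a Bag exposes an npartitions attribute. A minimal sketch (variable names here are illustrative; the partition count for the file-based bag depends on the files generated by prep.py):
- seq_bag = db.from_sequence(range(10), npartitions=2)
- print(seq_bag.npartitions)    # 2
-
- # one or more partitions per input file
- file_bag = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
- print(file_bag.npartitions)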
Manipulation
Bag objects hold the standard functional API found in projects like the Python standard library, toolz, or pyspark, including map, filter, groupby, etc.
Operations on Bag objects create new bags. Call the .compute() method to trigger execution, as we saw for Delayed objects.
- [6]:
- def is_even(n):
- return n % 2 == 0
-
- b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
- c = b.filter(is_even).map(lambda x: x ** 2)
- c
- [6]:
- dask.bag<lambda, npartitions=10>
- [7]:
- # blocking form: wait for completion (which is very fast in this case)
- c.compute()
- [7]:
- [4, 16, 36, 64, 100]
Example: Accounts JSON data
We’ve created a fake dataset of gzipped JSON data in your data directory. This is like the example used in the DataFrame example we will see later, except that it bundles up all of the entries for each individual id into a single record. This is similar to data that you might collect from a document store database or a web API.
Each line is a JSON-encoded dictionary with the following keys:
- id: unique identifier of the customer
- name: name of the customer
- transactions: list of (transaction-id, amount) pairs, one for each transaction for the customer in that file
- [8]:
- filename = os.path.join('data', 'accounts.*.json.gz')
- lines = db.read_text(filename)
- lines.take(3)
- [8]:
- ('{"id": 0, "name": "Oliver", "transactions": [{"transaction-id": 233, "amount": 137}, {"transaction-id": 459, "amount": 73}, {"transaction-id": 2030, "amount": 112}, {"transaction-id": 2769, "amount": 89}, {"transaction-id": 3027, "amount": 59}, {"transaction-id": 4647, "amount": 40}, {"transaction-id": 4672, "amount": 76}, {"transaction-id": 4850, "amount": 112}, {"transaction-id": 5376, "amount": 109}, {"transaction-id": 5473, "amount": 70}, {"transaction-id": 5677, "amount": 91}, {"transaction-id": 5783, "amount": 82}, {"transaction-id": 5986, "amount": 65}, {"transaction-id": 6583, "amount": 121}, {"transaction-id": 6657, "amount": 61}, {"transaction-id": 6797, "amount": 110}, {"transaction-id": 7660, "amount": 112}, {"transaction-id": 8530, "amount": 128}, {"transaction-id": 8547, "amount": 131}, {"transaction-id": 8657, "amount": 85}, {"transaction-id": 8723, "amount": 38}, {"transaction-id": 8779, "amount": 46}, {"transaction-id": 8955, "amount": 122}, {"transaction-id": 9086, "amount": 78}, {"transaction-id": 9194, "amount": 80}, {"transaction-id": 9859, "amount": 114}, {"transaction-id": 9977, "amount": 95}]}\n',
- '{"id": 1, "name": "Oliver", "transactions": [{"transaction-id": 168, "amount": 156}, {"transaction-id": 448, "amount": 162}, {"transaction-id": 637, "amount": 153}, {"transaction-id": 1068, "amount": 167}, {"transaction-id": 1257, "amount": 146}, {"transaction-id": 1464, "amount": 160}, {"transaction-id": 1534, "amount": 161}, {"transaction-id": 1651, "amount": 166}, {"transaction-id": 2333, "amount": 172}, {"transaction-id": 3121, "amount": 166}, {"transaction-id": 3360, "amount": 170}, {"transaction-id": 3541, "amount": 155}, {"transaction-id": 3770, "amount": 175}, {"transaction-id": 3818, "amount": 157}, {"transaction-id": 4803, "amount": 174}, {"transaction-id": 5174, "amount": 147}, {"transaction-id": 5580, "amount": 158}, {"transaction-id": 6474, "amount": 164}, {"transaction-id": 6577, "amount": 156}, {"transaction-id": 6952, "amount": 159}, {"transaction-id": 7157, "amount": 151}, {"transaction-id": 7254, "amount": 163}, {"transaction-id": 7340, "amount": 153}, {"transaction-id": 7570, "amount": 166}, {"transaction-id": 7723, "amount": 155}, {"transaction-id": 7875, "amount": 159}, {"transaction-id": 7923, "amount": 164}, {"transaction-id": 8448, "amount": 163}, {"transaction-id": 8789, "amount": 163}, {"transaction-id": 8997, "amount": 159}, {"transaction-id": 9004, "amount": 159}, {"transaction-id": 9605, "amount": 176}, {"transaction-id": 9697, "amount": 168}, {"transaction-id": 9842, "amount": 161}, {"transaction-id": 9994, "amount": 155}]}\n',
- '{"id": 3, "name": "Patricia", "transactions": [{"transaction-id": 640, "amount": 401}, {"transaction-id": 776, "amount": 387}, {"transaction-id": 1092, "amount": 370}, {"transaction-id": 1160, "amount": 376}, {"transaction-id": 1335, "amount": 404}, {"transaction-id": 1533, "amount": 403}, {"transaction-id": 1573, "amount": 398}, {"transaction-id": 1749, "amount": 381}, {"transaction-id": 2134, "amount": 417}, {"transaction-id": 2439, "amount": 410}, {"transaction-id": 3653, "amount": 402}, {"transaction-id": 4392, "amount": 407}, {"transaction-id": 4785, "amount": 411}, {"transaction-id": 5720, "amount": 376}, {"transaction-id": 5870, "amount": 392}, {"transaction-id": 6007, "amount": 392}, {"transaction-id": 6180, "amount": 392}, {"transaction-id": 6253, "amount": 420}, {"transaction-id": 6521, "amount": 382}, {"transaction-id": 6769, "amount": 382}, {"transaction-id": 7037, "amount": 363}, {"transaction-id": 7076, "amount": 382}, {"transaction-id": 7370, "amount": 405}, {"transaction-id": 7437, "amount": 378}, {"transaction-id": 7514, "amount": 411}, {"transaction-id": 7554, "amount": 385}, {"transaction-id": 7952, "amount": 391}, {"transaction-id": 8416, "amount": 380}, {"transaction-id": 8425, "amount": 374}, {"transaction-id": 8488, "amount": 396}, {"transaction-id": 8828, "amount": 378}, {"transaction-id": 8947, "amount": 374}, {"transaction-id": 9119, "amount": 378}, {"transaction-id": 9856, "amount": 375}, {"transaction-id": 9982, "amount": 405}]}\n')
Our data comes out of the file as lines of text. Notice that file decompression happened automatically. We can make this data look more reasonable by mapping the json.loads function onto our bag.
- [9]:
- import json
- js = lines.map(json.loads)
- # take: inspect first few elements
- js.take(3)
- [9]:
- ({'id': 0,
- 'name': 'Oliver',
- 'transactions': [{'transaction-id': 233, 'amount': 137},
- {'transaction-id': 459, 'amount': 73},
- {'transaction-id': 2030, 'amount': 112},
- {'transaction-id': 2769, 'amount': 89},
- {'transaction-id': 3027, 'amount': 59},
- {'transaction-id': 4647, 'amount': 40},
- {'transaction-id': 4672, 'amount': 76},
- {'transaction-id': 4850, 'amount': 112},
- {'transaction-id': 5376, 'amount': 109},
- {'transaction-id': 5473, 'amount': 70},
- {'transaction-id': 5677, 'amount': 91},
- {'transaction-id': 5783, 'amount': 82},
- {'transaction-id': 5986, 'amount': 65},
- {'transaction-id': 6583, 'amount': 121},
- {'transaction-id': 6657, 'amount': 61},
- {'transaction-id': 6797, 'amount': 110},
- {'transaction-id': 7660, 'amount': 112},
- {'transaction-id': 8530, 'amount': 128},
- {'transaction-id': 8547, 'amount': 131},
- {'transaction-id': 8657, 'amount': 85},
- {'transaction-id': 8723, 'amount': 38},
- {'transaction-id': 8779, 'amount': 46},
- {'transaction-id': 8955, 'amount': 122},
- {'transaction-id': 9086, 'amount': 78},
- {'transaction-id': 9194, 'amount': 80},
- {'transaction-id': 9859, 'amount': 114},
- {'transaction-id': 9977, 'amount': 95}]},
- {'id': 1,
- 'name': 'Oliver',
- 'transactions': [{'transaction-id': 168, 'amount': 156},
- {'transaction-id': 448, 'amount': 162},
- {'transaction-id': 637, 'amount': 153},
- {'transaction-id': 1068, 'amount': 167},
- {'transaction-id': 1257, 'amount': 146},
- {'transaction-id': 1464, 'amount': 160},
- {'transaction-id': 1534, 'amount': 161},
- {'transaction-id': 1651, 'amount': 166},
- {'transaction-id': 2333, 'amount': 172},
- {'transaction-id': 3121, 'amount': 166},
- {'transaction-id': 3360, 'amount': 170},
- {'transaction-id': 3541, 'amount': 155},
- {'transaction-id': 3770, 'amount': 175},
- {'transaction-id': 3818, 'amount': 157},
- {'transaction-id': 4803, 'amount': 174},
- {'transaction-id': 5174, 'amount': 147},
- {'transaction-id': 5580, 'amount': 158},
- {'transaction-id': 6474, 'amount': 164},
- {'transaction-id': 6577, 'amount': 156},
- {'transaction-id': 6952, 'amount': 159},
- {'transaction-id': 7157, 'amount': 151},
- {'transaction-id': 7254, 'amount': 163},
- {'transaction-id': 7340, 'amount': 153},
- {'transaction-id': 7570, 'amount': 166},
- {'transaction-id': 7723, 'amount': 155},
- {'transaction-id': 7875, 'amount': 159},
- {'transaction-id': 7923, 'amount': 164},
- {'transaction-id': 8448, 'amount': 163},
- {'transaction-id': 8789, 'amount': 163},
- {'transaction-id': 8997, 'amount': 159},
- {'transaction-id': 9004, 'amount': 159},
- {'transaction-id': 9605, 'amount': 176},
- {'transaction-id': 9697, 'amount': 168},
- {'transaction-id': 9842, 'amount': 161},
- {'transaction-id': 9994, 'amount': 155}]},
- {'id': 3,
- 'name': 'Patricia',
- 'transactions': [{'transaction-id': 640, 'amount': 401},
- {'transaction-id': 776, 'amount': 387},
- {'transaction-id': 1092, 'amount': 370},
- {'transaction-id': 1160, 'amount': 376},
- {'transaction-id': 1335, 'amount': 404},
- {'transaction-id': 1533, 'amount': 403},
- {'transaction-id': 1573, 'amount': 398},
- {'transaction-id': 1749, 'amount': 381},
- {'transaction-id': 2134, 'amount': 417},
- {'transaction-id': 2439, 'amount': 410},
- {'transaction-id': 3653, 'amount': 402},
- {'transaction-id': 4392, 'amount': 407},
- {'transaction-id': 4785, 'amount': 411},
- {'transaction-id': 5720, 'amount': 376},
- {'transaction-id': 5870, 'amount': 392},
- {'transaction-id': 6007, 'amount': 392},
- {'transaction-id': 6180, 'amount': 392},
- {'transaction-id': 6253, 'amount': 420},
- {'transaction-id': 6521, 'amount': 382},
- {'transaction-id': 6769, 'amount': 382},
- {'transaction-id': 7037, 'amount': 363},
- {'transaction-id': 7076, 'amount': 382},
- {'transaction-id': 7370, 'amount': 405},
- {'transaction-id': 7437, 'amount': 378},
- {'transaction-id': 7514, 'amount': 411},
- {'transaction-id': 7554, 'amount': 385},
- {'transaction-id': 7952, 'amount': 391},
- {'transaction-id': 8416, 'amount': 380},
- {'transaction-id': 8425, 'amount': 374},
- {'transaction-id': 8488, 'amount': 396},
- {'transaction-id': 8828, 'amount': 378},
- {'transaction-id': 8947, 'amount': 374},
- {'transaction-id': 9119, 'amount': 378},
- {'transaction-id': 9856, 'amount': 375},
- {'transaction-id': 9982, 'amount': 405}]})
Basic Queries
Once we parse our JSON data into proper Python objects (dicts, lists, etc.) we can perform more interesting queries by writing small Python functions to run on our data.
- [10]:
- # filter: keep only some elements of the sequence
- js.filter(lambda record: record['name'] == 'Alice').take(5)
- [10]:
- ({'id': 5,
- 'name': 'Alice',
- 'transactions': [{'transaction-id': 1535, 'amount': 335},
- {'transaction-id': 1792, 'amount': 350},
- {'transaction-id': 2554, 'amount': 367},
- {'transaction-id': 2560, 'amount': 347},
- {'transaction-id': 3063, 'amount': 340},
- {'transaction-id': 3445, 'amount': 362},
- {'transaction-id': 3467, 'amount': 352},
- {'transaction-id': 4260, 'amount': 324},
- {'transaction-id': 4334, 'amount': 328},
- {'transaction-id': 4654, 'amount': 318},
- {'transaction-id': 5900, 'amount': 347},
- {'transaction-id': 6856, 'amount': 333},
- {'transaction-id': 7445, 'amount': 308},
- {'transaction-id': 7744, 'amount': 356},
- {'transaction-id': 8117, 'amount': 353},
- {'transaction-id': 8587, 'amount': 343}]},
- {'id': 23,
- 'name': 'Alice',
- 'transactions': [{'transaction-id': 11, 'amount': 2316},
- {'transaction-id': 228, 'amount': 2522},
- {'transaction-id': 319, 'amount': 2204},
- {'transaction-id': 577, 'amount': 2558},
- {'transaction-id': 737, 'amount': 2772},
- {'transaction-id': 797, 'amount': 2526},
- {'transaction-id': 989, 'amount': 2294},
- {'transaction-id': 1214, 'amount': 2653},
- {'transaction-id': 1365, 'amount': 2266},
- {'transaction-id': 1435, 'amount': 2245},
- {'transaction-id': 1452, 'amount': 2535},
- {'transaction-id': 1553, 'amount': 2496},
- {'transaction-id': 1776, 'amount': 2684},
- {'transaction-id': 2027, 'amount': 2650},
- {'transaction-id': 2167, 'amount': 2590},
- {'transaction-id': 2404, 'amount': 2562},
- {'transaction-id': 2414, 'amount': 2497},
- {'transaction-id': 2591, 'amount': 2704},
- {'transaction-id': 2686, 'amount': 2688},
- {'transaction-id': 2781, 'amount': 2421},
- {'transaction-id': 2813, 'amount': 2562},
- {'transaction-id': 2865, 'amount': 2705},
- {'transaction-id': 2879, 'amount': 2540},
- {'transaction-id': 3139, 'amount': 2586},
- {'transaction-id': 3188, 'amount': 2663},
- {'transaction-id': 3366, 'amount': 2258},
- {'transaction-id': 3476, 'amount': 2371},
- {'transaction-id': 3618, 'amount': 2438},
- {'transaction-id': 3676, 'amount': 2610},
- {'transaction-id': 3741, 'amount': 2272},
- {'transaction-id': 3936, 'amount': 2432},
- {'transaction-id': 4231, 'amount': 2292},
- {'transaction-id': 4737, 'amount': 2699},
- {'transaction-id': 4922, 'amount': 2470},
- {'transaction-id': 4959, 'amount': 2338},
- {'transaction-id': 5190, 'amount': 2400},
- {'transaction-id': 5465, 'amount': 2511},
- {'transaction-id': 5471, 'amount': 2439},
- {'transaction-id': 5971, 'amount': 2520},
- {'transaction-id': 6072, 'amount': 2441},
- {'transaction-id': 6152, 'amount': 2604},
- {'transaction-id': 6154, 'amount': 2347},
- {'transaction-id': 6200, 'amount': 2224},
- {'transaction-id': 6314, 'amount': 2553},
- {'transaction-id': 6579, 'amount': 2474},
- {'transaction-id': 7142, 'amount': 2269},
- {'transaction-id': 7231, 'amount': 2446},
- {'transaction-id': 7281, 'amount': 2509},
- {'transaction-id': 7379, 'amount': 2657},
- {'transaction-id': 7776, 'amount': 2424},
- {'transaction-id': 7922, 'amount': 2577},
- {'transaction-id': 8319, 'amount': 2524},
- {'transaction-id': 8666, 'amount': 2489},
- {'transaction-id': 8873, 'amount': 2374},
- {'transaction-id': 9167, 'amount': 2301},
- {'transaction-id': 9197, 'amount': 2244},
- {'transaction-id': 9429, 'amount': 2249},
- {'transaction-id': 9695, 'amount': 2410}]},
- {'id': 33,
- 'name': 'Alice',
- 'transactions': [{'transaction-id': 1, 'amount': -311},
- {'transaction-id': 262, 'amount': -300},
- {'transaction-id': 766, 'amount': -400},
- {'transaction-id': 1607, 'amount': -355},
- {'transaction-id': 4148, 'amount': -254},
- {'transaction-id': 4507, 'amount': -302},
- {'transaction-id': 5692, 'amount': -250},
- {'transaction-id': 5861, 'amount': -294},
- {'transaction-id': 6351, 'amount': -355},
- {'transaction-id': 8661, 'amount': -374},
- {'transaction-id': 9132, 'amount': -384},
- {'transaction-id': 9321, 'amount': -337}]},
- {'id': 34,
- 'name': 'Alice',
- 'transactions': [{'transaction-id': 224, 'amount': 1011},
- {'transaction-id': 1059, 'amount': 929},
- {'transaction-id': 1995, 'amount': 963},
- {'transaction-id': 3191, 'amount': 1002},
- {'transaction-id': 3864, 'amount': 948},
- {'transaction-id': 4924, 'amount': 961},
- {'transaction-id': 5529, 'amount': 887},
- {'transaction-id': 6230, 'amount': 935},
- {'transaction-id': 6914, 'amount': 968},
- {'transaction-id': 7716, 'amount': 963},
- {'transaction-id': 7810, 'amount': 942},
- {'transaction-id': 7995, 'amount': 980},
- {'transaction-id': 8068, 'amount': 912},
- {'transaction-id': 8508, 'amount': 894},
- {'transaction-id': 8717, 'amount': 942},
- {'transaction-id': 9718, 'amount': 875},
- {'transaction-id': 9787, 'amount': 967}]},
- {'id': 38,
- 'name': 'Alice',
- 'transactions': [{'transaction-id': 404, 'amount': 3592},
- {'transaction-id': 747, 'amount': 3113},
- {'transaction-id': 993, 'amount': 3502},
- {'transaction-id': 1125, 'amount': 3689},
- {'transaction-id': 1226, 'amount': 4087},
- {'transaction-id': 2259, 'amount': 3561},
- {'transaction-id': 2406, 'amount': 3650},
- {'transaction-id': 2616, 'amount': 3985},
- {'transaction-id': 2998, 'amount': 2988},
- {'transaction-id': 3277, 'amount': 3626},
- {'transaction-id': 3516, 'amount': 4007},
- {'transaction-id': 3775, 'amount': 4236},
- {'transaction-id': 4167, 'amount': 4045},
- {'transaction-id': 4947, 'amount': 3702},
- {'transaction-id': 5037, 'amount': 3229},
- {'transaction-id': 5287, 'amount': 4199},
- {'transaction-id': 5323, 'amount': 3867},
- {'transaction-id': 5563, 'amount': 3809},
- {'transaction-id': 6087, 'amount': 3604},
- {'transaction-id': 6292, 'amount': 3463},
- {'transaction-id': 6489, 'amount': 3914},
- {'transaction-id': 6807, 'amount': 3272},
- {'transaction-id': 7731, 'amount': 3517},
- {'transaction-id': 7996, 'amount': 3396},
- {'transaction-id': 8695, 'amount': 3611},
- {'transaction-id': 8927, 'amount': 3710}]})
- [11]:
- def count_transactions(d):
- return {'name': d['name'], 'count': len(d['transactions'])}
-
- # map: apply a function to each element
- (js.filter(lambda record: record['name'] == 'Alice')
- .map(count_transactions)
- .take(5))
- [11]:
- ({'name': 'Alice', 'count': 16},
- {'name': 'Alice', 'count': 58},
- {'name': 'Alice', 'count': 12},
- {'name': 'Alice', 'count': 17},
- {'name': 'Alice', 'count': 26})
- [12]:
- # pluck: select a field, as from a dictionary, element[field]
- (js.filter(lambda record: record['name'] == 'Alice')
- .map(count_transactions)
- .pluck('count')
- .take(5))
- [12]:
- (16, 58, 12, 17, 26)
- [13]:
- # Average number of transactions for all of the Alice entries
- (js.filter(lambda record: record['name'] == 'Alice')
- .map(count_transactions)
- .pluck('count')
- .mean()
- .compute())
- [13]:
- 56.9440353460972
Use flatten to de-nest
In the example below we use .flatten() to flatten nested results: we compute the average amount for all transactions for all Alices.
- [14]:
- js.filter(lambda record: record['name'] == 'Alice').pluck('transactions').take(3)
- [14]:
- ([{'transaction-id': 1535, 'amount': 335},
- {'transaction-id': 1792, 'amount': 350},
- {'transaction-id': 2554, 'amount': 367},
- {'transaction-id': 2560, 'amount': 347},
- {'transaction-id': 3063, 'amount': 340},
- {'transaction-id': 3445, 'amount': 362},
- {'transaction-id': 3467, 'amount': 352},
- {'transaction-id': 4260, 'amount': 324},
- {'transaction-id': 4334, 'amount': 328},
- {'transaction-id': 4654, 'amount': 318},
- {'transaction-id': 5900, 'amount': 347},
- {'transaction-id': 6856, 'amount': 333},
- {'transaction-id': 7445, 'amount': 308},
- {'transaction-id': 7744, 'amount': 356},
- {'transaction-id': 8117, 'amount': 353},
- {'transaction-id': 8587, 'amount': 343}],
- [{'transaction-id': 11, 'amount': 2316},
- {'transaction-id': 228, 'amount': 2522},
- {'transaction-id': 319, 'amount': 2204},
- {'transaction-id': 577, 'amount': 2558},
- {'transaction-id': 737, 'amount': 2772},
- {'transaction-id': 797, 'amount': 2526},
- {'transaction-id': 989, 'amount': 2294},
- {'transaction-id': 1214, 'amount': 2653},
- {'transaction-id': 1365, 'amount': 2266},
- {'transaction-id': 1435, 'amount': 2245},
- {'transaction-id': 1452, 'amount': 2535},
- {'transaction-id': 1553, 'amount': 2496},
- {'transaction-id': 1776, 'amount': 2684},
- {'transaction-id': 2027, 'amount': 2650},
- {'transaction-id': 2167, 'amount': 2590},
- {'transaction-id': 2404, 'amount': 2562},
- {'transaction-id': 2414, 'amount': 2497},
- {'transaction-id': 2591, 'amount': 2704},
- {'transaction-id': 2686, 'amount': 2688},
- {'transaction-id': 2781, 'amount': 2421},
- {'transaction-id': 2813, 'amount': 2562},
- {'transaction-id': 2865, 'amount': 2705},
- {'transaction-id': 2879, 'amount': 2540},
- {'transaction-id': 3139, 'amount': 2586},
- {'transaction-id': 3188, 'amount': 2663},
- {'transaction-id': 3366, 'amount': 2258},
- {'transaction-id': 3476, 'amount': 2371},
- {'transaction-id': 3618, 'amount': 2438},
- {'transaction-id': 3676, 'amount': 2610},
- {'transaction-id': 3741, 'amount': 2272},
- {'transaction-id': 3936, 'amount': 2432},
- {'transaction-id': 4231, 'amount': 2292},
- {'transaction-id': 4737, 'amount': 2699},
- {'transaction-id': 4922, 'amount': 2470},
- {'transaction-id': 4959, 'amount': 2338},
- {'transaction-id': 5190, 'amount': 2400},
- {'transaction-id': 5465, 'amount': 2511},
- {'transaction-id': 5471, 'amount': 2439},
- {'transaction-id': 5971, 'amount': 2520},
- {'transaction-id': 6072, 'amount': 2441},
- {'transaction-id': 6152, 'amount': 2604},
- {'transaction-id': 6154, 'amount': 2347},
- {'transaction-id': 6200, 'amount': 2224},
- {'transaction-id': 6314, 'amount': 2553},
- {'transaction-id': 6579, 'amount': 2474},
- {'transaction-id': 7142, 'amount': 2269},
- {'transaction-id': 7231, 'amount': 2446},
- {'transaction-id': 7281, 'amount': 2509},
- {'transaction-id': 7379, 'amount': 2657},
- {'transaction-id': 7776, 'amount': 2424},
- {'transaction-id': 7922, 'amount': 2577},
- {'transaction-id': 8319, 'amount': 2524},
- {'transaction-id': 8666, 'amount': 2489},
- {'transaction-id': 8873, 'amount': 2374},
- {'transaction-id': 9167, 'amount': 2301},
- {'transaction-id': 9197, 'amount': 2244},
- {'transaction-id': 9429, 'amount': 2249},
- {'transaction-id': 9695, 'amount': 2410}],
- [{'transaction-id': 1, 'amount': -311},
- {'transaction-id': 262, 'amount': -300},
- {'transaction-id': 766, 'amount': -400},
- {'transaction-id': 1607, 'amount': -355},
- {'transaction-id': 4148, 'amount': -254},
- {'transaction-id': 4507, 'amount': -302},
- {'transaction-id': 5692, 'amount': -250},
- {'transaction-id': 5861, 'amount': -294},
- {'transaction-id': 6351, 'amount': -355},
- {'transaction-id': 8661, 'amount': -374},
- {'transaction-id': 9132, 'amount': -384},
- {'transaction-id': 9321, 'amount': -337}])
- [15]:
- (js.filter(lambda record: record['name'] == 'Alice')
- .pluck('transactions')
- .flatten()
- .take(3))
- [15]:
- ({'transaction-id': 1535, 'amount': 335},
- {'transaction-id': 1792, 'amount': 350},
- {'transaction-id': 2554, 'amount': 367})
- [16]:
- (js.filter(lambda record: record['name'] == 'Alice')
- .pluck('transactions')
- .flatten()
- .pluck('amount')
- .take(3))
- [16]:
- (335, 350, 367)
- [17]:
- (js.filter(lambda record: record['name'] == 'Alice')
- .pluck('transactions')
- .flatten()
- .pluck('amount')
- .mean()
- .compute())
- [17]:
- 366.1473166946851
Groupby and Foldby
Often we want to group data by some function or key. We can do this either with the .groupby method, which is straightforward but forces a full shuffle of the data (expensive), or with the harder-to-use but faster .foldby method, which does a streaming, combined groupby and reduction.
- groupby: shuffles data so that all items with the same key are in the same key-value pair
- foldby: walks through the data, accumulating a result per key
Note: the full groupby is particularly bad. In actual workloads you would do well to use foldby or switch to DataFrames if possible.
groupby
Groupby gathers the items in your collection so that all items with the same value under some function end up together in the same key-value pair.
- [18]:
- b = db.from_sequence(['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank'])
- b.groupby(len).compute() # names grouped by length
- [18]:
- [(7, ['Charlie']), (3, ['Bob', 'Dan']), (5, ['Alice', 'Edith', 'Frank'])]
- [19]:
- b = db.from_sequence(list(range(10)))
- b.groupby(lambda x: x % 2).compute()
- [19]:
- [(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])]
- [20]:
- b.groupby(lambda x: x % 2).starmap(lambda k, v: (k, max(v))).compute()
- [20]:
- [(0, 8), (1, 9)]
foldby
Foldby can be quite odd at first. It is similar to the following functions from other libraries:
- toolz.reduceby (https://toolz.readthedocs.io/en/latest/streaming-analytics.html#streaming-split-apply-combine)
- pyspark.RDD.combineByKey (https://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/)
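To build intuition, here is the in-memory toolz analogue (a small sketch on toy data, unrelated to the accounts dataset): reduceby takes a key function, a binary operator, a sequence, and an initial value, and returns a dict mapping each key to its reduced result.
- from toolz import reduceby
-
- # group the integers 0-9 by parity and sum each group, entirely in memory
- reduceby(lambda x: x % 2,           # key function
-          lambda acc, x: acc + x,    # binary operator applied within each group
-          range(10),
-          0)                         # initial accumulator value
- # returns {0: 20, 1: 25}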
When using foldby you provide:
- a key function on which to group elements
- a binary operator, such as you would pass to reduce, used to perform the reduction within each group
- a combine binary operator that can combine the results of two reduce calls on different parts of your dataset
Your reduction must be associative. It will happen in parallel in each of the partitions of your dataset. All of these intermediate results will then be combined by the combine binary operator.
- [21]:
- is_odd = lambda x: x % 2
- b.foldby(is_odd, binop=max, combine=max).compute()
- [21]:
- [(0, 8), (1, 9)]
Example with account data
We find the number of people with the same name.
- [22]:
- %%time
- # Warning, this one takes a while...
- result = js.groupby(lambda item: item['name']).starmap(lambda k, v: (k, len(v))).compute()
- print(sorted(result))
- [('Alice', 679), ('Bob', 400), ('Charlie', 299), ('Dan', 400), ('Edith', 250), ('Frank', 450), ('George', 347), ('Hannah', 344), ('Ingrid', 494), ('Jerry', 485), ('Kevin', 448), ('Laura', 600), ('Michael', 734), ('Norbert', 585), ('Oliver', 633), ('Patricia', 363), ('Quinn', 150), ('Ray', 310), ('Sarah', 500), ('Tim', 559), ('Ursula', 300), ('Victor', 598), ('Wendy', 268), ('Xavier', 1190), ('Yvonne', 395), ('Zelda', 432)]
- CPU times: user 4.06 s, sys: 406 ms, total: 4.47 s
- Wall time: 1min 12s
- [23]:
- %%time
- # This one is comparatively fast and produces the same result.
- from operator import add
- def incr(tot, _):
- return tot+1
-
- result = js.foldby(key='name',
- binop=incr,
- initial=0,
- combine=add,
- combine_initial=0).compute()
- print(sorted(result))
- [('Alice', 679), ('Bob', 400), ('Charlie', 299), ('Dan', 400), ('Edith', 250), ('Frank', 450), ('George', 347), ('Hannah', 344), ('Ingrid', 494), ('Jerry', 485), ('Kevin', 448), ('Laura', 600), ('Michael', 734), ('Norbert', 585), ('Oliver', 633), ('Patricia', 363), ('Quinn', 150), ('Ray', 310), ('Sarah', 500), ('Tim', 559), ('Ursula', 300), ('Victor', 598), ('Wendy', 268), ('Xavier', 1190), ('Yvonne', 395), ('Zelda', 432)]
- CPU times: user 164 ms, sys: 0 ns, total: 164 ms
- Wall time: 596 ms
Exercise: compute total amount per name
We want to groupby (or foldby) the name key, then add up all of the amounts for each name.
Steps
- Create a small function that, given a dictionary like
  {'name': 'Alice', 'transactions': [{'amount': 1, 'id': 123}, {'amount': 2, 'id': 456}]}
  produces the sum of the amounts, e.g. 3
- Slightly change the binary operator of the foldby example above so that it doesn’t count the number of entries, but instead accumulates the sum of the amounts.
- [24]:
- # Your code here...
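One possible solution, sketched here under the exercise's own structure (function names are illustrative): sum the amounts within each record, then fold those per-record sums by name using the same foldby pattern as above.
- def sum_amounts(d):
-     # total amount across all transactions in one record
-     return sum(t['amount'] for t in d['transactions'])
-
- def add_amounts(total, record):
-     # binary operator: fold one record's amount sum into the running total
-     return total + sum_amounts(record)
-
- totals = js.foldby(key='name',
-                    binop=add_amounts,
-                    initial=0,
-                    combine=add,          # operator.add, imported above
-                    combine_initial=0).compute()
- sorted(totals)[:3]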
DataFrames
For the same reasons that Pandas is often faster than pure Python, dask.dataframe can be faster than dask.bag. We will work more with DataFrames later, but from the Bag point of view, they are frequently the end-point of the “messy” part of data ingestion: once the data can be made into a dataframe, complex split-apply-combine logic becomes much more straightforward and efficient.
You can transform a bag with a simple tuple or flat dictionary structure into a dask.dataframe with the to_dataframe method.
- [25]:
- df1 = js.to_dataframe()
- df1.head()
- [25]:
-    id      name                                       transactions
- 0   0    Oliver  [{'transaction-id': 233, 'amount': 137}, {'tra…
- 1   1    Oliver  [{'transaction-id': 168, 'amount': 156}, {'tra…
- 2   3  Patricia  [{'transaction-id': 640, 'amount': 401}, {'tra…
- 3   4     Laura  [{'transaction-id': 255, 'amount': -864}, {'tr…
- 4   5     Alice  [{'transaction-id': 1535, 'amount': 335}, {'tr…
This now looks like a well-defined DataFrame, and we can apply Pandas-like computations to it efficiently.
Using a Dask DataFrame, how long does it take to do our prior computation of the number of people with the same name? It turns out that dask.dataframe.groupby() beats dask.bag.groupby() by more than an order of magnitude; but it still cannot match dask.bag.foldby() for this case.
- [26]:
- %time df1.groupby('name').id.count().compute().head()
- CPU times: user 235 ms, sys: 19.5 ms, total: 255 ms
- Wall time: 2.17 s
- [26]:
- name
- Alice 679
- Bob 400
- Charlie 299
- Dan 400
- Edith 250
- Name: id, dtype: int64
Denormalization
This DataFrame format is less than optimal because the transactions column is filled with nested data, so Pandas has to revert to object dtype, which is quite slow. Ideally we want to transform to a dataframe only after we have flattened our data so that each record is a single int, string, float, etc.
- [27]:
- def denormalize(record):
- # returns a list for every nested item, each transaction of each person
- return [{'id': record['id'],
- 'name': record['name'],
- 'amount': transaction['amount'],
- 'transaction-id': transaction['transaction-id']}
- for transaction in record['transactions']]
-
- transactions = js.map(denormalize).flatten()
- transactions.take(3)
- [27]:
- ({'id': 0, 'name': 'Oliver', 'amount': 137, 'transaction-id': 233},
- {'id': 0, 'name': 'Oliver', 'amount': 73, 'transaction-id': 459},
- {'id': 0, 'name': 'Oliver', 'amount': 112, 'transaction-id': 2030})
- [28]:
- df = transactions.to_dataframe()
- df.head()
- [28]:
-    id    name  amount  transaction-id
- 0   0  Oliver     137             233
- 1   0  Oliver      73             459
- 2   0  Oliver     112            2030
- 3   0  Oliver      89            2769
- 4   0  Oliver      59            3027
- [29]:
- %%time
- # number of transactions per name
- # note that the time here includes the data load and ingestion
- df.groupby('name')['transaction-id'].count().compute()
- CPU times: user 231 ms, sys: 14.2 ms, total: 245 ms
- Wall time: 1.45 s
- [29]:
- name
- Alice 38665
- Bob 15852
- Charlie 17154
- Dan 14775
- Edith 10741
- Frank 33950
- George 11536
- Hannah 8527
- Ingrid 14609
- Jerry 13311
- Kevin 14312
- Laura 13519
- Michael 27488
- Norbert 14884
- Oliver 18405
- Patricia 13752
- Quinn 9456
- Ray 10627
- Sarah 31099
- Tim 26600
- Ursula 10539
- Victor 33491
- Wendy 12364
- Xavier 50616
- Yvonne 16473
- Zelda 17255
- Name: transaction-id, dtype: int64
Limitations
Bags provide very general computation (any Python function). This generality comes at a cost. Bags have the following known limitations:
- Bag operations tend to be slower than array/dataframe computations, in the same way that Python tends to be slower than NumPy/Pandas
- Bag.groupby is slow. You should try to use Bag.foldby if possible. Using Bag.foldby requires more thought. Even better, consider creating a normalised dataframe.
Learn More
Shutdown
- [30]:
- client.shutdown()