You can run this notebook in a live session on Binder or view it on GitHub.

Data Storage

Efficient storage can dramatically improve performance, particularly when operating repeatedly from disk.

Decompressing text and parsing CSV files is expensive. One of the most effective strategies with medium data is to use a binary storage format like HDF5. Often the performance gains from doing this are sufficient so that you can switch back to using Pandas again instead of using dask.dataframe.

In this section we’ll learn how to efficiently arrange and store your datasets in on-disk binary formats. We’ll use the following: the Pandas HDFStore format on top of HDF5, categoricals for storing text data numerically, and the parquet column store.

Main Take-aways

  • Storage formats affect performance by an order of magnitude

  • Text data will keep even a fast format like HDF5 slow

  • A combination of binary formats, column storage, and partitioned data turns one-second wait times into 80 ms wait times.

Create data

[1]:
    %run prep.py -d accounts

Read CSV

First we read our CSV data as before.

CSV and other text-based file formats are the most common storage for data from many sources, because they require minimal pre-processing, can be written line-by-line and are human-readable. Since Pandas’ read_csv is well-optimized, CSVs are a reasonable input, but far from optimal, since reading requires extensive text parsing.

[2]:
    import os
    filename = os.path.join('data', 'accounts.*.csv')
    filename

[2]:
    'data/accounts.*.csv'

[3]:
    import dask.dataframe as dd
    df_csv = dd.read_csv(filename)
    df_csv.head()

[3]:
       id    names  amount
    0  91    Kevin     575
    1  10  Charlie     421
    2  38   Ursula    2056
    3  98    Frank    2818
    4  95   Ingrid     197

Write to HDF5

HDF5 and netCDF are binary array formats very commonly used in the scientific realm.

Pandas contains a specialized HDF5 format, HDFStore. The dd.DataFrame.to_hdf method works exactly like the pd.DataFrame.to_hdf method.

[4]:
    target = os.path.join('data', 'accounts.h5')
    target

[4]:
    'data/accounts.h5'

[5]:
    # convert to binary format, takes some time up-front
    %time df_csv.to_hdf(target, '/data')

    CPU times: user 151 ms, sys: 15.9 ms, total: 167 ms
    Wall time: 170 ms

[5]:
    ['data/accounts.h5', 'data/accounts.h5', 'data/accounts.h5']

[6]:
    # same data as before
    df_hdf = dd.read_hdf(target, '/data')
    df_hdf.head()

[6]:
       id    names  amount
    0  91    Kevin     575
    1  10  Charlie     421
    2  38   Ursula    2056
    3  98    Frank    2818
    4  95   Ingrid     197

Compare CSV to HDF5 speeds

We do a simple computation that requires reading a column of our dataset and compare performance between CSV files and our newly created HDF5 file. Which do you expect to be faster?

[7]:
    %time df_csv.amount.sum().compute()

    CPU times: user 32.6 ms, sys: 54 µs, total: 32.7 ms
    Wall time: 26.6 ms

[7]:
    43699504

[8]:
    %time df_hdf.amount.sum().compute()

    CPU times: user 28.2 ms, sys: 3.46 ms, total: 31.7 ms
    Wall time: 31.2 ms

[8]:
    43699504

Sadly they are about the same, or perhaps even slower.

The culprit here is the names column, which is of object dtype and thus hard to store efficiently - the quick check below the list confirms this. There are two problems here:

  • How do we store text data like names efficiently on disk?

  • Why did we have to read the names column when all we wanted was amount?
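
A quick way to confirm the first point is to inspect the dtypes of the dataframe we read back from HDF5:

    # The `names` column is stored as a generic Python object (string) column,
    # which neither HDF5 nor pandas can pack into a compact numeric array
    df_hdf.dtypes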

1. Store text efficiently with categoricals

We can use Pandas categoricals to replace our object dtypes with a numerical representation. This takes a bit more time up front, but results in better performance.

More on categoricals at the pandas docs and this blogpost.
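
As a small standalone illustration (the column here is made up, not part of the accounts data), converting a repetitive text column to pandas’ category dtype stores each distinct string once, plus a small integer code per row:

    import pandas as pd

    # A hypothetical, highly repetitive text column
    s = pd.Series(['Alice', 'Bob', 'Charlie'] * 100_000)

    # As a categorical, the values become integer codes pointing into
    # a short array of the unique strings
    c = s.astype('category')

    print(s.memory_usage(deep=True))  # object dtype: one Python string per row
    print(c.memory_usage(deep=True))  # category dtype: far smaller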

[9]:
    # Categorize data, then store in HDFStore
    %time df_hdf.categorize(columns=['names']).to_hdf(target, '/data2')

    CPU times: user 109 ms, sys: 0 ns, total: 109 ms
    Wall time: 108 ms

[9]:
    ['data/accounts.h5']

[10]:
    # It looks the same
    df_hdf = dd.read_hdf(target, '/data2')
    df_hdf.head()

[10]:
       id    names  amount
    0  91    Kevin     575
    1  10  Charlie     421
    2  38   Ursula    2056
    3  98    Frank    2818
    4  95   Ingrid     197

[11]:
    # But loads more quickly
    %time df_hdf.amount.sum().compute()

    CPU times: user 20.8 ms, sys: 4.11 ms, total: 24.9 ms
    Wall time: 24.3 ms

[11]:
    43699504

This is now definitely faster than before. This tells us that it’s not only the file type that we use but also how we represent our variables that influences storage performance.

How does the performance of reading depend on the scheduler we use? You can try this with the threaded, multiprocessing and distributed schedulers, for example as sketched below.
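
For example, compute() accepts a scheduler= keyword in recent Dask versions, so a rough comparison could look like this (the distributed case assumes you start a dask.distributed Client first):

    # Threaded scheduler (the default for dask.dataframe)
    %time df_hdf.amount.sum().compute(scheduler='threads')

    # Multiprocessing scheduler
    %time df_hdf.amount.sum().compute(scheduler='processes')

    # Distributed scheduler: once a Client exists, compute() uses it
    # from dask.distributed import Client
    # client = Client()
    # %time df_hdf.amount.sum().compute()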

However, this can still be better. We had to read all of the columns (names and amount) in order to compute the sum of one (amount). We’ll improve further on this with parquet, an on-disk column store. First, though, we learn how to set an index in a dask.dataframe.

Exercise

fastparquet is a library for interacting with parquet-format files, a very common format in the Big Data ecosystem, used by tools such as Hadoop, Spark and Impala.

[12]:
    target = os.path.join('data', 'accounts.parquet')
    df_csv.categorize(columns=['names']).to_parquet(target, storage_options={"has_nulls": True}, engine="fastparquet")

Investigate the file structure in the resulting directory - what do you suppose those files are for?

to_parquet comes with many options, such as compression, whether to explicitly write NULLs information (not necessary in this case), and how to encode strings. You can experiment with these to see what effect they have on the file size and the processing times below.
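
For instance, a variation with compression might look like the sketch below (the output path is just an example, and using snappy assumes the snappy codec is installed):

    # Write a compressed copy to compare file size and read time
    df_csv.categorize(columns=['names']).to_parquet(
        os.path.join('data', 'accounts_snappy.parquet'),  # example path
        engine='fastparquet',
        compression='snappy',
    )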

[13]:
    ls -l data/accounts.parquet/

    total 752
    -rw-rw-r-- 1 travis travis    723 Jan  7 00:58 _common_metadata
    -rw-rw-r-- 1 travis travis   1932 Jan  7 00:58 _metadata
    -rw-rw-r-- 1 travis travis 251801 Jan  7 00:58 part.0.parquet
    -rw-rw-r-- 1 travis travis 251801 Jan  7 00:58 part.1.parquet
    -rw-rw-r-- 1 travis travis 251801 Jan  7 00:58 part.2.parquet
[14]:
    df_p = dd.read_parquet(target)
    # note that the dtype of the names column shows it was stored as
    # categorical - we could choose to load it as a categorical column or not
    df_p.dtypes

[14]:
    id          int64
    names    category
    amount      int64
    dtype: object

Rerun the sum computation above for this version of the data, and time how long it takes. You may want to try this more than once - it is common for many libraries to do various setup work when called for the first time.

[15]:
    %time df_p.amount.sum().compute()

    CPU times: user 179 ms, sys: 4.18 ms, total: 183 ms
    Wall time: 180 ms

[15]:
    43699504

When archiving data, it is common to sort and partition by a column with unique identifiers, to facilitate fast look-ups later. For this data, that column is id. Time how long it takes to retrieve the rows corresponding to id==100 from the raw CSV, from HDF5 and parquet versions, and finally from a new parquet version written after applying set_index('id').

[16]:
    # df_p.set_index('id').to_parquet(...)
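
One possible sketch of this comparison (the output path is just an example; adjust the look-ups to taste):

    # Write a version sorted and partitioned by id
    target_idx = os.path.join('data', 'accounts_by_id.parquet')  # example path
    df_p.set_index('id').to_parquet(target_idx, engine='fastparquet')
    df_idx = dd.read_parquet(target_idx)

    # Raw CSV
    %time df_csv[df_csv.id == 100].compute()
    # HDF5
    %time df_hdf[df_hdf.id == 100].compute()
    # Parquet, unindexed
    %time df_p[df_p.id == 100].compute()
    # Parquet, sorted and indexed by id
    %time df_idx.loc[100].compute()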

Remote files

Dask can access various cloud- and cluster-oriented data storage services such as Amazon S3 or HDFS.

Advantages:

  • scalable, secure storage

Disadvantages:

  • network speed becomes bottleneck

The way to set up dataframes (and other collections) remains very similar to before. Note that the data here is available anonymously, but in general an extra parameter storage_options= can be passed with further details about how to interact with the remote storage.

    taxi = dd.read_csv('s3://nyc-tlc/trip data/yellow_tripdata_2015-*.csv',
                       storage_options={'anon': True})

Warning: operations over the Internet can take a long time to run. Such operations work really well in a cloud-clustered set-up, e.g., Amazon EC2 machines reading from S3 or Google Compute Engine machines reading from GCS.