Best Practices
It is easy to get started with Dask’s APIs, but using them well requires some experience. This page contains suggestions for best practices, and includes solutions to common problems.
This document specifically focuses on best practices that are shared among all of the Dask APIs. Readers may want to investigate one of the API-specific Best Practices documents first.
Start Small
Parallelism brings extra complexity and overhead. Sometimes it’s necessary for larger problems, but often it’s not. Before adding a parallel computing system like Dask to your workload you may want to first try some alternatives:
- Use better algorithms or data structures: NumPy, Pandas, Scikit-Learn may have faster functions for what you’re trying to do. It may be worth consulting with an expert or reading through their docs again to find a better pre-built algorithm.
- Better file formats: Efficient binary formats that support random access can often help you manage larger-than-memory datasets efficiently and simply. See the Store Data Efficiently section below.
- Compiled code: Compiling your Python code with Numba or Cython might make parallelism unnecessary. Or you might use the multi-core parallelism available within those libraries.
- Sampling: Even if you have a lot of data, there might not be much advantage from using all of it. By sampling intelligently you might be able to derive the same insight from a much more manageable subset.
- Profile: If you’re trying to speed up slow code it’s important that you first understand why it is slow. Modest time investments in profiling your code can help you to identify what is slowing you down. This information can help you make better decisions about whether parallelism is likely to help, or whether other approaches are likely to be more effective (a minimal sketch follows this list).
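A minimal profiling sketch using Python’s standard library, where slow_analysis is a hypothetical function standing in for your workload:

```python
import cProfile
import pstats

# Record where time is spent in one representative run
cProfile.run("slow_analysis()", "profile.out")

# Show the ten most expensive calls by cumulative time
pstats.Stats("profile.out").sort_stats("cumulative").print_stats(10)
```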
Use The Dashboard
Dask’s dashboard helps you to understand the state of your workers. This information can help to guide you to efficient solutions. In parallel and distributed computing there are new costs to be aware of, and so your old intuition may no longer be true. Working with the dashboard can help you relearn what is fast and slow and how to deal with it.
See Documentation on Dask’s dashboard for more information.
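For example, with the distributed scheduler the dashboard address is available from the client. A minimal local sketch (the port may differ on your machine):

```python
from dask.distributed import Client

client = Client()              # start a local cluster with the distributed scheduler
print(client.dashboard_link)   # open this URL in a browser to watch tasks, memory, and workers
```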
Avoid Very Large Partitions
Your chunks of data should be small enough so that many of them fit in a worker’s available memory at once. You often control this when you select partition size in Dask DataFrame or chunk size in Dask Array.
Dask will likely manipulate as many chunks in parallel on one machine as you have cores on that machine. So if you have 1 GB chunks and ten cores, then Dask is likely to use at least 10 GB of memory. Additionally, it’s common for Dask to have 2-3 times as many chunks available to work on so that it always has something to work on.
If you have a machine with 100 GB and 10 cores, then you might want to choose chunks in the 1 GB range. You have space for ten chunks per core, which gives Dask a healthy margin without having tasks that are too small.
Note that you also want to avoid chunk sizes that are too small. See the next section for details.
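As a rough sketch of where those sizes are set (the shapes, block sizes, and file pattern below are illustrative rather than recommendations):

```python
import dask.array as da
import dask.dataframe as dd

# Dask Array: chunk size is chosen when creating the array, or later with rechunk
x = da.random.random((100_000, 100_000), chunks=(10_000, 10_000))  # ~800 MB per chunk
x = x.rechunk((5_000, 5_000))                                       # ~200 MB per chunk

# Dask DataFrame: partition size is chosen at read time, or later with repartition
df = dd.read_csv("data-*.csv", blocksize="256MB")           # roughly one partition per 256 MB of text
df = df.repartition(npartitions=max(1, df.npartitions // 2))  # merge into fewer, larger partitions
```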
Avoid Very Large Graphs
Dask workloads are composed of tasks. A task is a Python function, like np.sum, applied onto a Python object, like a Pandas dataframe or NumPy array. If you are working with Dask collections with many partitions, then every operation you do, like x + 1, likely generates many tasks, at least as many as partitions in your collection.
Every task comes with some overhead. This is somewhere between 200us and 1ms. If you have a computation with thousands of tasks this is fine; there will be about a second of overhead, and that may not trouble you.
However, when you have very large graphs with millions of tasks then this may become troublesome, both because overhead is now in the 10 minutes to hours range, and also because the overhead of dealing with such a large graph can start to overwhelm the scheduler.
There are a few things you can do to address this:
Build smaller graphs. You can do this by …
- Increasing your chunk size: If you have 1,000 GB of data and are using 10 MB chunks, then you have 100,000 partitions. Every operation on such a collection will generate at least 100,000 tasks.

  However, if you increase your chunk size to 1 GB or even a few GB, then you reduce the overhead by orders of magnitude. This requires that your workers have much more than 1 GB of memory, but that’s typical for larger workloads.
- Fusing operations together: Dask will do a bit of this on its own, but you can help it. If you have a very complex operation with dozens of sub-operations, maybe you can pack that into a single Python function and use a function like da.map_blocks or dd.map_partitions (see the sketch after this list).

  In general, the more administrative work you can move into your functions the better. That way the Dask scheduler doesn’t need to think about all of the fine-grained operations.
- Breaking up your computation: For very large workloads you may also want to try sending smaller chunks to Dask at a time. For example, if you’re processing a petabyte of data but find that Dask is only happy with 100 TB, maybe you can break up your computation into ten pieces and submit them one after the other.
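A rough sketch of the fusing idea above, using dd.map_partitions (the file pattern and column names are hypothetical):

```python
import pandas as pd
import dask.dataframe as dd

def clean_and_score(part: pd.DataFrame) -> pd.DataFrame:
    # Several small steps packed into a single task per partition,
    # rather than one task per step per partition
    part = part.dropna(subset=["price", "quantity"])
    part["total"] = part["price"] * part["quantity"]
    return part[part["total"] > 0]

df = dd.read_parquet("transactions-*.parquet")
df = df.map_partitions(clean_and_score)
```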
Learn Techniques For Customization
The high level Dask collections (array, dataframe, bag) include common operations that follow standard Python APIs from NumPy and Pandas. However, many Python workloads are complex and may require operations that are not included in these high level APIs.
Fortunately, there are many options to support custom workloads:
- All collections have a map_partitions or map_blocks function that applies a user-provided function across every Pandas dataframe or NumPy array in the collection. Because Dask collections are made up of normal Python objects, it’s often quite easy to map custom functions across partitions of a dataset without much modification.

  ```python
  df.map_partitions(my_custom_func)
  ```
- More complex map_* functions. Sometimes your custom behavior isn’t embarrassingly parallel, but requires more advanced communication. For example, maybe you need to communicate a little bit of information from one partition to the next, or maybe you want to build a custom aggregation. Dask collections include methods for these as well.
- For even more complex workloads you can convert your collections into individual blocks, and arrange those blocks as you like using Dask Delayed. There is usually a to_delayed method on every collection.
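  A minimal sketch of this pattern, where ddf is an existing Dask DataFrame and custom_logic is a hypothetical function applied to each partition:

  ```python
  import dask
  import dask.dataframe as dd

  partitions = ddf.to_delayed()                          # one Delayed pandas DataFrame per partition
  results = [dask.delayed(custom_logic)(p) for p in partitions]
  out = dd.from_delayed(results)                         # reassemble into a Dask DataFrame
  ```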
| Function | Description |
| --- | --- |
| map_partitions(func, *args[, meta, …]) | Apply Python function on each DataFrame partition. |
| rolling.map_overlap(func, df, before, after, …) | Apply a function to each partition, sharing rows with adjacent partitions. |
| groupby.Aggregation(name, chunk, agg[, finalize]) | User defined groupby-aggregation. |
| blockwise(func, out_ind, *args[, name, …]) | Tensor operation: Generalized inner and outer products. |
| map_blocks(func, *args[, name, token, …]) | Map a function across all blocks of a dask array. |
| map_overlap(x, func, depth[, boundary, trim]) | Map a function over blocks of the array with some overlap. |
| reduction(x, chunk, aggregate[, axis, …]) | General version of reductions. |
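As one example of the hooks above, here is a sketch of a user-defined mean built with groupby.Aggregation (the column and group names are illustrative, and df is assumed to be an existing Dask DataFrame):

```python
import dask.dataframe as dd

# Track (count, sum) within each partition, combine across partitions, then finalize
custom_mean = dd.Aggregation(
    name="custom_mean",
    chunk=lambda s: (s.count(), s.sum()),                  # per-partition pieces
    agg=lambda count, total: (count.sum(), total.sum()),   # combine the pieces
    finalize=lambda count, total: total / count,           # one value per group
)

result = df.groupby("name").agg(custom_mean)
```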
Stop Using Dask When No Longer Needed
In many workloads it is common to use Dask to read in a large amount of data, reduce it down, and then iterate on a much smaller amount of data. For this latter stage on smaller data it may make sense to stop using Dask and start using normal Python again.
```python
df = dd.read_parquet("lots-of-data-*.parquet")
df = df.groupby('name').mean()  # reduce data significantly
df = df.compute()               # continue on with Pandas/NumPy
```
Persist When You Can
Accessing data from RAM is often much faster than accessing it from disk. Once you have your dataset in a clean state that both:
- Fits in memory
- Is clean enough that you will want to try many different analyses

Then it is a good time to persist your data in RAM:
```python
df = dd.read_parquet("lots-of-data-*.parquet")
df = df.fillna(...)           # clean up things lazily
df = df[df.name == 'Alice']   # get down to a more reasonable size
df = df.persist()             # trigger computation, persist in distributed RAM
```
Note that this is only relevant if you are on a distributed machine (otherwise, as mentioned above, you should probably continue on without Dask).
Store Data Efficiently
As your ability to compute increases you will likely find that data access and I/O take up a larger portion of your total time. Additionally, parallel computing will often add new constraints to how you store your data, particularly around providing random access to blocks of your data that are in line with how you plan to compute on it.
For example …
- For compression you’ll probably find that you drop gzip and bz2, and embrace newer systems like lz4, snappy, and Z-Standard that provide better performance and random access.
- For storage formats you may find that you want self-describing formats that are optimized for random access, metadata storage, and binary encoding, like Parquet, ORC, Zarr, HDF5, GeoTIFF, and so on.
- When working on the cloud you may find that some older formats like HDF5 may not work well.
- You may want to partition or chunk your data in ways that align well to common queries. In Dask DataFrame this might mean choosing a column to sort by for fast selection and joins. In Dask Array this might mean choosing chunk sizes that are aligned with your access patterns and algorithms. A short sketch follows this list.
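A rough sketch of what that can look like for a Dask DataFrame (the paths, index column, and compression choice are illustrative):

```python
import dask.dataframe as dd

df = dd.read_csv("raw-data-*.csv")
df = df.set_index("timestamp")                       # sort/partition by a commonly queried column
df.to_parquet("clean-data/", compression="snappy")   # binary, random-access-friendly format
```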
Processes and Threads
If you’re doing mostly numeric work with Numpy, Pandas, Scikit-Learn, Numba, and other libraries that release the GIL, then use mostly threads. If you’re doing work on text data or Python collections like lists and dicts, then use mostly processes.
If you’re on larger machines with a high thread count (greater than 10), then you should probably split things up into at least a few processes regardless. Python can be highly productive with 10 threads per process with numeric work, but not with 50 threads.
For more information on threads, processes, and how to configure them in Dask, see the scheduler documentation.
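For example, with the distributed scheduler you can choose the mix of processes and threads explicitly (the worker and thread counts below are illustrative):

```python
from dask.distributed import Client, LocalCluster

# Mostly numeric, GIL-releasing work: a few processes, many threads per process
cluster = LocalCluster(n_workers=2, threads_per_worker=8)
client = Client(cluster)

# Mostly text/Python-object work: many processes, few threads per process
# cluster = LocalCluster(n_workers=8, threads_per_worker=2)
```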
Load Data with Dask
If you need to work with large Python objects, then please let Dask create them. A common anti-pattern we see is people creating large Python objects outside of Dask, then giving those objects to Dask and asking it to manage them. This works, but means that Dask needs to move around these very large objects with its metadata, rather than as normal Dask-controlled results.
Here are some common patterns to avoid and nicer alternatives:
DataFrames
```python
# Don't

ddf = ... a dask dataframe ...
for fn in filenames:
    df = pandas.read_csv(fn)   # Read locally with Pandas
    ddf = ddf.append(df)       # Give to Dask
```

```python
# Do

ddf = dd.read_csv(filenames)
```
Arrays
```python
# Don't

f = h5py.File(...)
x = np.asarray(f["x"])  # Get data as a NumPy array locally
x = da.from_array(x)    # Hand NumPy array to Dask
```

```python
# Do

f = h5py.File(...)
x = da.from_array(f["x"])  # Let Dask do the reading
```
Delayed
```python
# Don't

@dask.delayed
def process(a, b):
    ...

df = pandas.read_csv("some-large-file.csv")  # Create large object locally

results = []
for item in L:
    result = process(item, df)  # include df in every delayed call
    results.append(result)
```

```python
# Do

@dask.delayed
def process(a, b):
    ...

df = dask.delayed(pandas.read_csv)("some-large-file.csv")  # Let Dask build object

results = []
for item in L:
    result = process(item, df)  # include pointer to df in every delayed call
    results.append(result)
```
Avoid calling compute repeatedly
Compute related results with shared computations in a single dask.compute() call:
```python
# Don't repeatedly call compute

df = dd.read_csv("...")
xmin = df.x.min().compute()
xmax = df.x.max().compute()
```

```python
# Do compute multiple results at the same time

df = dd.read_csv("...")
xmin, xmax = dask.compute(df.x.min(), df.x.max())
```
This allows Dask to compute the shared parts of the computation (like the dd.read_csv call above) only once, rather than once per compute call.