Efficiency
Parallel computing done well is responsive and rewarding. However, several speed-bumps can get in the way. This section describes common ways to ensure performance.
Leave data on the cluster
Wait as long as possible to gather data locally. If you want to ask a question of a large piece of data on the cluster, it is often faster to submit a function onto that data than to bring the data down to your local computer.
For example, if we have a numpy array on the cluster and we want to know its shape, we might choose one of the following options:
- Slow: Gather the numpy array to the local process, access the .shape attribute
- Fast: Send a lambda function up to the cluster to compute the shape
    >>> x = client.submit(np.random.random, (1000, 1000))
    >>> type(x)
    Future
Slow
    >>> x.result().shape  # Slow from lots of data transfer
    (1000, 1000)
Fast
    >>> client.submit(lambda a: a.shape, x).result()  # fast
    (1000, 1000)
Use larger tasks
The scheduler adds about one millisecond of overhead per task or Future object. While this may sound fast, it's quite slow if you run a billion tasks: a billion milliseconds of scheduling overhead alone is roughly eleven days. If your functions run faster than 100ms or so then you might not see any speedup from using distributed computing.
A common solution is to batch your input into larger chunks.
Slow
    >>> futures = client.map(f, seq)
    >>> len(futures)  # avoid large numbers of futures
    1000000000
Fast
    >>> def f_many(chunk):
    ...     return [f(x) for x in chunk]

    >>> from toolz import partition_all
    >>> chunks = partition_all(1000000, seq)  # Collect into chunks of one million elements
    >>> futures = client.map(f_many, chunks)
    >>> len(futures)  # Compute on larger pieces of your data at once
    1000
Adjust between Threads and Processes
By default a single Worker runs many computations in parallel, using as many threads as your compute node has cores. When using pure Python functions this may not be optimal and you may instead want to run several separate worker processes on each node, each using one thread. When configuring your cluster you may want to use the options to the dask-worker executable as follows:
    $ dask-worker ip:port --nprocs 8 --nthreads 1
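If you start your cluster from Python instead of the command line, the same layout can be expressed with the n_workers and threads_per_worker arguments to Client. This is a minimal single-machine sketch, equivalent in spirit to the dask-worker invocation above:

    >>> from dask.distributed import Client
    >>> # Eight single-threaded worker processes on the local machine
    >>> client = Client(n_workers=8, threads_per_worker=1)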
Note that if you’re primarily using NumPy, Pandas, SciPy, Scikit Learn, Numba,or other C/Fortran/LLVM/Cython-accelerated libraries then this is not an issuefor you. Your code is likely optimal for use with multi-threading.
Don’t go distributed
Consider the dask and concurrent.futures modules, which have similar APIs to distributed but operate on a single machine. It may be that your problem performs well enough on a laptop or large workstation.
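For instance, a local process pool from the standard library may be all you need. This is a sketch using concurrent.futures, where f and seq stand in for your own function and input sequence (f must be defined at module level so it can be pickled):

    >>> from concurrent.futures import ProcessPoolExecutor
    >>> with ProcessPoolExecutor() as executor:       # pool of local processes
    ...     results = list(executor.map(f, seq))      # familiar map-style API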
Consider accelerating your code through other means than parallelism. Better algorithms, data structures, storage formats, or just a little bit of C/Fortran/Numba code might be enough to give you the 10x speed boost that you're looking for. Parallelism and distributed computing are expensive ways to accelerate your application.
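As one illustration of this, Numba can often compile a plain numeric Python loop to machine code with a single decorator. This is a hedged sketch of the general pattern, not a claim about your particular workload:

    >>> from numba import njit
    >>> @njit                        # compiled to machine code on first call
    ... def sum_of_squares(arr):
    ...     total = 0.0
    ...     for x in arr:            # plain Python loop, but runs at C-like speed
    ...         total += x * x
    ...     return total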