Distributed, Advanced
Distributed futures
- [1]:
- from dask.distributed import Client
- c = Client(n_workers=4)
- c.cluster
In the Distributed chapter, we showed that executing a calculation (created using delayed) with the distributed executor is identical to any other executor. However, we now have access to additional functionality, and control over what data is held in memory.
To begin, the futures interface (derived from the built-in concurrent.futures) allows map-reduce-like functionality. We can submit an individual function for evaluation with one set of inputs using submit(), or evaluate it over a sequence of inputs using map(). Notice that the call returns immediately, giving one or more futures, whose status begins as “pending” and later becomes “finished”. There is no blocking of the local Python session.
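Since the interface is derived from concurrent.futures, the same submit-then-collect pattern can be tried with the standard library alone. The following is only an analogy under that assumption: it uses a ThreadPoolExecutor rather than a Dask Client, and the variable names are illustrative. One difference worth knowing is that the standard library's Executor.map() yields results rather than futures, so the sketch builds the list of futures with repeated submit() calls.

```python
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    return x + 1


with ThreadPoolExecutor(max_workers=4) as pool:
    # submit(): one call with one set of inputs gives one future, immediately
    single = pool.submit(inc, 1)
    # one future per input, mirroring what a map over a sequence produces
    many = [pool.submit(inc, i) for i in range(5)]

    # result() blocks until the corresponding future has finished
    single_result = single.result()
    many_results = [f.result() for f in many]

print(single_result)
print(many_results)
```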
Here is the simplest example of submit in action:
- [2]:
- def inc(x):
-     return x + 1
- fut = c.submit(inc, 1)
- fut
- [2]:
Future: inc status: pending, key: inc-ae0ff1cb6f5c736d2f30b1b1b13931fb
We can re-execute the following cell as often as we want as a way to poll the status of the future. This could of course be done in a loop, pausing for a short time on each iteration. We could continue with our work, or view a progress bar of work still going on, or force a wait until the future is ready.
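A polling loop of the kind described could look like the sketch below. It is written against the standard library's concurrent.futures so that it runs without a cluster; Dask futures also provide a done() method, so the loop body should carry over, but treat that mapping as an assumption to check against your installed version. The function slow_inc and the sleep intervals are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_inc(x):
    time.sleep(0.2)  # pretend this is real work
    return x + 1


with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(slow_inc, 1)  # returns immediately

    polls = 0
    while not fut.done():  # poll the status without blocking on the result
        polls += 1
        time.sleep(0.05)   # pause briefly on each iteration

    result = fut.result()  # already finished, so this returns at once

print(f"finished after {polls} polls, result = {result}")
```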
In the meantime, the status dashboard (link above next to the Cluster widget) has gained a new element in the task stream, indicating that inc() has completed, and the progress section at the bottom shows one task complete and held in memory.
- [3]:
- fut
- [3]:
Future: inc status: finished, type: builtins.int, key: inc-ae0ff1cb6f5c736d2f30b1b1b13931fb
Possible alternatives you could investigate:
- from dask.distributed import wait, progress
- progress(fut)
would show a progress bar in this notebook, rather than having to go to the dashboard. This progress bar is also asynchronous, and doesn’t block the execution of other code in the meanwhile.
- wait(fut)
would block and force the notebook to wait until the computation pointed to by fut was done. However, note that the result of inc() is already sitting in the cluster, so it would take no time to execute the computation now: Dask notices that we are asking for the result of a computation it already knows about. More on this later.
- [4]:
- # grab the information back - this blocks if fut is not ready
- c.gather(fut)
- # equivalent action when only considering a single future
- # fut.result()
- [4]:
- 2
Here we see an alternative way to execute work on the cluster: when you submit or map with the inputs as futures, the computation moves to the data rather than the other way around, and the client, in the local Python session, need never see the intermediate values. This is similar to building the graph using delayed, and indeed, delayed can be used in conjunction with futures. Here we use the delayed object total from before.
- [5]:
- # Some trivial work that takes time
- # repeated from the Distributed chapter.
- from dask import delayed
- import time
- def inc(x):
-     time.sleep(5)
-     return x + 1
- def dec(x):
-     time.sleep(3)
-     return x - 1
- def add(x, y):
-     time.sleep(7)
-     return x + y
- x = delayed(inc)(1)
- y = delayed(dec)(2)
- total = delayed(add)(x, y)
- [6]:
- # notice the difference from total.compute()
- # notice that this cell completes immediately
- fut = c.compute(total)
- [7]:
- c.gather(fut) # waits until result is ready
- [7]:
- 3
Client.submit
submit takes a function and arguments, pushes these to the cluster, and returns a Future representing the result to be computed. The function is passed to a worker process for evaluation. Note that this cell returns immediately, while computation may still be ongoing on the cluster.
- [8]:
- fut = c.submit(inc, 1)
- fut
- [8]:
Future: inc status: pending, key: inc-22a0514160481fa01cbf9621a2db3fc1
This looks a lot like doing compute(), above, except now we are passing the function and arguments directly to the cluster. To anyone used to concurrent.futures, this will look familiar. This new fut behaves the same way as the one above. Note that we have now overwritten the previous definition of fut, which will get garbage-collected, and, as a result, that previous result is released by the cluster.
Exercise: Rebuild the above delayed computation using Client.submit instead
The arguments passed to submit can be futures from other submit operations or delayed objects. The former, in particular, demonstrates the concept of moving the computation to the data, which is one of the most powerful elements of programming with Dask.
- [9]:
- # Your code here
- [10]:
- x = c.submit(inc, 1)
- y = c.submit(dec, 2)
- total = c.submit(add, x, y)
- print(total) # This is still a future
- c.gather(total) # This blocks until the computation has finished
- <Future: pending, key: add-26587b4557bda00a6b4240998c9537dc>
- [10]:
- 3
Each future represents a result held, or being evaluated, by the cluster. Thus we can control caching of intermediate values: when a future is no longer referenced, its value is forgotten. In the solution above, futures are held for each of the function calls. These results would not need to be re-evaluated if we chose to submit more work that needed them.
We can explicitly pass data from our local session into the cluster using scatter(), but it is usually better to construct functions that load the data within the workers themselves, so that there is no need to serialise and communicate the data. Most of the loading functions within Dask, such as dd.read_csv, work this way. Similarly, we normally don’t want to gather() results that are too big to fit in memory.
The full API of the distributed scheduler gives details of interacting with the cluster, which, remember, can be on your local machine or possibly on a massive computational resource.
The futures API offers a work submission style that can easily emulate the map/reduce paradigm (see c.map()) that may be familiar to many people. The intermediate results, represented by futures, can be passed to new tasks without having to pull them locally from the cluster, and new work can be assigned to operate on the output of previous jobs that haven’t even begun yet.
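As a rough illustration of the map/reduce style described here, the sketch below maps a function over some inputs and then submits a reduction that takes the resulting futures as its argument. The square function and the small in-process cluster (processes=False, no dashboard) are assumptions made to keep the example light; the notebook itself uses the client c created with Client(n_workers=4).

```python
from dask.distributed import Client


def square(x):
    return x * x


# Assumption: a small threads-only cluster, just for this sketch
client = Client(processes=False, n_workers=1, threads_per_worker=2,
                dashboard_address=None)
try:
    # "map" step: one future per input, returned immediately
    squares = client.map(square, range(5))
    # "reduce" step: the list of futures is passed straight to the next task,
    # so the intermediate values never need to come back to the local session
    total = client.submit(sum, squares)
    # only the final value is transferred to the client
    result = total.result()
finally:
    client.close()

print(result)
```

Note that the reduction is submitted before the mapped tasks have necessarily finished; the scheduler runs it once its inputs are ready.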
Generally, any Dask operation that is executed using .compute() can be submitted for asynchronous execution using c.compute() instead, and this applies to all collections. Here is an example with the calculation previously seen in the Bag chapter. We have replaced the .compute() method there with the distributed client version, so, again, we could continue to submit more work (perhaps based on the result of the calculation), or, in the next cell, follow the progress of the computation. A similar progress bar appears in the monitoring UI page.
- [11]:
- %run prep.py -d accounts
- [12]:
- import dask.bag as db
- import os
- import json
- filename = os.path.join('data', 'accounts.*.json.gz')
- lines = db.read_text(filename)
- js = lines.map(json.loads)
- f = c.compute(js.filter(lambda record: record['name'] == 'Alice')
- .pluck('transactions')
- .flatten()
- .pluck('amount')
- .mean())
- [13]:
- from dask.distributed import progress
- # note that progress must be the last line of a cell
- # in order to show up
- progress(f)
- [14]:
- # get result.
- c.gather(f)
- [14]:
- 366.1473166946851
- [15]:
- # release values by deleting the futures
- del f, fut, x, y, total
Persist
Deciding which data should be loaded by the workers, as opposed to passed to them, and which intermediate values to persist in worker memory, will in many cases determine the computational efficiency of a process.
In the example here, we repeat a calculation from the Array chapter - notice that each call to compute() is roughly the same speed, because the loading of the data is included every time.
- [16]:
- %run prep.py -d random
- [17]:
- import h5py
- import os
- f = h5py.File(os.path.join('data', 'random.hdf5'), mode='r')
- dset = f['/x']
- import dask.array as da
- x = da.from_array(dset, chunks=(1000000,))
- %time x.sum().compute()
- %time x.sum().compute()
- CPU times: user 43.9 ms, sys: 678 µs, total: 44.6 ms
- Wall time: 584 ms
- CPU times: user 20.4 ms, sys: 0 ns, total: 20.4 ms
- Wall time: 57.4 ms
- [17]:
- 4999958.5
If, instead, we persist the data to RAM up front (this takes a few seconds to complete - we could wait() on this process), then further computations will be much faster.
- [18]:
- # changes x from a set of delayed prescriptions
- # to a set of futures pointing to data in RAM
- # See this on the UI dashboard.
- x = c.persist(x)
- [19]:
- %time x.sum().compute()
- %time x.sum().compute()
- CPU times: user 15.4 ms, sys: 1.19 ms, total: 16.5 ms
- Wall time: 37.1 ms
- CPU times: user 11 ms, sys: 496 µs, total: 11.5 ms
- Wall time: 27.1 ms
- [19]:
- 4999958.5
Naturally, persisting every intermediate along the way is a bad idea, because this will tend to fill up all available RAM and make the whole system slow (or break!). The ideal persist point is often at the end of a set of data cleaning steps, when the data is in a form which will get queried often.
Exercise: how is the memory associated with x released, once we know we are done with it?
- [ ]:
Asynchronous computation
One benefit of using the futures API is that you can have dynamic computations that adjust as things progress. Here we implement a simple naive search by looping through results as they come in, and submitting new points to compute while others are still running.
Watching the diagnostics dashboard as this runs you can see computations are being concurrently run while more are being submitted. This flexibility can be useful for parallel algorithms that require some level of synchronization.
Let's perform a very simple minimization using dynamically submitted tasks. The function of interest is known as Rosenbrock:
- [20]:
- # a simple function with interesting minima
- import time
- def rosenbrock(point):
-     """Compute the rosenbrock function and return the point and result"""
-     time.sleep(0.1)
-     score = (1 - point[0])**2 + 2 * (point[1] - point[0]**2)**2
-     return point, score
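Written out, the score computed in the cell above corresponds to the following expression, with point = (x, y). The coefficient 2 is taken directly from the code; textbook statements of the Rosenbrock function often use a much larger coefficient (100), so this is a gentler variant of the same shape.

```latex
f(x, y) = (1 - x)^2 + 2\,\left(y - x^2\right)^2 ,
\qquad
f(1, 1) = 0 .
```

Both squared terms vanish only at (x, y) = (1, 1), which is therefore the global minimum the search should approach.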
Initial setup, including creating a graphical figure. We use Bokeh for this, which allows for dynamic update of the figure as results come in.
- [21]:
- from bokeh.io import output_notebook, push_notebook
- from bokeh.models.sources import ColumnDataSource
- from bokeh.plotting import figure, show
- import numpy as np
- output_notebook()
- # set up plot background
- N = 500
- x = np.linspace(-5, 5, N)
- y = np.linspace(-5, 5, N)
- xx, yy = np.meshgrid(x, y)
- d = (1 - xx)**2 + 2 * (yy - xx**2)**2
- d = np.log(d)
- p = figure(x_range=(-5, 5), y_range=(-5, 5))
- p.image(image=[d], x=-5, y=-5, dw=10, dh=10, palette="Spectral11");
We start off with a point at (0, 0), and randomly scatter test points around it. Each evaluation takes ~100ms, and as results come in, we test to see if we have a new best point, and choose random points around that new best point, as the search box shrinks.
We print the function value and current best location each time we have a new best value.
- [22]:
- from dask.distributed import as_completed
- from random import uniform
- scale = 5 # Initial random perturbation scale
- best_point = (0, 0) # Initial guess
- best_score = float('inf') # Best score so far
- startx = [uniform(-scale, scale) for _ in range(10)]
- starty = [uniform(-scale, scale) for _ in range(10)]
- # set up plot
- source = ColumnDataSource({'x': startx, 'y': starty, 'c': ['grey'] * 10})
- p.circle(source=source, x='x', y='y', color='c')
- t = show(p, notebook_handle=True)
- # initial 10 random points
- futures = [c.submit(rosenbrock, (x, y)) for x, y in zip(startx, starty)]
- iterator = as_completed(futures)
- for res in iterator:
-     # take a completed point, is it an improvement?
-     point, score = res.result()
-     if score < best_score:
-         best_score, best_point = score, point
-         print(score, point)
-     x, y = best_point
-     newx, newy = (x + uniform(-scale, scale), y + uniform(-scale, scale))
-     # update plot
-     source.stream({'x': [newx], 'y': [newy], 'c': ['grey']}, rollover=20)
-     push_notebook(t)
-     # add new point, dynamically, to work on the cluster
-     new_point = c.submit(rosenbrock, (newx, newy))
-     iterator.add(new_point) # Start tracking new task as well
-     # Narrow search and consider stopping
-     scale *= 0.99
-     if scale < 0.001:
-         break
- point
- 74.93711105326744 (3.1804267222660734, 4.191312553860941)
- 5.936765507793201 (0.07648850107177019, -1.5884976548640406)
- 2.3220176439234574 (2.0605833036517778, 5.019689760270688)
- 1.2259058712939275 (1.6168220014614167, 1.9639455253890254)
- 1.032141885376375 (1.850011873489775, 3.029083832231845)
- 0.5083173240255578 (0.8317613269354474, 0.20192228523222555)
- 0.16621353420211857 (0.8794986012642256, 0.4981154048277392)
- 0.034256783678307944 (1.0878654574387765, 1.0682634827239947)
- 0.028571114773177884 (0.8805835594379803, 0.6908377234629227)
- 0.01246082086143249 (0.9046150125944825, 0.8593315223441493)
- 0.0075807834367677684 (0.9284895052640721, 0.8269713108218659)
- 0.0074241377467670535 (1.0831178153158711, 1.157088571629004)
- 0.004658390857685124 (0.9386053300670536, 0.9020641464480152)
- 0.0020012106624380985 (0.9941558605736986, 0.9569846195832853)
- 0.0010732017735786189 (1.0327082789065054, 1.067784515770591)
- 0.0010186955721594288 (0.998245693143468, 1.0190290814667924)
- 8.022842518796153e-05 (0.9913813543548697, 0.9845614277044987)
- 4.169487967088062e-05 (1.0035115883208212, 1.0108671933676963)
- 2.0723130781258908e-05 (1.0007607275197303, 0.9983483588710113)
- 2.4641245598785283e-06 (0.9987365212002238, 0.9968159484564413)
- 1.196250015721816e-06 (0.9999864543023196, 1.0007462348932696)
- 3.2155568348178984e-07 (0.9996067754290375, 0.9989248022816197)
- 9.91686055015747e-08 (0.9998038971759329, 0.9994336026616896)
- [22]:
- (1.0003707378202629, 1.0003815059421843)
Debugging
When something goes wrong in a distributed job, it can be hard to figure out what the problem was and what to do about it. When a task raises an exception, the exception will show up when that result, or another result that depends upon it, is gathered.
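The same "error surfaces only when you ask for the result" behaviour can be observed with the standard library's concurrent.futures, which is used here purely as a stand-in so the sketch runs without a cluster. The ratio function below is a plain, non-delayed function written for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def ratio(a, b):
    return a // b


with ThreadPoolExecutor(max_workers=2) as pool:
    # Submitting the bad input does not raise anything here
    fut = pool.submit(ratio, 25, 0)

    # exception() waits for the task and returns the stored error without raising
    stored = fut.exception()

    # result() re-raises the stored error in the calling session
    try:
        fut.result()
        raised = False
    except ZeroDivisionError:
        raised = True

print(type(stored).__name__, raised)
```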
Consider the following delayed calculation to be computed by the cluster. As usual, we get back a future, which the cluster is working on to compute (this happens very slowly for the trivial procedure).
- [23]:
- @delayed
- def ratio(a, b):
-     return a // b
-
- @delayed
- def summation(a):
-     return sum(a)
-
- ina = [5, 25, 30]
- inb = [5, 5, 6]
- out = summation([ratio(a, b) for (a, b) in zip(ina, inb)])
- f = c.compute(out)
- f
- [23]:
Future: summation status: pending, key: summation-d5848cc6-8e5c-4a28-86fd-1efbc91b2e82
We only get to know what happened when we gather the result (this is also true for out.compute(), except we could not have done other stuff in the meantime). For the first set of inputs, it works fine.
- [24]:
- c.gather(f)
- [24]:
- 11
But if we introduce bad input, an exception is raised. The exception happens in ratio, but only comes to our attention when calculating summation.
- [25]:
- ina = [5, 25, 30]
- inb = [5, 0, 6]
- out = summation([ratio(a, b) for (a, b) in zip(ina, inb)])
- f = c.compute(out)
- c.gather(f)
- ---------------------------------------------------------------------------
- ZeroDivisionError Traceback (most recent call last)
- <ipython-input-25-2b82b25e9c9c> in <module>
- 3 out = summation([ratio(a, b) for (a, b) in zip(ina, inb)])
- 4 f = c.compute(out)
- ----> 5 c.gather(f)
- ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
- 1870 direct=direct,
- 1871 local_worker=local_worker,
- -> 1872 asynchronous=asynchronous,
- 1873 )
- 1874
- ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
- 765 else:
- 766 return sync(
- --> 767 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
- 768 )
- 769
- ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
- 332 if error[0]:
- 333 typ, exc, tb = error[0]
- --> 334 raise exc.with_traceback(tb)
- 335 else:
- 336 return result[0]
- ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/utils.py in f()
- 316 if callback_timeout is not None:
- 317 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
- --> 318 result[0] = yield future
- 319 except Exception as exc:
- 320 error[0] = sys.exc_info()
- ~/miniconda/envs/test/lib/python3.7/site-packages/tornado/gen.py in run(self)
- 733
- 734 try:
- --> 735 value = future.result()
- 736 except Exception:
- 737 exc_info = sys.exc_info()
- ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
- 1726 exc = CancelledError(key)
- 1727 else:
- -> 1728 raise exception.with_traceback(traceback)
- 1729 raise exc
- 1730 if errors == "skip":
- <ipython-input-23-7cdd79a5df30> in ratio()
- 1 @delayed
- 2 def ratio(a, b):
- ----> 3 return a // b
- 4
- 5 @delayed
- ZeroDivisionError: integer division or modulo by zero
The display in this case makes the origin of the exception obvious, but this is not always the case. How should this be debugged? How would we go about finding out the exact conditions that caused the exception?
The first step, of course, is to write well-tested code which makes appropriate assertions about its input and emits clear warnings and error messages when something goes wrong. This applies to all code.
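As a minimal sketch of what such input checking might look like for this example, the version of ratio below is a plain (non-delayed) function that rejects a zero divisor with a descriptive message. The choice of ValueError, the message wording, and the bad_pairs helper list are all illustrative assumptions, not part of the original notebook.

```python
def ratio(a, b):
    """Integer-divide a by b, rejecting bad input with a descriptive message."""
    if b == 0:
        raise ValueError(f"ratio() got a zero divisor for a={a!r}")
    return a // b


ina = [5, 25, 30]
inb = [5, 0, 6]

# Run the inputs through locally and record exactly which pairs fail, and why
bad_pairs = []
for a, b in zip(ina, inb):
    try:
        ratio(a, b)
    except ValueError as exc:
        bad_pairs.append((a, b, str(exc)))

print(bad_pairs)
```

With a message like this, the error that eventually surfaces from the cluster names the offending input directly.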
The most typical thing to do is to execute some portion of the computation in the local thread, so that we can run the Python debugger and query the state of things at the time that the exception happened. Obviously, this cannot be performed on the whole data-set when dealing with Big Data on a cluster, but a suitable sample will probably do even then.
- [26]:
- import dask
- with dask.config.set(scheduler='synchronous'):
-     # do NOT use c.compute(out) here - we specifically do not
-     # want the distributed scheduler
-     out.compute()
- ZeroDivisionError: integer division or modulo by zero
- [27]:
- # uncomment to enter post-mortem debugger
- # %debug
The trouble with this approach is that Dask is meant for large datasets and computations - you probably can’t simply run the whole thing in one local thread, else you wouldn’t have used Dask in the first place. So the code above should only be used on a small part of the data that also exhibits the error. Furthermore, the method will not work when you are dealing with futures (such as f, above, or after persisting) instead of delayed-based computations.
As an alternative, you can ask the scheduler to analyze your calculation and find the specific sub-task responsible for the error, and pull only it and its dependencies locally for execution.
- [28]:
- c.recreate_error_locally(f)
- ---------------------------------------------------------------------------
- ZeroDivisionError Traceback (most recent call last)
- <ipython-input-28-37e6c8a5bc9f> in <module>
- ----> 1 c.recreate_error_locally(f)
- ~/miniconda/envs/test/lib/python3.7/site-packages/distributed/recreate_exceptions.py in recreate_error_locally(self, future)
- 175 self.client.loop, self._recreate_error_locally, future
- 176 )
- --> 177 func(*args, **kwargs)
- <ipython-input-23-7cdd79a5df30> in ratio(a, b)
- 1 @delayed
- 2 def ratio(a, b):
- ----> 3 return a // b
- 4
- 5 @delayed
- ZeroDivisionError: integer division or modulo by zero
- [29]:
- # uncomment to enter post-mortem debugger
- # %debug
Finally, there are errors other than exceptions, when we need to look at the state of the scheduler/workers. In the standard “LocalCluster” we started, we have direct access to these.
- [30]:
- c.cluster.scheduler.nbytes
- [30]:
<class 'distributed.scheduler._OptionalStateLegacyMapping'>({'inc-ae0ff1cb6f5c736d2f30b1b1b13931fb': 28, 'inc-22a0514160481fa01cbf9621a2db3fc1': 28, 'rosenbrock-5c55ddbb195a50bd1292bda48434c5fd': 216, 'rosenbrock-de307b3ee69cf36d78d6e501ce9c3eee': 216, 'rosenbrock-448f57e802b23be97acf790a77857947': 216, 'rosenbrock-56c54c8aeda60cc9e1d18beea678f12f': 216, 'rosenbrock-78db0bc79a37d5122d8bab74608fce58': 216, 'rosenbrock-b42909018a5160ddf8a6f2ee2cc4858e': 216, 'rosenbrock-4812919436cc7dd21ecc2cad4c1557ac': 216, 'rosenbrock-508107a74335fae2cca87eaf3758cb9f': 216, 'rosenbrock-2bcad2587b8cc9f0599e213ca9ff2166': 216, 'rosenbrock-e32ecc7437727031b52d0b3c55e1545e': 216, 'rosenbrock-cac0c4775a1e5692eb11a5464f047bc7': 216, 'rosenbrock-37b2c5adcf918c7a14d9827ec492fff7': 216, 'rosenbrock-74f11a2bf77e3a4dfd1b0806105aff18': 216, 'rosenbrock-45806b54a3408a8926d9bf7c791215c5': 216, 'rosenbrock-e49aba90354e092d98ca914b6841a9cd': 216, 'rosenbrock-e6ad863727f6e6ce7db6c19cf48980d6': 216, 'rosenbrock-2db8c53c7bf16bf79f7045a53a2ed077': 216, 'rosenbrock-b396180e84a239b2b9a6a4b0879d329c': 216, 'rosenbrock-010974c4c80ed4c46e191ae5f5647a97': 216, 'rosenbrock-5ec435d3491bdf125fb931e9c5211fd5': 216, 'rosenbrock-b0b947b538ece5892031a4ce58cd0687': 216, 'summation-d5848cc6-8e5c-4a28-86fd-1efbc91b2e82': 28, 'ratio-bceef034-7b5a-45f3-9fcc-7d4b108f329d': 28, 'ratio-383892bb-4a84-4c58-808e-3d3e6b9dfc7b': 28, 'ratio-b96f0817-d07f-4754-82d9-f2e500003438': 28, 'ratio-4acd4bdb-8647-45a3-bf33-258bbcdcd62b': 28})