Asynchronous Operation
Dask can run fully asynchronously and so interoperate with other highly concurrent applications. Internally Dask is built on top of Tornado coroutines but also has a compatibility layer for asyncio (see below).
Basic Operation
When starting a client, provide the asynchronous=True keyword to tell Dask that you intend to use this client within an asynchronous context, such as a function defined with async/await syntax.
- from dask.distributed import Client
- async def f():
-     client = await Client(asynchronous=True)
Operations that used to block now provide Tornado coroutines on which you can await.
Fast functions that only submit work remain fast and don’t need to be awaited. This includes all functions that submit work to the cluster, like submit, map, compute, and persist.
- future = client.submit(lambda x: x + 1, 10)
You can await futures directly:
- result = await future
- >>> print(result)
- 11
Or you can use the normal client methods. Any operation that waited until it received information from the scheduler should now be awaited.
- result = await client.gather(future)
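The same pattern extends to many futures at once. The following is a minimal sketch, not a canonical recipe: the helper inc and the coroutine main are illustrative names, and processes=False is an assumption made here only so that the local cluster starts inside the current process. It shows that map is called without await because it only submits work, while gather is awaited because it waits on the scheduler.

```python
import asyncio

from dask.distributed import Client


def inc(x):
    """Trivial task used only for illustration."""
    return x + 1


async def main():
    # Assumption: processes=False keeps the local cluster in this process
    # (threads only) so that the sketch starts quickly.
    async with Client(asynchronous=True, processes=False) as client:
        # map only submits work, so it returns futures without being awaited.
        futures = client.map(inc, range(5))
        # gather waits for results from the scheduler, so it is awaited.
        return await client.gather(futures)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Using the client as an asynchronous context manager closes it when the block exits, which takes the place of an explicit await client.close().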
If you want to use an asynchronous function with a synchronous Client (one made without the asynchronous=True keyword), then you can apply the asynchronous=True keyword at each method call and use the Client.sync function to run the asynchronous function:
- from dask.distributed import Client
- client = Client() # normal blocking client
- async def f():
-     future = client.submit(lambda x: x + 1, 10)
-     result = await client.gather(future, asynchronous=True)
-     return result
- client.sync(f)
Python 2 Compatibility
Everything here works with Python 2 if you replace await with yield. See a more extensive comparison in the example below.
Example
This self-contained example starts an asynchronous client, submits a trivial job, waits on the result, and then shuts down the client. You can see implementations for Python 2 and 3 and for Asyncio and Tornado.
Python 3 with Tornado or Asyncio
- from dask.distributed import Client
- async def f():
-     client = await Client(asynchronous=True)
-     future = client.submit(lambda x: x + 1, 10)
-     result = await future
-     await client.close()
-     return result
- # Either use Tornado
- from tornado.ioloop import IOLoop
- IOLoop().run_sync(f)
- # Or use asyncio
- import asyncio
- asyncio.run(f())
Python 2/3 with Tornado
- from dask.distributed import Client
- from tornado import gen
- @gen.coroutine
- def f():
-     client = yield Client(asynchronous=True)
-     future = client.submit(lambda x: x + 1, 10)
-     result = yield future
-     yield client.close()
-     raise gen.Return(result)
- from tornado.ioloop import IOLoop
- IOLoop().run_sync(f)
Use Cases
Historically this has been used in a few kinds of applications:
- To integrate Dask into other asynchronous services (such as web backends), supplying a computational engine similar to Celery, but while still maintaining a high degree of concurrency and not blocking needlessly.
- For computations that change or update state very rapidly, such as is common in some advanced machine learning workloads.
- To develop the internals of Dask’s distributed infrastructure, which is written entirely in this style.
- For complex control and data structures in advanced applications.
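As a rough illustration of the first use case, the sketch below runs several concurrent handlers inside one asyncio event loop, all sharing a single asynchronous client. The names handle_request, square, and main are hypothetical and stand in for whatever the surrounding service would provide; processes=False is again an assumption made only to keep the local cluster in-process. No web framework is involved, so this shows the concurrency pattern rather than a real backend.

```python
import asyncio

from dask.distributed import Client


def square(x):
    """Stand-in for real computational work."""
    return x * x


async def handle_request(client, x):
    # Hypothetical request handler: submit returns immediately, and awaiting
    # the future yields control to the event loop until the result is ready.
    future = client.submit(square, x)
    return await future


async def main():
    # Assumption: processes=False runs the local cluster with threads only.
    async with Client(asynchronous=True, processes=False) as client:
        # Serve four simulated requests concurrently on one shared client.
        handlers = (handle_request(client, x) for x in range(4))
        return await asyncio.gather(*handlers)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because each handler only awaits its own future, a slow computation for one request does not block the event loop from serving the others.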