Client
The Client is the primary entry point for users of dask.distributed.
After we set up a cluster, we initialize a Client by pointing it to the address of a Scheduler:
- >>> from distributed import Client
- >>> client = Client('127.0.0.1:8786')
There are a few different ways to interact with the cluster through the client:
- The Client satisfies most of the standard concurrent.futures (PEP 3148) interface, with .submit and .map functions and Future objects, allowing the immediate and direct submission of tasks.
- The Client registers itself as the default Dask scheduler, and so runs all Dask collections like dask.array, dask.bag, dask.dataframe and dask.delayed.
- The Client has additional methods for manipulating data remotely. See the full API for a thorough list.
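To make the first point concrete, here is the same submit/map/Future pattern written against the standard library's concurrent.futures, with no cluster involved. It is only a rough analogue: the thread pool and the inc function are illustrative choices, and one difference is worth noting. The standard Executor.map yields results directly, whereas client.map returns a list of Future objects.

```python
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    return x + 1


# A local thread pool stands in for the cluster in this comparison
with ThreadPoolExecutor(max_workers=4) as executor:
    # submit schedules one call and returns a Future immediately
    future = executor.submit(inc, 10)
    single = future.result()  # blocks until the call has finished

    # Executor.map yields the results themselves, in input order
    mapped = list(executor.map(inc, range(5)))

print(single)
print(mapped)
```

Code written in this style usually needs few changes to run on a Client, since the method names and the Future.result call carry over.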
Concurrent.futures
We can submit individual function calls with the client.submit method or many function calls with the client.map method:
- >>> def inc(x):
- ...     return x + 1
- >>> x = client.submit(inc, 10)
- >>> x
- <Future - key: inc-e4853cffcc2f51909cdb69d16dacd1a5>
- >>> L = client.map(inc, range(1000))
- >>> L
- [<Future - key: inc-...>,
- <Future - key: inc-...>,
- <Future - key: inc-...>,
- <Future - key: inc-...>, ...]
These results live on distributed workers.
We can submit tasks on futures. The function will go to the machine where the futures are stored and run on the result once it has completed.
- >>> y = client.submit(inc, x) # Submit on x, a Future
- >>> total = client.submit(sum, L) # Submit on L, a list of Futures
We gather back the results using either the Future.result method for single futures or the client.gather method for many futures at once.
- >>> x.result()
- 11
- >>> client.gather(L)
- [1, 2, 3, 4, 5, ...]
But, as always, we want to minimize communicating results back to the local process. It’s often best to leave data on the cluster and operate on it remotely with functions like submit, map, get and compute. See efficiency for more information on efficient use of distributed.
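A minimal sketch of that advice follows. It assumes dask.distributed is installed and, instead of a real scheduler address, starts an in-process local cluster with Client(processes=False). The intermediate results stay with the workers, and only the final sum is brought back to the local process.

```python
from distributed import Client


def inc(x):
    return x + 1


# Assumption: no address is given, so a local in-process cluster is created
with Client(processes=False) as client:
    futures = client.map(inc, range(100))  # 100 results held by the workers
    total = client.submit(sum, futures)    # reduce remotely, on the futures
    answer = total.result()                # only one integer travels back

print(answer)
```

Calling client.gather(futures) here would also work, but it would transfer all 100 values when only their sum is needed.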
Dask
The parent library Dask contains objects like dask.array, dask.dataframe, dask.bag, and dask.delayed, which automatically produce parallel algorithms on larger datasets. All Dask collections work smoothly with the distributed scheduler.
When we create a Client object it registers itself as the default Dask scheduler. All .compute() methods will automatically start using the distributed system.
- client = Client('scheduler:8786')
- my_dataframe.sum().compute() # Now uses the distributed system by default
We can stop this behavior by using the set_as_default=False keyword argument when starting the Client.
Dask’s normal .compute() methods are synchronous, meaning that they block the interpreter until they complete. Dask.distributed adds the ability to compute asynchronously: we can trigger computations to occur in the background and persist in memory while we continue doing other work. This is typically handled with the Client.persist and Client.compute methods, which are used for larger and smaller result sets respectively.
- >>> df = client.persist(df) # trigger all computations, keep df in memory
- >>> type(df)
- dask.DataFrame
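The contrast between blocking and background computation can be sketched with dask.delayed, which avoids needing a dataframe. This is an illustration under assumptions: an in-process local cluster replaces a real scheduler, and inc and lazy_total are hypothetical names chosen for the example.

```python
import dask
from distributed import Client


@dask.delayed
def inc(x):
    return x + 1


# Assumption: a local in-process cluster stands in for a real deployment
with Client(processes=False) as client:
    # Build a lazy task graph; nothing runs yet
    lazy_total = dask.delayed(sum)([inc(i) for i in range(10)])

    # Client.compute returns a Future right away; work proceeds in the background
    future = client.compute(lazy_total)
    background = future.result()  # block only when the value is needed

    # The ordinary .compute() blocks, and uses this client as default scheduler
    blocking = lazy_total.compute()

print(background, blocking)
```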
For more information see the page on Managing Computation.
Pure Functions by Default
By default we assume that all functions are pure. If this is not the case we should use the pure=False keyword argument.
The client associates a key to all computations. This key is accessible on the Future object.
- >>> from operator import add
- >>> x = client.submit(add, 1, 2)
- >>> x.key
- 'add-ebf39f96ad7174656f97097d658f3fa2'
This key should be the same across all computations with the same inputs and across all machines. If we run the computation above on any computer with the same environment then we should get the exact same key.
The scheduler avoids redundant computations. If the result is already in memory from a previous call then that old result will be used rather than recomputing it. Calls to submit or map are idempotent in the common case.
While convenient, this feature may be undesired for impure functions, like random. In these cases two calls to the same function with the same inputs should produce different results. We accomplish this with the pure=False keyword argument. In this case keys are randomly generated (by uuid4).
- >>> import numpy as np
- >>> client.submit(np.random.random, 1000, pure=False).key
- 'random_sample-fc814a39-ee00-42f3-8b6f-cac65bcb5556'
- >>> client.submit(np.random.random, 1000, pure=False).key
- 'random_sample-a24e7220-a113-47f2-a030-72209439f093'
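The idea behind deterministic and random keys can be illustrated with the standard library alone. The sketch below is a simplified model and not Dask's actual tokenization: make_key, submit, results, and calls are hypothetical names, and hashing the repr of the arguments is an assumption made to keep the example short.

```python
import hashlib
import uuid
from operator import add


def make_key(func, *args, pure=True):
    """Build a 'name-token' key. Hypothetical helper, not Dask's tokenizer."""
    name = getattr(func, "__name__", "task")
    if pure:
        # Deterministic token: same function name and arguments, same key
        digest = hashlib.sha256(repr((name, args)).encode()).hexdigest()
        token = digest[:32]
    else:
        # Random token: every call gets a fresh key
        token = str(uuid.uuid4())
    return f"{name}-{token}"


results = {}  # stands in for results already held in memory
calls = []    # records which keys actually triggered a computation


def submit(func, *args, pure=True):
    key = make_key(func, *args, pure=pure)
    if key not in results:  # reuse an existing result when the key matches
        calls.append(key)
        results[key] = func(*args)
    return key


k1 = submit(add, 1, 2)
k2 = submit(add, 1, 2)              # same key, so the stored result is reused
k3 = submit(add, 1, 2, pure=False)  # fresh key, so the call runs again
k4 = submit(add, 1, 2, pure=False)

print(k1 == k2, k3 != k4, len(calls))
```

In this model the two pure calls share one computation while each impure call runs separately, which mirrors the behavior described above.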
Async/await Operation
If we are operating in an asynchronous environment then the blocking functions listed above become asynchronous equivalents. You must start your client with the asynchronous=True keyword and yield or await blocking functions.
- async def f():
-     client = await Client(asynchronous=True)
-     future = client.submit(func, *args)
-     result = await future
-     return result
If you want to reuse the same client in asynchronous and synchronous environments you can apply the asynchronous=True keyword at each method call.
- client = Client() # normal blocking client
- async def f():
-     futures = client.map(func, L)
-     results = await client.gather(futures, asynchronous=True)
-     return results
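For a version that can be run end to end, the following sketch drives an asynchronous client from asyncio.run. It assumes dask.distributed is installed and uses an in-process local cluster rather than a scheduler address. The inc function and the main coroutine are illustrative names.

```python
import asyncio

from distributed import Client


def inc(x):
    return x + 1


async def main():
    # Assumption: no address is given, so a local in-process cluster is started
    async with Client(asynchronous=True, processes=False) as client:
        future = client.submit(inc, 10)
        single = await future                # await a single Future

        futures = client.map(inc, range(5))
        many = await client.gather(futures)  # gather is awaitable here
    return single, many


single, many = asyncio.run(main())
print(single, many)
```

Using async with ensures the client and its local cluster are shut down when the coroutine finishes.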
See the Asynchronous documentation for more information.
Additional Links
For more information on how to use dask.distributed you may want to look at the following pages: