Foundations
You should read through the quickstart before reading this document.
Distributed computing is hard for two reasons:
- Consistent coordination of distributed systems requires sophistication
- Concurrent network programming is tricky and error prone
The foundations of dask.distributed provide abstractions to hide some of the complexity of concurrent network programming (#2). These abstractions ease the construction of sophisticated parallel systems (#1) in a safer environment. However, as with all layered abstractions, ours has flaws. Critical feedback is welcome.
Concurrency with Tornado Coroutines
Worker and Scheduler nodes operate concurrently. They serve several overlapping requests and perform several overlapping computations at the same time without blocking. There are several approaches to concurrent programming; we've chosen to use Tornado for the following reasons:
- Developing and debugging is more comfortable without threads
- Tornado’s documentation is excellent
- Stackoverflow coverage is excellent
- Performance is satisfactory
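To illustrate the single-threaded concurrency model behind this choice, here is a minimal sketch. It uses plain asyncio, on whose event loop modern Tornado runs; the names `handle_request` and `main` are illustrative and not part of distributed:

```python
import asyncio

async def handle_request(name, delay):
    # Waiting on simulated I/O yields control to the event loop,
    # so other coroutines make progress in the meantime
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    # Two overlapping "requests" on one thread: total wall time
    # is roughly max(0.2, 0.1) seconds, not the sum
    return await asyncio.gather(
        handle_request("a", 0.2),
        handle_request("b", 0.1),
    )

results = asyncio.run(main())
print(results)  # ['a done', 'b done']
```

Because all handlers share one thread and only switch at `await` points, there are no data races to debug, which is the comfort referred to above.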
Endpoint-to-endpoint Communication
The various distributed endpoints (Client, Scheduler, Worker) communicate by sending each other arbitrary Python objects. Encoding, sending, and then decoding those objects is the job of the communication layer.
Ancillary services such as a Bokeh-based Web interface, however, have theirown implementation and semantics.
Protocol Handling
While the abstract communication layer can transfer arbitrary Python objects (as long as they are serializable), participants in a distributed cluster concretely obey the distributed Protocol, which specifies request-response semantics using a well-defined message format.
Dedicated infrastructure in distributed handles the various aspects of the protocol, such as dispatching the various operations supported by an endpoint.
Servers
Worker, Scheduler, and Nanny objects all inherit from a Server class.
class distributed.core.Server(handlers, blocked_handlers=None, stream_handlers=None, connection_limit=512, deserialize=True, io_loop=None)

Dask Distributed Server

Superclass for endpoints in a distributed cluster, such as Worker and Scheduler objects.
Handlers
Servers define operations with a handlers dict mapping operation names to functions. The first argument of a handler function will be a Comm for the communication established with the client. Other arguments will receive inputs from the keys of the incoming message, which will always be a dictionary.
>>> def pingpong(comm):
...     return b'pong'

>>> def add(comm, x, y):
...     return x + y

>>> handlers = {'ping': pingpong, 'add': add}
>>> server = Server(handlers)  # doctest: +SKIP
>>> server.listen('tcp://0.0.0.0:8000')  # doctest: +SKIP
Message Format
The server expects messages to be dictionaries with a special key, 'op', that corresponds to the name of the operation, and other key-value pairs as required by the function.
So in the example above, the following would be valid messages:
{'op': 'ping'}
{'op': 'add', 'x': 10, 'y': 20}
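As a rough sketch of how such a message might be dispatched, consider the helper below. The `dispatch` function is hypothetical and for illustration only; the real dispatch logic lives inside Server:

```python
def pingpong(comm):
    return b'pong'

def add(comm, x, y):
    return x + y

handlers = {'ping': pingpong, 'add': add}

def dispatch(comm, msg):
    # Hypothetical helper: pop 'op' to select the handler,
    # then pass the remaining keys as keyword arguments
    msg = dict(msg)  # copy so we can pop safely
    op = msg.pop('op')
    return handlers[op](comm, **msg)

print(dispatch(None, {'op': 'ping'}))               # b'pong'
print(dispatch(None, {'op': 'add', 'x': 10, 'y': 20}))  # 30
```

This is why handler signatures use keyword arguments matching the message keys: the message itself, minus 'op', becomes the call's arguments.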
RPC
To interact with remote servers we typically use rpc objects, which expose a familiar method-call interface to invoke remote operations.
class distributed.core.rpc(arg=None, comm=None, deserialize=True, timeout=None, connection_args=None, serializers=None, deserializers=None)

Conveniently interact with a remote server
>>> remote = rpc(address)  # doctest: +SKIP
>>> response = yield remote.add(x=10, y=20)  # doctest: +SKIP
One rpc object can be reused for several interactions. Additionally, this object creates and destroys many comms as necessary and so is safe to use in multiple overlapping communications.
When done, close comms explicitly.
>>> remote.close_comms()  # doctest: +SKIP
Examples
Here is a small example using distributed.core to create and interact with a custom server.
Server Side
import asyncio
from distributed.core import Server

def add(comm, x=None, y=None):  # simple handler, just a function
    return x + y

async def stream_data(comm, interval=1):  # complex handler, multiple responses
    data = 0
    while True:
        await asyncio.sleep(interval)
        data += 1
        await comm.write(data)

s = Server({'add': add, 'stream_data': stream_data})
s.listen('tcp://:8888')  # listen on TCP port 8888

asyncio.get_event_loop().run_forever()
Client Side
import asyncio
from distributed.core import connect

async def f():
    comm = await connect('tcp://127.0.0.1:8888')
    await comm.write({'op': 'add', 'x': 1, 'y': 2})
    result = await comm.read()
    await comm.close()
    print(result)

>>> asyncio.get_event_loop().run_until_complete(f())
3

async def g():
    comm = await connect('tcp://127.0.0.1:8888')
    await comm.write({'op': 'stream_data', 'interval': 1})
    while True:
        result = await comm.read()
        print(result)

>>> asyncio.get_event_loop().run_until_complete(g())
1
2
3
...
Client Side with rpc
RPC provides a more pythonic interface. It also provides other benefits, suchas using multiple streams in concurrent cases. Most distributed code usesrpc
. The exception is when we need to perform multiple reads or writes, aswith the stream data case above.
import asyncio
from distributed.core import rpc

async def f():
    # comm = await connect('tcp://127.0.0.1', 8888)
    # await comm.write({'op': 'add', 'x': 1, 'y': 2})
    # result = await comm.read()
    with rpc('tcp://127.0.0.1:8888') as r:
        result = await r.add(x=1, y=2)
        print(result)

>>> asyncio.get_event_loop().run_until_complete(f())
3