Scheduling State
Overview
The life of a computation with Dask can be described in the following stages:
- The user authors a graph using some library, perhaps dask.delayed or dask.dataframe or the submit/map functions on the client. They submit these tasks to the scheduler.
- The scheduler assimilates these tasks into its graph of all tasks to track, and as their dependencies become available it asks workers to run each of these tasks in turn.
- The worker receives information about how to run the task, communicates with its peer workers to collect data dependencies, and then runs the relevant function on the appropriate data. It reports back to the scheduler that it has finished, keeping the result stored in the worker where it was computed.
- The scheduler reports back to the user that the task has completed. If the user desires, it then fetches the data from the worker through the scheduler.
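The four stages can be mimicked in a single process to make the hand-offs visible. This is a toy, not Dask's API. The graph format {key: (function, *dependency_keys)} and the toy_scheduler and toy_worker functions are hypothetical. A static topological order from Python's graphlib stands in for the scheduler's dynamic tracking.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def inc(x):
    return x + 1


def add(x, y):
    return x + y


def toy_worker(func, arg_keys, memory):
    """Run one task on dependency results already held in `memory`."""
    return func(*(memory[k] for k in arg_keys))


def toy_scheduler(graph):
    """Hand each task to the worker once its dependencies are in memory."""
    memory = {}  # results stay where they were computed
    deps = {key: set(spec[1:]) for key, spec in graph.items()}
    for key in TopologicalSorter(deps).static_order():
        func, *arg_keys = graph[key]
        memory[key] = toy_worker(func, arg_keys, memory)
    return memory


# Stage 1: the user authors a graph {key: (function, *dependency_keys)}.
graph = {"x": (lambda: 1,), "y": (inc, "x"), "z": (add, "x", "y")}
memory = toy_scheduler(graph)  # stages 2 and 3
print(memory["z"])             # stage 4: the user fetches the result, prints 3
```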
Most relevant logic is in tracking tasks as they evolve from newly submitted, to waiting for dependencies, to actively running on some worker, to finished in memory, to garbage collected. Tracking this process, and tracking all effects that this task has on other tasks that might depend on it, is the majority of the complexity of the dynamic task scheduler. This section describes the system used to perform this tracking.
For more abstract information about the policies used by the scheduler, see Scheduling Policies.
The scheduler keeps internal state about several kinds of entities:
- Individual tasks known to the scheduler
- Workers connected to the scheduler
- Clients connected to the scheduler
Note
Everything listed in this page is an internal detail of how Dask operates. It may change between versions and you should probably avoid relying on it in user code (including on any APIs explained here).
Task State
Internally, the scheduler moves tasks between a fixed set of states, notably released, waiting, no-worker, processing, memory and erred.
Tasks flow along the following states with the following allowed transitions:
- Released: Known but not actively computing or in memory
- Waiting: On track to be computed, waiting on dependencies to arrive in memory
- No-worker: Ready to be computed, but no appropriate worker exists (for example because of resource restrictions, or because no worker is connected at all).
- Processing: Actively being computed by one or more workers
- Memory: In memory on one or more workers
- Erred: Task computation, or one of its dependencies, has encountered an error
- Forgotten (not actually a state): Task is no longer needed by any client or dependent task
In addition to the literal state, though, other information needs to be kept and updated about each task. Individual task state is stored in an object named TaskState and consists of the following information:
class distributed.scheduler.TaskState(key, run_spec)
A simple object holding information about a task.
key: str
The key is the unique identifier of a task, generally formed from the name of the function, followed by a hash of the function and arguments, like 'inc-ab31c010444977004d656610d2d421ec'.
prefix: TaskPrefix
The broad class of tasks to which this task belongs, like “inc” or “read_csv”.
run_spec: object
A specification of how to run the task. The type and meaning of this value is opaque to the scheduler, as it is only interpreted by the worker to which the task is sent for executing.
As a special case, this attribute may also be None, in which case the task is “pure data” (such as, for example, a piece of data loaded in the scheduler using Client.scatter()). A “pure data” task cannot be computed again if its value is lost.
priority: tuple
The priority provides each task with a relative ranking which is used to break ties when many tasks are being considered for execution.
This ranking is generally a 2-item tuple. The first (and dominant) item corresponds to when it was submitted. Generally, earlier tasks take precedence. The second item is determined by the client, and is a way to prioritize tasks within a large graph that may be important, such as if they are on the critical path, or good to run in order to release many dependencies. This is explained further in Scheduling Policy.
state: str
This task’s current state. Valid states include released, waiting, no-worker, processing, memory, erred and forgotten. If it is forgotten, the task isn’t stored in the tasks dictionary anymore and will probably disappear soon from memory.
dependencies: {TaskState}
The set of tasks this task depends on for proper execution. Only tasks still alive are listed in this set. If, for whatever reason, this task also depends on a forgotten task, the has_lost_dependencies flag is set.
A task can only be executed once all its dependencies have already been successfully executed and have their result stored on at least one worker. This is tracked by progressively draining the waiting_on set.
dependents: {TaskState}
The set of tasks which depend on this task. Only tasks still alive are listed in this set.
This is the reverse mapping of dependencies.
has_lost_dependencies: bool
Whether any of the dependencies of this task has been forgotten. For memory consumption reasons, forgotten tasks are not kept in memory even though they may have dependent tasks. When a task is forgotten, therefore, each of its dependents has their has_lost_dependencies attribute set to True.
If has_lost_dependencies is true, this task cannot go into the “processing” state anymore.
waiting_on: {TaskState}
The set of tasks this task is waiting on before it can be executed. This is always a subset of dependencies. Each time one of the dependencies has finished processing, it is removed from the waiting_on set.
Once waiting_on becomes empty, this task can move from the “waiting” state to the “processing” state (unless one of the dependencies errored out, in which case this task is instead marked “erred”).
waiters: {TaskState}
The set of tasks which need this task to remain alive. This is always a subset of dependents. Each time one of the dependents has finished processing, it is removed from the waiters set.
Once both waiters and who_wants become empty, this task can be released (if it has a non-empty run_spec) or forgotten (otherwise) by the scheduler, and by any workers in who_has.
Note
Counter-intuitively, waiting_on and waiters are not reverse mappings of each other.
who_wants: {ClientState}
The set of clients who want this task’s result to remain alive. This is the reverse mapping of ClientState.wants_what.
When a client submits a graph to the scheduler it also specifies which output tasks it desires, such that their results are not released from memory.
Once a task has finished executing (i.e. moves into the “memory” or “erred” state), the clients in who_wants are notified.
Once both waiters and who_wants become empty, this task can be released (if it has a non-empty run_spec) or forgotten (otherwise) by the scheduler, and by any workers in who_has.
who_has: {WorkerState}
The set of workers who have this task’s result in memory. It is non-empty iff the task is in the “memory” state. There can be more than one worker in this set if, for example, Client.scatter() or Client.replicate() was used.
This is the reverse mapping of WorkerState.has_what.
processing_on: WorkerState (or None)
If this task is in the “processing” state, this is the worker currently processing it. Otherwise this is None.
This attribute is kept in sync with WorkerState.processing.
retries: int
The number of times this task can automatically be retried in case of failure. If a task fails executing (the worker returns with an error), its retries attribute is checked. If it is equal to 0, the task is marked “erred”. If it is greater than 0, the retries attribute is decremented and execution is attempted again.
nbytes: int (or None)
The number of bytes, as determined by sizeof, of the result of a finished task. This number is used for diagnostics and to help prioritize work.
type: str
The type of the object as a string. Only present for tasks that have been computed.
exception: object
If this task failed executing, the exception object is stored here. Otherwise this is None.
traceback: object
If this task failed executing, the traceback object is stored here. Otherwise this is None.
exception_blame: TaskState (or None)
If this task or one of its dependencies failed executing, the failed task is stored here (possibly itself). Otherwise this is None.
suspicious: int
The number of times this task has been involved in a worker death.
Some tasks may cause workers to die (such as calling os._exit(0)). When a worker dies, all of the tasks on that worker are reassigned to others. This combination of behaviors can cause a bad task to catastrophically destroy all workers on the cluster, one after another. Whenever a worker dies, we mark each task currently processing on that worker (as recorded by WorkerState.processing) as suspicious.
If a task is involved in three deaths (or some other fixed constant) then we mark the task as erred.
host_restrictions: {hostnames}
A set of hostnames where this task can be run (or None if empty). Usually this is empty unless the task has been specifically restricted to only run on certain hosts. A hostname may correspond to one or several connected workers.
worker_restrictions: {worker addresses}
A set of complete worker addresses where this task can be run (or None if empty). Usually this is empty unless the task has been specifically restricted to only run on certain workers.
Note this is tracking worker addresses, not worker states, since the specific workers may not be connected at this time.
resource_restrictions: {resource: quantity}
Resources required by this task, such as {'gpu': 1} or {'memory': 1e9} (or None if empty). These are user-defined names and are matched against the contents of each WorkerState.resources dictionary.
loose_restrictions: bool
If False, each of host_restrictions, worker_restrictions and resource_restrictions is a hard constraint: if no worker is available satisfying those restrictions, the task cannot go into the “processing” state and will instead go into the “no-worker” state.
If True, the above restrictions are mere preferences: if no worker is available satisfying those restrictions, the task can still go into the “processing” state and be sent for execution to another connected worker.
: The group of tasks to which this one belongs.
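The tie-breaking role of priority can be illustrated with Python's tuple ordering. This is a sketch under two assumptions the text above does not state: a smaller tuple means "run sooner", and ready tasks are kept in a heap. The keys and the (submission order, in-graph order) values are made up.

```python
import heapq

# Hypothetical ready tasks: key -> (submission order, client-assigned order).
ready = {
    "late-a": (2, 0),
    "early-b": (1, 1),
    "early-a": (1, 0),
}

# Heap entries compare by priority tuple first.
heap = [(priority, key) for key, priority in ready.items()]
heapq.heapify(heap)

order = [heapq.heappop(heap)[1] for _ in range(len(heap))]
print(order)  # ['early-a', 'early-b', 'late-a']
```

The first item dominates: every task from the earlier submission runs before any task from the later one. The second item only breaks ties within a submission.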
The scheduler keeps track of all the TaskState objects (those not in the “forgotten” state) using several containers:
tasks: {str: TaskState}
- A dictionary mapping task keys (usually strings) to TaskState objects. Task keys are how information about tasks is communicated between the scheduler and clients, or the scheduler and workers; this dictionary is then used to find the corresponding TaskState object.
unrunnable: {TaskState}
- A set of TaskState objects in the “no-worker” state. These tasks already have all their dependencies satisfied (their waiting_on set is empty), and are waiting for an appropriate worker to join the network before computing.
Worker State
Each worker’s current state is stored in a WorkerState object. This information is involved in deciding which worker to run a task on.
class distributed.scheduler.WorkerState(address=None, pid=0, name=None, nthreads=0, memory_limit=0, local_directory=None, services=None, versions=None, nanny=None, extra=None)
A simple object holding information about a worker.
address
This worker’s unique key. This can be its connected address (such as 'tcp://127.0.0.1:8891') or an alias (such as 'alice').
processing: {TaskState: cost}
- A dictionary of tasks that have been submitted to this worker. Each task state is associated with the expected cost in seconds of running that task, summing both the task’s expected computation time and the expected communication time of its result.
Multiple tasks may be submitted to a worker in advance and the worker will run them eventually, depending on its execution resources (but see Work Stealing).
All the tasks here are in the “processing” state.
This attribute is kept in sync with TaskState.processing_on.
has_what: {TaskState}
- The set of tasks which currently reside on this worker. All the tasks here are in the “memory” state.
This is the reverse mapping of TaskState.who_has.
nbytes: int
The total memory size, in bytes, used by the tasks this worker holds in memory (i.e. the tasks in this worker’s has_what).
nthreads: int
The number of CPU threads made available on this worker.
resources: {str: Number}
The available resources on this worker, like {'gpu': 2}. These are abstract quantities that constrain certain tasks from running at the same time on this worker.
used_resources: {str: Number}
The sum of each resource used by all tasks allocated to this worker. The numbers in this dictionary can only be less than or equal to those in this worker’s resources.
occupancy: Number
The total expected runtime, in seconds, of all tasks currently processing on this worker. This is the sum of all the costs in this worker’s processing dictionary.
status: str
The current status of the worker, either 'running' or 'closed'.
nanny: str
Address of the associated Nanny, if present.
last_seen: Number
The last time we received a heartbeat from this worker, in local scheduler time.
actors: {TaskState}
- A set of all TaskStates on this worker that are actors. This only includes those actors whose state actually lives on this worker, not actors to which this worker has a reference.
In addition to individual worker state, the scheduler maintains two containers to help with scheduling tasks:
Scheduler.saturated: {WorkerState}
- A set of workers whose computing power (as measured by WorkerState.nthreads) is fully exploited by processing tasks, and whose current occupancy is a lot greater than the average.
Scheduler.idle: {WorkerState}
- A set of workers whose computing power is not fully exploited. These workers are assumed to be able to start computing new tasks immediately.
These two sets are disjoint. Also, some workers may be neither “idle” nor “saturated”. “Idle” workers will be preferred when deciding a suitable worker to run a new task on. Conversely, “saturated” workers may see their workload lightened through Work Stealing.
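A rough classification into “idle” and “saturated” can be sketched from the descriptions above. A worker is idle while it runs fewer tasks than it has threads. It is saturated when all threads are busy and its occupancy is well above the cluster average. SATURATION_FACTOR and classify are hypothetical; this page does not give the scheduler's actual thresholds.

```python
SATURATION_FACTOR = 2.0  # hypothetical reading of "a lot greater than the average"


def classify(workers):
    """workers: {address: (nthreads, n_processing, occupancy)}."""
    avg = sum(occ for _, _, occ in workers.values()) / len(workers)
    idle, saturated = set(), set()
    for addr, (nthreads, n_processing, occ) in workers.items():
        if n_processing < nthreads:
            idle.add(addr)              # spare threads available
        elif occ > SATURATION_FACTOR * avg:
            saturated.add(addr)         # busy and far above average
    return idle, saturated              # disjoint by construction


idle, saturated = classify({
    "w1": (4, 1, 0.5),    # spare threads
    "w2": (2, 10, 30.0),  # busy, well above the 11.5 s average
    "w3": (2, 2, 4.0),    # busy but below average: neither set
})
print(sorted(idle), sorted(saturated))  # ['w1'] ['w2']
```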
Client State
Information about each individual client of the scheduler is kept in a ClientState object:
class distributed.scheduler.ClientState(client, versions=None)
A simple object holding information about a client.
client_key: str
A unique identifier for this client. This is generally an opaque string generated by the client itself.
wants_what: {TaskState}
- A set of tasks this client wants kept in memory, so that it can download its result when desired. This is the reverse mapping of TaskState.who_wants.
Tasks are typically removed from this set when the corresponding object in the client’s space (for example a Future or a Dask collection) gets garbage-collected.
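When a client no longer wants a key, the rule described under TaskState.waiters and TaskState.who_wants applies. Once both sets are empty, the task can be released if it has a run_spec, or forgotten if it is pure data. The sketch below ignores workers and dependents. ToyTask and client_releases are hypothetical names.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    key: str
    run_spec: object = None  # None means "pure data", e.g. scattered
    waiters: set = field(default_factory=set)
    who_wants: set = field(default_factory=set)


def client_releases(client, tasks, keys):
    """Drop `client` from who_wants; return {key: recommended state}."""
    recommendations = {}
    for key in keys:
        ts = tasks[key]
        ts.who_wants.discard(client)
        if not ts.waiters and not ts.who_wants:
            # Recomputable tasks are released; pure data is forgotten.
            recommendations[key] = "released" if ts.run_spec is not None else "forgotten"
    return recommendations


tasks = {
    "computed": ToyTask("computed", run_spec="inc(1)", who_wants={"client-1"}),
    "scattered": ToyTask("scattered", who_wants={"client-1"}),
    "shared": ToyTask("shared", run_spec="inc(2)", who_wants={"client-1", "client-2"}),
}
recs = client_releases("client-1", tasks, list(tasks))
print(recs)  # {'computed': 'released', 'scattered': 'forgotten'}
```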
Understanding a Task’s Flow
As seen above, there are numerous pieces of information pertaining to task and worker state, and some of them can be computed, updated or removed during a task’s transitions.
The table below shows which state variable a task is in, depending on the task’s state. Cells with a check mark (✓) indicate the task key must be present in the given state variable; cells with a question mark (?) indicate the task key may be present in the given state variable.
State variable | Released | Waiting | No-worker | Processing | Memory | Erred |
---|---|---|---|---|---|---|
TaskState.dependencies | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
TaskState.dependents | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
TaskState.host_restrictions | ? | ? | ? | ? | ? | ? |
TaskState.worker_restrictions | ? | ? | ? | ? | ? | ? |
TaskState.resource_restrictions | ? | ? | ? | ? | ? | ? |
TaskState.loose_restrictions | ? | ? | ? | ? | ? | ? |
TaskState.waiting_on | | ✓ | | ✓ | | |
TaskState.waiters | | ✓ | | ✓ | | |
TaskState.processing_on | | | | ✓ | | |
WorkerState.processing | | | | ✓ | | |
TaskState.who_has | | | | | ✓ | |
WorkerState.has_what | | | | | ✓ | |
TaskState.nbytes (1) | ? | ? | ? | ? | ✓ | ? |
TaskState.exception (2) | | | | | | ? |
TaskState.traceback (2) | | | | | | ? |
TaskState.exception_blame | | | | | | ✓ |
TaskState.retries | ? | ? | ? | ? | ? | ? |
TaskState.suspicious | ? | ? | ? | ? | ? | ? |
Notes:
(1) TaskState.nbytes: this attribute can be known as long as a task has already been computed, even if it has been later released.
(2) TaskState.exception and TaskState.traceback should be looked up on the TaskState.exception_blame task.
The table below shows which worker state variables are updated on each task state transition.
Transition | Affected worker state |
---|---|
released → waiting | occupancy, idle, saturated |
waiting → processing | occupancy, idle, saturated, used_resources |
waiting → memory | idle, saturated, nbytes |
processing → memory | occupancy, idle, saturated, used_resources, nbytes |
processing → erred | occupancy, idle, saturated, used_resources |
processing → released | occupancy, idle, saturated, used_resources |
memory → released | nbytes |
memory → forgotten | nbytes |
Note
Another way of understanding this table is to observe that entering or exiting a specific task state updates a well-defined set of worker state variables. For example, entering and exiting the “memory” state updates WorkerState.nbytes.
Implementation
Every transition between states is a separate method in the scheduler. These task transition functions are prefixed with transition and then have the name of the start and finish task state, like the following:
```python
def transition_released_waiting(self, key): ...
def transition_processing_memory(self, key): ...
def transition_processing_erred(self, key): ...
```
These functions each have three effects.
- They perform the necessary transformations on the scheduler state (the 20 dicts/lists/sets) to move one key between states.
- They return a dictionary of recommended {key: state} transitions to enact directly afterwards on other keys. For example, after we transition a key into memory we may find that many waiting keys are now ready to transition from waiting to a ready state.
- Optionally they include a set of validation checks that can be turned on for testing.
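The second effect, returning recommendations, can be seen in a stripped-down processing → memory transition. This is a hypothetical sketch over plain dictionaries; toy_transition_processing_memory is not the scheduler's method. When a key lands in memory, it is removed from each dependent's waiting_on set. Every waiting dependent whose waiting_on becomes empty is then recommended for processing.

```python
def toy_transition_processing_memory(state, key):
    """Mutate `state` for `key` and return {key: recommended_state}."""
    state["task_state"][key] = "memory"
    recommendations = {}
    for dep in state["dependents"].get(key, ()):
        waiting_on = state["waiting_on"][dep]
        waiting_on.discard(key)  # drain the dependent's waiting_on set
        if not waiting_on and state["task_state"][dep] == "waiting":
            recommendations[dep] = "processing"
    return recommendations


state = {
    "task_state": {"x": "processing", "y": "processing",
                   "z": "waiting", "w": "waiting"},
    "dependents": {"x": {"z", "w"}, "y": {"w"}},
    "waiting_on": {"z": {"x"}, "w": {"x", "y"}},
}
print(toy_transition_processing_memory(state, "x"))  # {'z': 'processing'}
print(toy_transition_processing_memory(state, "y"))  # {'w': 'processing'}
```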
Rather than call these functions directly we call the central function transition:
```python
def transition(self, key, final_state):
    """Transition key to the suggested state"""
```
This transition function finds the appropriate path from the current to the final state. It also serves as a central point for logging and diagnostics.
Often we want to enact several transitions at once or want to continually respond to new transitions recommended by initial transitions until we reach a steady state. For that we use the transitions function (note the plural s).
```python
def transitions(self, recommendations):
    recommendations = recommendations.copy()
    while recommendations:
        key, finish = recommendations.popitem()
        new = self.transition(key, finish)
        recommendations.update(new)
```
This function runs transition, takes the recommendations and runs them as well, repeating until no further task transitions are recommended.
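The transition/transitions pair can be exercised on a toy scheduler. Everything here is hypothetical except the shape of the transitions loop, which follows the code above. Transition methods are looked up by name as transition_<start>_<finish>. Tasks are "computed" instantly by going straight from waiting to memory, one of the transitions listed in the table earlier on this page. Each method's recommendations are fed back until none remain.

```python
class ToyScheduler:
    def __init__(self, dependencies):
        self.dependencies = dependencies  # key -> set of dependency keys
        self.state = {k: "released" for k in dependencies}
        self.log = []

    def transition(self, key, finish):
        start = self.state[key]
        method = getattr(self, f"transition_{start}_{finish}")
        recommendations = method(key)
        self.log.append((key, start, finish))
        return recommendations

    def transitions(self, recommendations):
        recommendations = recommendations.copy()
        while recommendations:
            key, finish = recommendations.popitem()
            new = self.transition(key, finish)
            recommendations.update(new)

    def _ready(self, key):
        return all(self.state[d] == "memory" for d in self.dependencies[key])

    def transition_released_waiting(self, key):
        self.state[key] = "waiting"
        # Toy shortcut: a task whose inputs are ready is "computed" instantly.
        return {key: "memory"} if self._ready(key) else {}

    def transition_waiting_memory(self, key):
        self.state[key] = "memory"
        # Recommend waiting dependents whose dependencies are all in memory.
        return {k: "memory" for k, deps in self.dependencies.items()
                if key in deps and self.state[k] == "waiting" and self._ready(k)}


s = ToyScheduler({"x": set(), "y": {"x"}, "z": {"x", "y"}})
s.transitions({"x": "waiting", "y": "waiting", "z": "waiting"})
print(s.state)  # {'x': 'memory', 'y': 'memory', 'z': 'memory'}
```

dict.popitem() removes the most recently inserted item, so the initial recommendations are processed last-in, first-out. Tasks that were not ready when first seen are picked up later through the recommendations returned by transition_waiting_memory.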
Stimuli
Transitions occur from stimuli, which are state-changing messages to the scheduler from workers or clients. The scheduler responds to the following stimuli:
- Workers
  - Task finished: A task has completed on a worker and is now in memory
  - Task erred: A task ran and erred on a worker
  - Task missing data: A task tried to run but was unable to find necessary data on other workers
  - Worker added: A new worker was added to the network
  - Worker removed: An existing worker left the network
- Clients
  - Update graph: The client sends more tasks to the scheduler
  - Release keys: The client no longer desires the result of certain keys
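The “Worker removed” stimulus is where the suspicious counter described under TaskState comes into play. This minimal sketch assumes the fixed threshold of three deaths mentioned there. It also reduces “reassigned to others” to recommending the released state, as the description of remove_worker suggests. ToyTask, MAX_DEATHS and on_worker_removed are hypothetical names.

```python
MAX_DEATHS = 3  # "three deaths (or some other fixed constant)"


class ToyTask:
    def __init__(self, key):
        self.key = key
        self.suspicious = 0


def on_worker_removed(processing):
    """processing: tasks that were processing on the dead worker.

    Returns {key: recommended state} for each of them.
    """
    recommendations = {}
    for ts in processing:
        ts.suspicious += 1
        if ts.suspicious >= MAX_DEATHS:
            recommendations[ts.key] = "erred"     # stop the chain of deaths
        else:
            recommendations[ts.key] = "released"  # reschedule elsewhere
    return recommendations


bad, innocent = ToyTask("calls-os-exit"), ToyTask("innocent")
print(on_worker_removed([bad, innocent]))  # both 'released'
print(on_worker_removed([bad]))            # {'calls-os-exit': 'released'}
print(on_worker_removed([bad]))            # {'calls-os-exit': 'erred'}
```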
Stimuli functions are prefixed with the text stimulus, and take a variety of keyword arguments from the message as in the following examples:
```python
def stimulus_task_finished(self, key=None, worker=None, nbytes=None,
                           type=None, compute_start=None, compute_stop=None,
                           transfer_start=None, transfer_stop=None):
    ...

def stimulus_task_erred(self, key=None, worker=None,
                        exception=None, traceback=None):
    ...
```
These functions change some non-essential administrative state and then call transition functions.
Note that there are several other non-state-changing messages that we receive from the workers and clients, such as messages requesting information about the current state of the scheduler. These are not considered stimuli.
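A stimulus handler can be pictured as a thin layer. It records administrative details from a message, then recommends transitions. In this sketch the message shape (an "op" field such as "task-finished"), the ToyStimuli class and its retries dictionary are assumptions for illustration, not the scheduler's wire protocol. The retry branch follows the retries rule described under TaskState; recommending "waiting" for a retry is itself an assumption.

```python
class ToyStimuli:
    """Hypothetical stand-in: record admin state, then recommend transitions."""

    def __init__(self, retries):
        self.retries = dict(retries)  # key -> remaining automatic retries
        self.durations = {}           # non-essential administrative state
        self.recommendations = []     # stands in for calling transitions()

    def handle(self, msg):
        msg = dict(msg)
        op = msg.pop("op").replace("-", "_")  # "task-finished" -> "task_finished"
        return getattr(self, f"stimulus_{op}")(**msg)

    def stimulus_task_finished(self, key=None, worker=None,
                               compute_start=None, compute_stop=None, **kwargs):
        if compute_start is not None and compute_stop is not None:
            self.durations[key] = compute_stop - compute_start
        self.recommendations.append({key: "memory"})

    def stimulus_task_erred(self, key=None, worker=None,
                            exception=None, traceback=None, **kwargs):
        if self.retries.get(key, 0) > 0:
            self.retries[key] -= 1
            self.recommendations.append({key: "waiting"})  # try again
        else:
            self.recommendations.append({key: "erred"})


s = ToyStimuli(retries={"flaky": 1})
s.handle({"op": "task-finished", "key": "x", "worker": "tcp://a:1",
          "compute_start": 10.0, "compute_stop": 10.25})
s.handle({"op": "task-erred", "key": "flaky", "worker": "tcp://a:1"})
s.handle({"op": "task-erred", "key": "flaky", "worker": "tcp://a:1"})
print(s.durations)        # {'x': 0.25}
print(s.recommendations)  # [{'x': 'memory'}, {'flaky': 'waiting'}, {'flaky': 'erred'}]
```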
API
class distributed.scheduler.Scheduler(loop=None, delete_interval='500ms', synchronize_worker_interval='60s', services=None, service_kwargs=None, allowed_failures=None, extensions=None, validate=None, scheduler_file=None, security=None, worker_ttl=None, idle_timeout=None, interface=None, host=None, port=0, protocol=None, dashboard_address=None, preload=None, preload_argv=(), **kwargs)
Dynamic distributed task scheduler
The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers appropriately. It continuously tries to use the workers to execute an ever-growing dask graph.
All events are handled quickly, in linear time with respect to their input (which is often of constant size) and generally within a millisecond. To accomplish this the scheduler tracks a lot of state. Every operation maintains the consistency of this state.
The scheduler communicates with the outside world through Comm objects. It maintains a consistent and valid view of the world even when listening to several clients at once.
A Scheduler is typically started either with the dask-scheduler executable:
```console
$ dask-scheduler
Scheduler started at 127.0.0.1:8786
```
Or within a LocalCluster, which a Client starts up when given no connection information:
```python
>>> c = Client()
>>> c.cluster.scheduler
Scheduler(...)
```
Users typically do not interact with the scheduler directly but rather with the client object Client.
State
The scheduler contains the following state variables. Each variable is listed along with what it stores and a brief description.
- tasks ({task key: TaskState}): Tasks currently known to the scheduler
- unrunnable ({TaskState}): Tasks in the “no-worker” state
- workers ({worker key: WorkerState}): Workers currently connected to the scheduler
- idle ({WorkerState}): Set of workers that are not fully utilized
- saturated ({WorkerState}): Set of workers that are over-utilized
- host_info ({hostname: dict}): Information about each worker host
- clients ({client key: ClientState}): Clients currently connected to the scheduler
- services ({str: port}): Other services running on this scheduler, like Bokeh
- loop (IOLoop): The running Tornado IOLoop
- client_comms ({client key: Comm}): For each client, a Comm object used to receive task requests and report task status updates
- stream_comms ({worker key: Comm}): For each worker, a Comm object from which we both accept stimuli and report results
- task_duration ({key-prefix: time}): Time we expect certain functions to take, e.g. {'sum': 0.25}
adaptive_target(self, comm=None, target_duration='5s')
Desired number of workers based on the current workload.
This looks at the current running tasks and memory use, and returns a number of desired workers. This is often used by adaptive scheduling.
Parameters:
- target_duration: str
  A desired duration of time for computations to take. This affects how rapidly the scheduler will ask to scale.
See also
- [distributed.deploy.Adaptive]($252c3ec3868182cb.md#distributed.deploy.Adaptive)
add_client(self, comm, client=None, versions=None)
Add client to network.
We listen to all future messages from this Comm.
add_keys(self, comm=None, worker=None, keys=())
Learn that a worker has certain keys.
This should not be used in practice and is mostly here for legacy reasons. However, it is sent by workers from time to time.
add_plugin(self, plugin=None, idempotent=False, **kwargs)
Add external plugin to scheduler.
See https://distributed.readthedocs.io/en/latest/plugins.html
add_worker(self, comm=None, address=None, keys=(), nthreads=None, name=None, resolve_address=True, nbytes=None, types=None, now=None, resources=None, host_info=None, memory_limit=None, metrics=None, pid=0, services=None, local_directory=None, versions=None, nanny=None, extra=None)
Add a new worker to the cluster.
broadcast(self, comm=None, msg=None, workers=None, hosts=None, nanny=False, serializers=None)
Broadcast message to workers, return all results.
cancel_key(self, key, client, retries=5, force=False)
Cancel a particular key and all dependents.
check_idle_saturated(self, ws, occ=None)
Update the status of the idle and saturated state.
The scheduler keeps track of workers that are:
- Saturated: have enough work to stay busy
- Idle: do not have enough work to stay busy
They are considered saturated if they both have enough tasks to occupy all of their threads, and if the expected runtime of those tasks is large enough.
This is useful for load balancing and adaptivity.
client_heartbeat(self, client=None)
Handle heartbeats from Client.
client_releases_keys(self, keys=None, client=None)
Remove keys from client desired list.
close(self, comm=None, fast=False, close_workers=False)
Send cleanup signal to all coroutines then wait until finished.
See also
- Scheduler.cleanup
close_worker(self, stream=None, worker=None, safe=None)
Remove a worker from the cluster.
This both removes the worker from our local state and also sends a signal to the worker to shut down. This works regardless of whether or not the worker has a nanny process restarting it.
coerce_address(self, addr, resolve=True)
Coerce possible input addresses to canonical form. resolve can be disabled for testing with fake hostnames.
Handles strings, tuples, or aliases.
coerce_hostname(self, host)
Coerce the hostname of a worker.
decide_worker(self, ts)
Decide on a worker for task ts. Return a WorkerState.
feed(self, comm, function=None, setup=None, teardown=None, interval='1s', **kwargs)
Provides a data Comm to external requester.
Caution: this runs arbitrary Python code on the scheduler. This should eventually be phased out. It is mostly used by diagnostics.
gather(self, comm=None, keys=None, serializers=None)
Collect data from workers.
get_comm_cost(self, ts, ws)
Get the estimated communication cost (in seconds) to compute the task on the given worker.
get_task_duration(self, ts, default=0.5)
Get the estimated computation cost of the given task (not including any communication cost).
get_worker_service_addr(self, worker, service_name, protocol=False)
Get the (host, port) address of the named service on the worker. Returns None if the service doesn’t exist.
Parameters:
- worker: address
- service_name: str
  Common services include ‘bokeh’ and ‘nanny’
- protocol: boolean
  Whether or not to include a full address with protocol (True) or just a (host, port) pair
handle_long_running(self, key=None, worker=None, compute_duration=None)
A task has seceded from the thread pool.
We stop the task from being stolen in the future, and change task duration accounting as if the task has stopped.
handle_worker(self, comm=None, worker=None)
Listen to responses from a single worker.
This is the main loop for scheduler-worker interaction.
See also
- Scheduler.handle_client: Equivalent coroutine for clients
identity(self, comm=None)
Basic information about ourselves and our cluster.
new_task(self, key, spec, state)
Create a new task, and associated states.
proxy(self, comm=None, msg=None, worker=None, serializers=None)
Proxy a communication through the scheduler to some other worker.
rebalance(self, comm=None, keys=None, workers=None)
Rebalance keys so that each worker stores roughly equal bytes.
Policy
This orders the workers by what fraction of bytes of the existing keys they have. It walks down this list from most to least. At each worker it takes the largest results it can find and sends them to the least occupied worker until either the sender or the recipient is at the average expected load.
reevaluate_occupancy(self, worker_index=0)
Periodically reassess task duration time.
The expected duration of a task can change over time. Unfortunately we don’t have a good constant-time way to propagate the effects of these changes out to the summaries that they affect, like the total expected runtime of each of the workers, or what tasks are stealable.
In this coroutine we walk through all of the workers and re-align their estimates with the current state of tasks. We do this periodically rather than at every transition, and we only do it if the scheduler process isn’t under load (using psutil.Process.cpu_percent()). This lets us avoid this fringe optimization when we have better things to think about.
register_worker_plugin(self, comm, plugin, name=None)
Register a setup function, and call it on every worker.
remove_client(self, client=None)
Remove client from network.
remove_plugin(self, plugin)
Remove external plugin from scheduler.
remove_worker(self, comm=None, address=None, safe=False, close=True)
Remove worker from cluster.
We do this when a worker reports that it plans to leave or when it appears to be unresponsive. This may send its tasks back to a released state.
replicate(self, comm=None, keys=None, n=None, workers=None, branching_factor=2, delete=True, lock=True)
Replicate data throughout cluster.
This performs a tree copy of the data throughout the network individually on each piece of data.
Parameters:
- keys: Iterable
  List of keys to replicate
- n: int
  Number of replications we expect to see within the cluster
- branching_factor: int, optional
  The number of workers that can copy data in each generation. The larger the branching factor, the more data we copy in a single step, but the more a given worker risks being swamped by data requests.
See also
- [Scheduler.rebalance](#distributed.scheduler.Scheduler.rebalance)
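To see how branching_factor trades speed against load, the number of copy generations can be estimated with a small model. It assumes, as a simplification, that in each generation every worker already holding a piece of data can seed up to branching_factor new copies. It also assumes each generation finishes before the next begins. generations_needed is a hypothetical helper, not part of the scheduler.

```python
def generations_needed(n, branching_factor=2, holders=1):
    """Count copy rounds until at least n workers hold a piece of data."""
    generations = 0
    while holders < n:
        # Each current holder seeds up to branching_factor new copies.
        new_copies = min(n - holders, branching_factor * holders)
        holders += new_copies
        generations += 1
    return generations


print(generations_needed(10))                      # 3
print(generations_needed(10, branching_factor=1))  # 4
```

In this model a larger branching factor reaches n copies in fewer generations, at the cost of more simultaneous requests to each holder. That matches the trade-off described for branching_factor.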
report(self, msg, ts=None, client=None)
Publish updates to all listening Queues and Comms.
If the message contains a key then we only send the message to those comms that care about the key.
reschedule(self, key=None, worker=None)
Reschedule a task.
Things may have shifted and this task may now be better suited to run elsewhere.
restart(self, client=None, timeout=3)
Restart all workers. Reset local state.
retire_workers(self, comm=None, workers=None, remove=True, close_workers=False, names=None, lock=True, **kwargs)
Gracefully retire workers from cluster.
Parameters:
- workers: list (optional)
  List of worker addresses to retire. If not provided we call workers_to_close, which finds a good set.
- names: list (optional)
  List of worker names to retire.
- remove: bool (defaults to True)
  Whether or not to remove the worker metadata immediately or else wait for the worker to contact us
- close_workers: bool (defaults to False)
  Whether or not to actually close the worker explicitly from here. Otherwise we expect some external job scheduler to finish off the worker.
- **kwargs: dict
  Extra options to pass to workers_to_close to determine which workers we should drop
Returns:
- Dictionary mapping worker ID/address to dictionary of information about that worker for each retired worker.
See also
- [Scheduler.workers_to_close](#distributed.scheduler.Scheduler.workers_to_close)
runfunction
(_self, stream, function, args=(), kwargs={}, wait=True)[source]- Run a function within this process
See also
- <code>Client.run_on_scheduler</code>
scatter(self, comm=None, data=None, workers=None, client=None, broadcast=False, timeout=2)
Send data out to workers.
See also
- [<code>Scheduler.broadcast</code>](#distributed.scheduler.Scheduler.broadcast)
send_task_to_worker(self, worker, key)
Send a single computational task to a worker.
start(self)
Clear out old state and restart all running coroutines.
start_ipython(self, comm=None)
Start an IPython kernel.
Returns a dictionary of Jupyter connection info.
stimulus_cancel(self, comm, keys=None, client=None, force=False)
Stop execution on a list of keys.
stimulus_missing_data(self, cause=None, key=None, worker=None, ensure=True, **kwargs)
Mark that certain keys have gone missing. Recover.
stimulus_task_erred(self, key=None, worker=None, exception=None, traceback=None, **kwargs)
Mark that a task has erred on a particular worker.
stimulus_task_finished(self, key=None, worker=None, **kwargs)
Mark that a task has finished execution on a particular worker.
story(self, *keys)
Get all transitions that touch one of the input keys.
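Filtering a transition log by key can be sketched as below. The log layout is an assumption for illustration: each entry is taken to be a tuple `(key, start, finish, recommendations, timestamp)`, and an entry is treated as touching a key if it transitions that key or recommends a transition for it. This standalone `story` function is hypothetical, not the Scheduler method itself.

```python
from typing import Dict, List, Tuple

# Assumed log entry layout: (key, start, finish, recommendations, timestamp)
LogEntry = Tuple[str, str, str, Dict[str, str], float]


def story(transition_log: List[LogEntry], *keys: str) -> List[LogEntry]:
    """Return log entries that touch any of `keys` (hypothetical sketch)."""
    wanted = set(keys)
    return [
        entry
        for entry in transition_log
        # An entry touches a key if it moves that key or recommends moving it.
        if entry[0] in wanted or wanted.intersection(entry[3])
    ]


log = [
    ("x", "released", "waiting", {"x": "processing"}, 1.0),
    ("x", "processing", "memory", {"y": "processing"}, 2.0),
    ("z", "released", "waiting", {}, 3.0),
]
print(len(story(log, "y")))  # 1: only the entry that recommends moving "y"
```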
transition(self, key, finish, *args, **kwargs)
Transition a key from its current state to the finish state.
Returns:
- Dictionary of recommendations for future transitions
See also
- [<code>Scheduler.transitions</code>](#distributed.scheduler.Scheduler.transitions): transitive version of this function

Examples

    >>> self.transition('x', 'waiting')
    {'x': 'processing'}
transition_story(self, *keys)
Get all transitions that touch one of the input keys.
transitions(self, recommendations)
Process transitions until none are left.
This includes feedback from previous transitions and continues until we reach a steady state.
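The feedback loop behind transitions can be sketched as a worklist: pop a recommendation, apply it, and merge whatever new recommendations it produces until the dictionary is empty. This is a hypothetical sketch; the toy graph, `toy_transition`, and its rules are assumptions chosen so that a single step mirrors the `transition('x', 'waiting')` example, returning `{'x': 'processing'}`.

```python
from typing import Callable, Dict, List, Tuple

Recommendations = Dict[str, str]


def run_transitions(
    recommendations: Recommendations,
    transition: Callable[[str, str], Recommendations],
) -> List[Tuple[str, str]]:
    """Apply recommendations until none remain (a steady state)."""
    pending = dict(recommendations)
    applied = []
    while pending:
        # popitem() removes the most recently added recommendation (LIFO).
        key, finish = pending.popitem()
        applied.append((key, finish))
        # Feedback: each transition may recommend further transitions.
        pending.update(transition(key, finish))
    return applied


# Toy task graph (hypothetical): y depends on x.
state = {"x": "released", "y": "released"}
deps = {"x": set(), "y": {"x"}}


def deps_ready(key: str) -> bool:
    return all(state[d] == "memory" for d in deps[key])


def toy_transition(key: str, finish: str) -> Recommendations:
    state[key] = finish
    if finish == "waiting" and deps_ready(key):
        return {key: "processing"}
    if finish == "processing":
        return {key: "memory"}  # pretend a worker finished instantly
    if finish == "memory":
        # Wake dependents whose dependencies are now all in memory.
        return {k: "processing" for k in deps
                if key in deps[k] and state[k] == "waiting" and deps_ready(k)}
    return {}


applied = run_transitions({"x": "waiting", "y": "waiting"}, toy_transition)
print(state)  # {'x': 'memory', 'y': 'memory'}
```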
update_data(self, comm=None, who_has=None, nbytes=None, client=None, serializers=None)
Learn that new data has entered the network from an external source.
See also
- <code>Scheduler.mark_key_in_memory</code>
update_graph(self, client=None, tasks=None, keys=None, dependencies=None, restrictions=None, priority=None, loose_restrictions=None, resources=None, submitting_task=None, retries=None, user_priority=0, actors=None, fifo_timeout=0)
Add new computations to the internal dask graph.
This happens whenever the Client calls submit, map, get, or compute.
valid_workers(self, ts)
Return the set of currently valid workers for a key.
If all workers are valid, then this returns True.
This checks the following state:
- worker_restrictions
- host_restrictions
- resource_restrictions
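The three restriction kinds listed above can be combined with set operations. In this hypothetical sketch workers are plain dicts with `address`, `host`, and `resources` fields, and each restriction is optional. The combination rule is an assumption: worker and host restrictions are treated as alternatives (union), while resource restrictions must always be met (intersection).

```python
from typing import Dict, List, Optional, Set, Union


def valid_workers(
    workers: List[dict],
    worker_restrictions: Optional[Set[str]] = None,
    host_restrictions: Optional[Set[str]] = None,
    resource_restrictions: Optional[Dict[str, float]] = None,
) -> Union[bool, Set[str]]:
    """True if every worker is valid, otherwise the set of valid addresses."""
    result: Union[bool, Set[str]] = True
    if worker_restrictions:
        # Only restricted addresses that belong to known workers.
        result = set(worker_restrictions) & {w["address"] for w in workers}
    if host_restrictions:
        on_hosts = {w["address"] for w in workers if w["host"] in host_restrictions}
        # Assumption: a task may run on an allowed worker OR an allowed host.
        result = on_hosts if result is True else result | on_hosts
    if resource_restrictions:
        enough = {
            w["address"]
            for w in workers
            if all(
                w["resources"].get(name, 0) >= need
                for name, need in resource_restrictions.items()
            )
        }
        # Resource requirements always apply on top of the other restrictions.
        result = enough if result is True else result & enough
    return result


workers = [
    {"address": "tcp://10.0.0.1:1234", "host": "10.0.0.1", "resources": {"GPU": 1}},
    {"address": "tcp://10.0.0.2:1234", "host": "10.0.0.2", "resources": {}},
]
print(valid_workers(workers))  # True
print(valid_workers(workers, resource_restrictions={"GPU": 1}))  # {'tcp://10.0.0.1:1234'}
```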
worker_objective(self, ts, ws)
Objective function to determine which worker should get the task.
Minimize expected start time. If there is a tie, break it using the amount of data already stored on the worker.
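A sketch of such an objective, under stated assumptions: expected start time is modeled as the time to transfer the dependency bytes the worker lacks (at an assumed bandwidth) plus the worker's queued work divided by its thread count, and the second tuple element breaks ties by bytes stored. The dict fields (`has`, `occupancy`, `nthreads`, `nbytes`) and the bandwidth default are hypothetical. Python compares tuples lexicographically, so `min` applies the tie-breaker automatically.

```python
from typing import Dict, Tuple


def worker_objective(
    worker: dict,
    dep_nbytes: Dict[str, int],
    bandwidth: float = 100e6,
) -> Tuple[float, int]:
    """Lower is better: (estimated start time in seconds, bytes stored on worker)."""
    # Dependencies this worker does not already hold must be transferred.
    comm_bytes = sum(n for key, n in dep_nbytes.items() if key not in worker["has"])
    # Queued work (seconds) spread over the worker's threads.
    stack_time = worker["occupancy"] / worker["nthreads"]
    start_time = comm_bytes / bandwidth + stack_time
    return (start_time, worker["nbytes"])


deps = {"x": 200_000_000}
a = {"name": "a", "has": {"x"}, "occupancy": 1.0, "nthreads": 2, "nbytes": 500}
b = {"name": "b", "has": set(), "occupancy": 0.0, "nthreads": 4, "nbytes": 0}
c = {"name": "c", "has": {"x"}, "occupancy": 1.0, "nthreads": 2, "nbytes": 100}

best = min([a, b, c], key=lambda w: worker_objective(w, deps))
print(best["name"])  # c: same start time as a, but less data stored
```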
worker_send(self, worker, msg)
Send message to worker.
This also handles connection failures by adding a callback to remove the worker on the next cycle.
workers_list(self, workers)
List of qualifying workers.
Takes a list of worker addresses or hostnames. Returns a list of all worker addresses that match.
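Expanding a mix of addresses and hostnames can be sketched as follows. This hypothetical version assumes that any entry containing a colon is a full address and otherwise treats the entry as a hostname, compared against the host part of each known address (IPv4 or plain hostnames only). The helper `_host_of` is made up for illustration.

```python
from typing import Iterable, List, Optional


def _host_of(address: str) -> str:
    """'tcp://192.168.0.1:1234' -> '192.168.0.1' (IPv4/hostname only)."""
    return address.split("://")[-1].rsplit(":", 1)[0]


def workers_list(known: Iterable[str], requested: Optional[Iterable[str]]) -> List[str]:
    """Expand addresses/hostnames into matching worker addresses (hypothetical)."""
    known = list(known)
    if requested is None:
        return known
    out: List[str] = []
    for item in requested:
        if ":" in item:
            matches = [item]  # treat as a full address
        else:
            matches = [addr for addr in known if _host_of(addr) == item]
        for addr in matches:
            if addr not in out:  # keep first-seen order, no duplicates
                out.append(addr)
    return out


known = ["tcp://192.168.0.1:1234", "tcp://192.168.0.1:4567", "tcp://192.168.0.2:1234"]
print(workers_list(known, ["192.168.0.1"]))
# ['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']
```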
workers_to_close(self, comm=None, memory_ratio=None, n=None, key=None, minimum=None, target=None, attribute='address')
Find workers that we can close with low cost.
This returns a list of workers that are good candidates to retire. These workers are not running anything and are storing little data relative to their peers. If all workers are idle, then we still maintain enough workers to have enough RAM to store our data, with a comfortable buffer.
This is for use with systems like distributed.deploy.adaptive.
Parameters:
- **memory_ratio: Number**
Amount of extra space we want to have for our stored data. Defaults to 2, meaning that we want to have twice as much memory as we currently have data.
- **n: int**
Number of workers to close
- **minimum: int**
Minimum number of workers to keep around
- **key: Callable(WorkerState)**
An optional callable mapping a WorkerState object to a group affiliation. Groups will be closed together. This is useful when closing workers must be done collectively, such as by hostname.
- **target: int**
Target number of workers to have after we close
- **attribute: str**
The attribute of the WorkerState object to return, like “address” or “name”. Defaults to “address”.
Returns:
- to_close: list of worker addresses that are OK to close
See also
- [<code>Scheduler.retire_workers</code>](#distributed.scheduler.Scheduler.retire_workers)
Examples

    >>> scheduler.workers_to_close()
    ['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']

Group workers by hostname prior to closing:

    >>> scheduler.workers_to_close(key=lambda ws: ws.host)
    ['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']

Remove two workers:

    >>> scheduler.workers_to_close(n=2)

Keep enough workers to have twice as much memory as we need:

    >>> scheduler.workers_to_close(memory_ratio=2)
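The memory-based selection can be sketched roughly as follows. This is a simplified, hypothetical model rather than the scheduler's exact algorithm: only idle workers are candidates, those storing the least data are considered first, and a worker is closed only if the remaining workers' memory limits still cover memory_ratio times the total stored data and at least minimum workers remain. Grouping (key), n, and target are omitted, and the dict fields are assumptions.

```python
from typing import List


def workers_to_close(workers: List[dict], memory_ratio: float = 2.0, minimum: int = 0) -> List[str]:
    """Return addresses of idle workers that can be closed (simplified sketch)."""
    total_data = sum(w["nbytes"] for w in workers)  # data survives, so this stays fixed
    remaining_limit = sum(w["memory_limit"] for w in workers)
    remaining = len(workers)
    # Idle workers holding the least data are the cheapest to retire.
    idle = sorted((w for w in workers if not w["processing"]), key=lambda w: w["nbytes"])
    to_close = []
    for w in idle:
        if remaining - 1 < minimum:
            break
        # Keep a comfortable buffer: memory must still cover memory_ratio x data.
        if remaining_limit - w["memory_limit"] < memory_ratio * total_data:
            break
        to_close.append(w["address"])
        remaining_limit -= w["memory_limit"]
        remaining -= 1
    return to_close


GB = 2**30
cluster = [
    {"address": "tcp://10.0.0.1:1234", "nbytes": 4 * GB, "memory_limit": 8 * GB, "processing": False},
    {"address": "tcp://10.0.0.2:1234", "nbytes": 0, "memory_limit": 8 * GB, "processing": False},
    {"address": "tcp://10.0.0.3:1234", "nbytes": 1 * GB, "memory_limit": 8 * GB, "processing": True},
    {"address": "tcp://10.0.0.4:1234", "nbytes": 0, "memory_limit": 8 * GB, "processing": False},
]
print(workers_to_close(cluster, memory_ratio=2))
# ['tcp://10.0.0.2:1234', 'tcp://10.0.0.4:1234']
```

Here 5 GB of data must stay covered by at least 10 GB of memory, so closing a third worker (leaving 8 GB) would break the buffer.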
distributed.scheduler.decide_worker(ts, all_workers, valid_workers, objective)
Decide which worker should take task ts.
We choose the worker that has the data on which ts depends.
If several workers have dependencies, then we choose the less-busy worker.
Optionally provide valid_workers of where jobs are allowed to occur (if all workers are allowed to take the task, pass True instead).
If the task requires data communication because no eligible worker has all the dependencies already, then we choose to minimize the number of bytes sent between workers. This is determined by calling the objective function.
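The decision rule for decide_worker can be sketched as follows. It is a simplified, hypothetical version: TaskState and WorkerState objects are replaced by address strings, a `who_has` mapping from dependency key to holding workers, and an `objective` callable. Candidates are workers holding any dependency, restricted to valid_workers unless that is True. If no candidate remains, the sketch falls back to all valid workers and picks among several with `min` over the objective. Special cases the real scheduler may handle are ignored.

```python
from typing import Any, Callable, Dict, Iterable, Optional, Set, Union


def decide_worker(
    dependencies: Iterable[str],
    who_has: Dict[str, Set[str]],
    all_workers: Set[str],
    valid_workers: Union[bool, Set[str]],
    objective: Callable[[str], Any],
) -> Optional[str]:
    """Pick a worker address for a task (simplified sketch)."""
    # Workers that already hold at least one dependency.
    candidates = {w for dep in dependencies for w in who_has.get(dep, set())}
    if valid_workers is True:
        if not candidates:
            candidates = set(all_workers)
    else:
        candidates &= valid_workers
        if not candidates:
            # No valid worker holds data: any valid worker will do.
            candidates = set(valid_workers)
    if not candidates:
        return None  # no eligible worker
    if len(candidates) == 1:
        return next(iter(candidates))
    return min(candidates, key=objective)


who_has = {"x": {"A"}, "y": {"A", "B"}}
all_workers = {"A", "B", "C"}
load = {"A": 3.0, "B": 1.0, "C": 0.0}  # hypothetical objective values
print(decide_worker(["x", "y"], who_has, all_workers, True, load.__getitem__))  # B
```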