Managing Memory
Dask.distributed stores the results of tasks in the distributed memory of the worker nodes. The central scheduler tracks all data on the cluster and determines when data should be freed. Completed results are usually cleared from memory as quickly as possible in order to make room for more computation. The result of a task is kept in memory if either of the following conditions hold:
- A client holds a future pointing to this task. The data should stay in RAM so that the client can gather the data on demand.
- The task is necessary for ongoing computations that are working to produce the final results pointed to by futures. These tasks will be removed once no ongoing tasks require them.
When users hold Future objects or persisted collections (which contain many such Futures inside their .dask attribute) they pin those results to active memory. When the user deletes futures or collections from their local Python process, the scheduler removes the associated data from distributed RAM. Because of this relationship, distributed memory reflects the state of local memory. A user may free distributed memory on the cluster by deleting persisted collections in the local session.
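This relationship can be sketched with a small example, assuming a running cluster that the Client can connect to (here a local one started with no arguments):

>>> from dask.distributed import Client
>>> client = Client()                       # start/connect to a local cluster
>>> future = client.submit(sum, [1, 2, 3])  # result lives in distributed memory
>>> future.result()                         # gather it on demand
6
>>> del future                              # last reference gone; the scheduler may now free it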
Creating Futures
The following functions produce Futures
Client.submit(self, func, *args[, key, …]) | Submit a function application to the scheduler
Client.map(self, func, *iterables[, key, …]) | Map a function on a sequence of arguments
Client.compute(self, collections[, sync, …]) | Compute dask collections on cluster
Client.persist(self, collections[, …]) | Persist dask collections on cluster
Client.scatter(self, data[, workers, …]) | Scatter data into distributed memory
The submit and map methods handle raw Python functions. The compute and persist methods handle Dask collections like arrays, bags, delayed values, and dataframes. The scatter method sends data directly from the local process.
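For example, here is a rough sketch of each style, assuming a connected client and some local object data:

>>> def square(x):
...     return x ** 2
>>> future = client.submit(square, 10)         # one function call, one future
>>> futures = client.map(square, range(4))     # one future per input
>>> [remote_data] = client.scatter([data])     # ship local data up, get a future back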
Persisting Collections
Calls to Client.compute or Client.persist submit task graphs to the cluster and return Future objects that point to particular output tasks.
Compute returns a single future per input; persist returns a copy of the collection with each block or partition replaced by a single future. In short, use persist to keep the full collection on the cluster and use compute when you want a small result as a single future.
Persist is more common and is often used as follows with collections:
>>> # Construct dataframe; no work happens
>>> df = dd.read_csv(...)
>>> df = df[df.x > 0]
>>> df = df.assign(z = df.x + df.y)

>>> # Pin data in distributed RAM; this triggers computation
>>> df = client.persist(df)

>>> # Continue operating on df
Note for Spark users: this differs from what you’re accustomed to. Persist is an immediate action. However, you’ll get control back immediately as computation occurs in the background.
In this example we build a computation by parsing CSV data, filtering rows, and then adding a new column. Up until this point all work is lazy; we’ve just built up a recipe to perform the work as a graph in the df object.
When we call df = client.persist(df) we cut this graph off of the df object, send it up to the scheduler, receive Future objects in return, and create a new dataframe with a very shallow graph that points directly to these futures. This happens more or less immediately (as long as it takes to serialize and send the graph) and we can continue working on our new df object while the cluster works to evaluate the graph in the background.
Difference with dask.compute
The operations client.persist(df) and client.compute(df) are asynchronous and so differ from the traditional df.compute() method or dask.compute function, which block until a result is available. The .compute() method does not persist any data on the cluster. The .compute() method also brings the entire result back to the local machine, so it is unwise to use it on large datasets. However, .compute() is very convenient for smaller results, particularly because it returns concrete results in a way that most other tools expect.
Typically we use asynchronous methods like client.persist to set up large collections and then use df.compute() for fast analyses.
>>> # df.compute()  # This is bad and would likely flood local memory
>>> df = client.persist(df)  # This is good and asynchronously pins df
>>> df.x.sum().compute()  # This is good because the result is small
>>> future = client.compute(df.x.sum())  # This is also good but less intuitive
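When you do want the concrete value from a future returned by client.compute, Future.result() blocks until the task finishes and then returns it:

>>> future = client.compute(df.x.sum())   # returns immediately with a Future
>>> future.result()                       # blocks, then gives back the concrete sum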
Clearing data
We remove data from distributed RAM by removing the collection from our local process. Remote data is removed once all Futures pointing to that data are removed from all client machines.
>>> del df  # Deleting local data often deletes remote data
If this is the only copy then this will likely trigger the cluster to delete the data as well.
However, if we have multiple copies or other collections based on this one, then we’ll have to delete them all.
>>> df2 = df[df.x < 10]
>>> del df  # would not delete data, because df2 still tracks the futures
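Deleting the remaining reference as well lets the cluster free the data, continuing the example above:

>>> del df2  # no collection tracks the futures any more, so the data can be released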
Aggressively Clearing Data
To definitely remove a computation and all computations that depend on it, you can always cancel the futures/collection.
>>> client.cancel(df)  # kills df, df2, and every other dependent computation
Alternatively, if you want a clean slate, you can restart the cluster. This clears all state and does a hard restart of all worker processes. It generally completes in around a second.
>>> client.restart()
Resilience
Results are not intentionally copied unless necessary for computations on other worker nodes. Resilience is achieved through recomputation by maintaining the provenance of any result. If a worker node goes down, the scheduler is able to recompute all of its results. The complete graph for any desired Future is maintained until no references to that future exist.
For more information see Resilience.
Advanced techniques
At first the result of a task is not intentionally copied, but only persists on the node where it was originally computed or scattered. However, a result may be copied to another worker node in the course of normal computation if that result is required by another task that is intended to be run by a different worker. This occurs if a task requires two pieces of data on different machines (at least one must move) or through work stealing. In these cases it is the policy for the second machine to maintain its redundant copy of the data. This helps to organically spread around data that is in high demand.
However, advanced users may want to control the location, replication, and balancing of data more directly throughout the cluster. They may know ahead of time that certain data should be broadcast throughout the network, that their data has become particularly imbalanced, or that they want certain pieces of data to live on certain parts of their network. These considerations are not usually necessary.
Client.rebalance(self[, futures, workers]) | Rebalance data within network
Client.replicate(self, futures[, n, …]) | Set replication of futures within network
Client.scatter(self, data[, workers, …]) | Scatter data into distributed memory
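As a rough sketch of how these might be used (data and futures are placeholders for objects you already have):

>>> [remote] = client.scatter([data], broadcast=True)   # copy data onto every worker
>>> client.replicate(futures, n=2)                      # keep two copies of each future’s data
>>> client.rebalance()                                  # even out memory use across workers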