API
Dask APIs generally follow from upstream APIs:
- Arrays follow NumPy
- DataFrames follow Pandas
- Bag follows map/filter/groupby/reduce common in Spark and Python iterators
- Dask-ML follows Scikit-Learn and others
- Delayed wraps general Python code
- Futures follow concurrent.futures from the standard library for real-time computation.
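For example, the Array API mirrors NumPy call for call; a minimal sketch (the array shape and chunk sizes here are arbitrary illustration values) builds a chunked array of ones and takes its mean:
- >>> import dask.array as da
- >>> x = da.ones((1000, 1000), chunks=(100, 100))  # same signature as numpy.ones, plus chunks
- >>> x.mean().compute()  # NumPy-style reduction, lazy until .compute()
- 1.0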
Additionally, Dask has its own functions to start computations, persist data in memory, check progress, and so forth that complement the APIs above. These more general Dask functions are described below:
- compute(*args, **kwargs): Compute several dask collections at once.
- is_dask_collection(x): Returns True if x is a dask collection.
- optimize(*args, **kwargs): Optimize several dask collections at once.
- persist(*args, **kwargs): Persist multiple Dask collections into memory.
- visualize(*args, **kwargs): Visualize several dask graphs at once.
These functions work with any scheduler. More advanced operations are available when using the newer scheduler and starting a dask.distributed.Client (which, despite its name, runs nicely on a single machine). This API provides the ability to submit, cancel, and track work asynchronously, and includes many functions for complex inter-task workflows. These are not necessary for normal operation, but can be useful for real-time or advanced operation.
This more advanced API is available in the Dask distributed documentation.
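As a rough sketch of that futures interface (assuming a single machine; Client() with no arguments starts a local cluster):
- >>> from dask.distributed import Client
- >>> client = Client()  # local cluster, despite the "distributed" name # doctest: +SKIP
- >>> future = client.submit(sum, [1, 2, 3])  # submit work asynchronously # doctest: +SKIP
- >>> future.result()  # block until the result is available # doctest: +SKIP
- 6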
dask.compute(*args, **kwargs) - Compute several dask collections at once.
Parameters:
- args: object
  Any number of objects. If it is a dask object, it’s computed and the result is returned. By default, python builtin collections are also traversed to look for dask objects (for more information see the traverse keyword). Non-dask arguments are passed through unchanged.
- traverse: bool, optional
  By default dask traverses builtin python collections looking for dask objects passed to compute. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.
- scheduler: string, optional
  Which scheduler to use like “threads”, “synchronous” or “processes”. If not provided, the default is to check the global settings first, and then fall back to the collection defaults.
- optimize_graph: bool, optional
  If True [default], the optimizations for each collection are applied before computation. Otherwise the graph is run as is. This can be useful for debugging.
- kwargs
  Extra keywords to forward to the scheduler function.
Examples
- >>> from dask import compute
- >>> import dask.array as da
- >>> a = da.arange(10, chunks=2).sum()
- >>> b = da.arange(10, chunks=2).mean()
- >>> compute(a, b)
- (45, 4.5)
By default, dask objects inside python collections will also be computed:
- >>> compute({'a': a, 'b': b, 'c': 1}) # doctest: +SKIP
- ({'a': 45, 'b': 4.5, 'c': 1},)
dask.optimize(*args, **kwargs) - Optimize several dask collections at once.
Returns equivalent dask collections that all share the same merged and optimized underlying graph. This can be useful if converting multiple collections to delayed objects, or to manually apply the optimizations at strategic points.
Note that in most cases you shouldn’t need to call this method directly.
Parameters:
- *args: objects
  Any number of objects. If a dask object, its graph is optimized and merged with all those of all other dask objects before returning an equivalent dask collection. Non-dask arguments are passed through unchanged.
- traverse: bool, optional
  By default dask traverses builtin python collections looking for dask objects passed to optimize. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.
- optimizations: list of callables, optional
  Additional optimization passes to perform.
- **kwargs
  Extra keyword arguments to forward to the optimization passes.
Examples
- >>> from dask import optimize
- >>> import dask.array as da
- >>> a = da.arange(10, chunks=2).sum()
- >>> b = da.arange(10, chunks=2).mean()
- >>> a2, b2 = optimize(a, b)
- >>> a2.compute() == a.compute()
- True
- >>> b2.compute() == b.compute()
- True
dask.persist(*args, **kwargs) - Persist multiple Dask collections into memory.
This turns lazy Dask collections into Dask collections with the same metadata, but now with their results fully computed or actively computing in the background.
For example a lazy dask.array built up from many lazy calls will now be a dask.array of the same shape, dtype, chunks, etc., but now with all of those previously lazy tasks either computed in memory as many small numpy.array (in the single-machine case) or asynchronously running in the background on a cluster (in the distributed case).
This function operates differently if a dask.distributed.Client exists and is connected to a distributed scheduler. In this case this function will return as soon as the task graph has been submitted to the cluster, but before the computations have completed. Computations will continue asynchronously in the background. When using this function with the single-machine scheduler it blocks until the computations have finished.
When using Dask on a single machine you should ensure that the dataset fits entirely within memory.
Parameters:
- *args: Dask collections
- scheduler: string, optional
  Which scheduler to use like “threads”, “synchronous” or “processes”. If not provided, the default is to check the global settings first, and then fall back to the collection defaults.
- traverse: bool, optional
  By default dask traverses builtin python collections looking for dask objects passed to persist. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.
- optimize_graph: bool, optional
  If True [default], the graph is optimized before computation. Otherwise the graph is run as is. This can be useful for debugging.
- **kwargs
  Extra keywords to forward to the scheduler function.
Returns:
- New dask collections backed by in-memory data
Examples
- >>> import dask.dataframe as dd
- >>> df = dd.read_csv('/path/to/*.csv') # doctest: +SKIP
- >>> df = df[df.name == 'Alice'] # doctest: +SKIP
- >>> df['in-debt'] = df.balance < 0 # doctest: +SKIP
- >>> df = df.persist() # triggers computation # doctest: +SKIP
- >>> df.value().min() # future computations are now fast # doctest: +SKIP
- -10
- >>> df.value().max() # doctest: +SKIP
- 100
- >>> from dask import persist # use persist function on multiple collections
- >>> a, b = persist(a, b) # doctest: +SKIP
dask.visualize(*args, **kwargs) - Visualize several dask graphs at once.
Requires graphviz to be installed. All options that are not the dask graph(s) should be passed as keyword arguments.
Parameters:
- dsk: dict(s) or collection(s)
  The dask graph(s) to visualize.
- filename: str or None, optional
  The name (without an extension) of the file to write to disk. If filename is None, no file will be written, and we communicate with dot using only pipes.
- format: {‘png’, ‘pdf’, ‘dot’, ‘svg’, ‘jpeg’, ‘jpg’}, optional
  Format in which to write output file. Default is ‘png’.
- optimize_graph: bool, optional
  If True, the graph is optimized before rendering. Otherwise, the graph is displayed as is. Default is False.
- color: {None, ‘order’}, optional
  Options to color nodes. Provide the cmap= keyword for an additional colormap.
- **kwargs
  Additional keyword arguments to forward to to_graphviz.
Returns:
- result: IPython.display.Image, IPython.display.SVG, or None
  See dask.dot.dot_graph for more information.
See also
dask.dot.dot_graph
Notes
For more information on optimization see here:
https://docs.dask.org/en/latest/optimize.html
Examples
- >>> x.visualize(filename='dask.pdf') # doctest: +SKIP
- >>> x.visualize(filename='dask.pdf', color='order') # doctest: +SKIP
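The same keywords also work through the top-level dask.visualize function when rendering several collections into a single graph; a minimal sketch (requires graphviz, and the output filename is an arbitrary choice):
- >>> import dask
- >>> import dask.array as da
- >>> a = da.arange(10, chunks=2).sum()
- >>> b = da.arange(10, chunks=2).mean()
- >>> dask.visualize(a, b, filename='dask', format='svg') # doctest: +SKIP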
Datasets
Dask has a few helpers for generating demo datasets.
dask.datasets.make_people(npartitions=10, records_per_partition=1000, seed=None, locale='en') - Make a dataset of random people
This makes a Dask Bag with dictionary records of randomly generated people. This requires the optional library mimesis to generate records.
Parameters:
- npartitions: int
  Number of partitions
- records_per_partition: int
  Number of records in each partition
- seed: int, (optional)
  Random seed
- locale: str
  Language locale, like ‘en’, ‘fr’, ‘zh’, or ‘ru’
Returns:
- b: Dask Bag
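There is no usage example above; a minimal sketch (skipped in doctests because it needs the optional mimesis package) might look like this:
- >>> import dask
- >>> b = dask.datasets.make_people() # doctest: +SKIP
- >>> b.take(1)  # a tuple containing one generated record # doctest: +SKIP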
dask.datasets.timeseries(start='2000-01-01', end='2000-01-31', freq='1s', partition_freq='1d', dtypes={'name': str, 'id': int, 'x': float, 'y': float}, seed=None, **kwargs) - Create timeseries dataframe with random data
Parameters:
- start: datetime (or datetime-like string)
  Start of time series
- end: datetime (or datetime-like string)
  End of time series
- dtypes: dict
  Mapping of column names to types. Valid types include {float, int, str, ‘category’}
- freq: string
  String like ‘2s’ or ‘1H’ or ‘12W’ for the time series frequency
- partition_freq: string
  String like ‘1M’ or ‘2Y’ to divide the dataframe into partitions
- seed: int (optional)
  Randomstate seed
- kwargs:
  Keywords to pass down to individual column creation functions. Keywords should be prefixed by the column name and then an underscore.
Examples
- >>> import dask
- >>> df = dask.datasets.timeseries()
- >>> df.head() # doctest: +SKIP
- timestamp id name x y
- 2000-01-01 00:00:00 967 Jerry -0.031348 -0.040633
- 2000-01-01 00:00:01 1066 Michael -0.262136 0.307107
- 2000-01-01 00:00:02 988 Wendy -0.526331 0.128641
- 2000-01-01 00:00:03 1016 Yvonne 0.620456 0.767270
- 2000-01-01 00:00:04 998 Ursula 0.684902 -0.463278
- >>> df = dask.datasets.timeseries(
- ... '2000', '2010',
- ... freq='2H', partition_freq='1D', seed=1, # data frequency
- ... dtypes={'value': float, 'name': str, 'id': int}, # data types
- ... id_lam=1000 # control number of items in id column
- ... )
Utilities
Dask has some public utility methods. These are primarily used for parsing configuration values.
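The examples below assume the helpers have been imported from dask.utils, where these functions are defined:
- >>> from dask.utils import format_bytes, format_time, parse_bytes, parse_timedelta
- >>> from datetime import timedelta  # used in the parse_timedelta example below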
- >>> format_bytes(1)
- '1 B'
- >>> format_bytes(1234)
- '1.23 kB'
- >>> format_bytes(12345678)
- '12.35 MB'
- >>> format_bytes(1234567890)
- '1.23 GB'
- >>> format_bytes(1234567890000)
- '1.23 TB'
- >>> format_bytes(1234567890000000)
- '1.23 PB'
- >>> format_time(1)
- '1.00 s'
- >>> format_time(0.001234)
- '1.23 ms'
- >>> format_time(0.00012345)
- '123.45 us'
- >>> format_time(123.456)
- '123.46 s'
- >>> parse_bytes('100')
- 100
- >>> parse_bytes('100 MB')
- 100000000
- >>> parse_bytes('100M')
- 100000000
- >>> parse_bytes('5kB')
- 5000
- >>> parse_bytes('5.4 kB')
- 5400
- >>> parse_bytes('1kiB')
- 1024
- >>> parse_bytes('1e6')
- 1000000
- >>> parse_bytes('1e6 kB')
- 1000000000
- >>> parse_bytes('MB')
- 1000000
- >>> parse_bytes(123)
- 123
- >>> parse_bytes('5 foos') # doctest: +SKIP
- ValueError: Could not interpret 'foos' as a byte unit
Examples
- >>> parse_timedelta('3s')
- 3
- >>> parse_timedelta('3.5 seconds')
- 3.5
- >>> parse_timedelta('300ms')
- 0.3
- >>> parse_timedelta(timedelta(seconds=3)) # also supports timedeltas
- 3