To make setting up adaptive deployments easy, some Dask deployment solutions
offer an .adapt() method. Here is an example with
dask_kubernetes.KubeCluster.
from dask_kubernetes import KubeCluster

cluster = KubeCluster()
cluster.adapt(minimum=0, maximum=100)  # scale between 0 and 100 workers
For more keyword options, see the Adaptive class below:
Adaptive(cluster[, interval, minimum, ...])
Adaptively allocate workers based on scheduler load.
The Dask scheduler does not know how to launch workers on its own. Instead, it relies on an external resource scheduler like Kubernetes above, or Yarn, SGE, SLURM, Mesos, or some other in-house system (see how to deploy Dask clusters for options). In order to use adaptive deployments, you must provide some mechanism for the scheduler to launch new workers. Typically, this is done by using one of the solutions listed in how to deploy Dask clusters, or by subclassing from the Cluster superclass and implementing that API, as sketched below.
Cluster([asynchronous, loop, quiet, name, ...])
Superclass for cluster objects
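For a minimal, runnable sketch, the built-in LocalCluster implements this API, so adaptive scaling works on a single machine out of the box:

from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=0)   # start with no workers
cluster.adapt(minimum=0, maximum=4)   # let the scheduler request up to four
client = Client(cluster)             # workers are launched as work arrives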
The Dask scheduler tracks a variety of information that helps it allocate the right number of workers:
The historical runtime of every function and task that it has seen, and all of the functions that it is currently able to run for users
The amount of memory used and available on each worker
Which workers are idle or saturated for various reasons, like the presence of specialized hardware
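Some of this per-worker bookkeeping is visible from the client side. A sketch (field names follow the scheduler's identity payload):

from distributed import Client

client = Client()  # starts a local cluster by default
for addr, worker in client.scheduler_info()["workers"].items():
    print(addr, worker["nthreads"], worker["memory_limit"])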
From these, it is able to determine a target number of workers by dividing the
cumulative expected runtime of all pending tasks by the target_duration
parameter (which defaults to five seconds). This target serves as a baseline
request for the resource manager, and it can be adjusted for a couple of
reasons (see the worked sketch after this list):
If the cluster needs more memory, then it will choose either the target number of workers or twice the current number of workers (whichever is larger)
If the target is outside of the range of the minimum and maximum values, then it is clipped to fit within that range
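Putting those rules together, a rough illustration of the calculation (the variable names here are illustrative, not the scheduler's internals):

import math

cumulative_runtime = 100.0  # seconds of expected work across pending tasks
target_duration = 5.0       # seconds, the default
current_workers = 15
minimum, maximum = 0, 100

target = math.ceil(cumulative_runtime / target_duration)  # baseline: 20

memory_pressure = True
if memory_pressure:
    target = max(target, 2 * current_workers)  # cluster needs memory: 30

target = min(max(target, minimum), maximum)  # clip to [minimum, maximum]
print(target)  # 30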
Additionally, when scaling down, Dask preferentially chooses those workers that
are idle and have the least data in memory. It moves that data to other
machines before retiring the worker. To avoid rapid cycling of the cluster up
and down in size, we only retire a worker after a few cycles have gone by where
it has consistently been a good idea to retire it (controlled by the
wait_count and interval parameters).
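Continuing the KubeCluster example above, these keywords are forwarded to Adaptive by .adapt(); a sketch of a more conservative configuration:

cluster.adapt(
    minimum=0,
    maximum=100,
    interval="1s",   # check scheduler load every second
    wait_count=60,   # retire a worker only after 60 consecutive suggestions
)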
class distributed.deploy.Adaptive(cluster=None, interval=None, minimum=None, maximum=None, wait_count=None, target_duration=None, worker_key=None, **kwargs)
Adaptively allocate workers based on scheduler load. A superclass.
Contains logic to dynamically resize a Dask cluster based on current use.
This class needs to be paired with a system that can create and destroy
Dask workers using a cluster resource manager. Typically it is built into
already existing solutions, rather than used directly by users.
It is most commonly used from the .adapt(...) method of various Dask
cluster classes.
Parameters
cluster: object
Must have scale and scale_down methods/coroutines
interval: timedelta or str
Milliseconds between checks
wait_count: int
Number of consecutive times that a worker should be suggested for removal before we remove it
target_duration: timedelta or str, default "5s"
Amount of time we want a computation to take. This affects how aggressively we scale up.
worker_key: Callable[WorkerState]
Function to group workers together when scaling down. See Scheduler.workers_to_close for more information.
minimum: int
Minimum number of workers to keep around
maximum: int
Maximum number of workers to keep around
**kwargs:
Extra parameters to pass to Scheduler.workers_to_close
Notes
Subclasses can override Adaptive.target() and
Adaptive.workers_to_close() to control when the cluster should be
resized. The default implementation checks if there are too many tasks
per worker or too little memory available (see
distributed.Scheduler.adaptive_target()).
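For example, a subclass might cap the default target. A hedged sketch (it assumes Adaptive.target is a coroutine, as in recent distributed releases; CappedAdaptive is a hypothetical name):

from distributed.deploy import Adaptive

class CappedAdaptive(Adaptive):
    """Never request more than 50 workers, whatever the load."""

    async def target(self):
        # Start from the default heuristic, then apply a hard cap
        return min(await super().target(), 50)

Cluster classes that accept an Adaptive class can then use it, e.g. cluster.adapt(Adaptive=CappedAdaptive, minimum=0, maximum=100).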
The values for interval, min, max, wait_count and target_duration can be
specified in the dask config under the distributed.adaptive key.
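For instance, a sketch using dask.config.set (the hyphenated key names follow the distributed.adaptive section of the default config):

import dask

dask.config.set({
    "distributed.adaptive.interval": "1s",
    "distributed.adaptive.wait-count": 10,
})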
Examples
This is commonly used from existing Dask classes, like KubeCluster
>>> from dask_kubernetes import KubeCluster
>>> cluster = KubeCluster()
>>> cluster.adapt(minimum=10, maximum=100)
Alternatively you can use it from your own Cluster class by subclassing from Dask’s Cluster superclass
>>> from distributed.deploy import Cluster
>>> class MyCluster(Cluster):
...     def scale_up(self, n):
...         """ Bring worker count up to n """
...     def scale_down(self, workers):
...         """ Remove worker addresses from cluster """
>>> cluster = MyCluster()
>>> cluster.adapt(minimum=10, maximum=100)
class distributed.deploy.Cluster(asynchronous=False, loop=None, quiet=False, name=None, ...)
Superclass for cluster objects
This class contains common functionality for Dask Cluster manager classes.
To implement this class, you must provide
A scheduler_comm attribute, which is a connection to the scheduler
following the distributed.core.rpc API.
A scale method, which takes an integer and scales the cluster to
that many workers (or else set _supports_scaling to False)
In return, you get the following:
A standard __repr__
A live IPython widget
Adaptive scaling
Integration with dask-labextension
A scheduler_info attribute which contains an up-to-date copy of
Scheduler.identity(), which is used for much of the above
Methods to gather logs
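Any implementation then exposes this common surface. A sketch using LocalCluster as the concrete class (get_logs availability depends on your distributed version):

from distributed import LocalCluster

cluster = LocalCluster(n_workers=2)
print(repr(cluster))                            # the standard __repr__
print(list(cluster.scheduler_info["workers"]))  # from Scheduler.identity()
cluster.scale(4)                                # scale to four workers
logs = cluster.get_logs()                       # gather scheduler and worker logs
cluster.close()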