Dask (dagster-dask)

See also the Dask deployment guide.

dagster_dask.dask_executor ExecutorDefinition[source]

Config Schema:
cluster (selector)
Config Schema:
existing (strict dict)

Connect to an existing scheduler.

Config Schema:
address (dagster.StringSource)

local (permissive dict, optional)

Local cluster configuration.

yarn (permissive dict, optional)

YARN cluster configuration.

ssh (permissive dict, optional)

SSH cluster configuration.

pbs (permissive dict, optional)

PBS cluster configuration.

moab (permissive dict, optional)

Moab cluster configuration.

sge (permissive dict, optional)

SGE cluster configuration.

lsf (permissive dict, optional)

LSF cluster configuration.

slurm (permissive dict, optional)

SLURM cluster configuration.

oar (permissive dict, optional)

OAR cluster configuration.

kube (permissive dict, optional)

Kubernetes cluster configuration.

Dask-based executor.

The ‘cluster’ can be one of the following: (‘existing’, ‘local’, ‘yarn’, ‘ssh’, ‘pbs’, ‘moab’, ‘sge’, ‘lsf’, ‘slurm’, ‘oar’, ‘kube’).

If the Dask executor is used without providing executor-specific config, a local Dask cluster will be created (as when calling dask.distributed.Client() with dask.distributed.LocalCluster()).

The Dask executor optionally takes the following config:

cluster:
    {
        local?: # takes distributed.LocalCluster parameters
            {
                timeout?: 5,  # Timeout duration for initial connection to the scheduler
                n_workers?: 4  # Number of workers to start
                threads_per_worker?: 1 # Number of threads per each worker
            }
    }

To use the dask_executor, set it as the executor_def when defining a job:

from dagster import job
from dagster_dask import dask_executor

@job(executor_def=dask_executor)
def dask_enabled_job():
    pass