Internals

Please note that internal APIs are likely to be in much greater flux pre-1.0 than user-facing APIs, particularly if not exported in the top-level dagster module.

If you find yourself consulting these docs because you are writing custom components and plug-ins, please get in touch with the core team on our Slack. We’re curious what you’re up to, happy to help, excited for new community contributions, and eager to make the system as easy to work with as possible – including for teams who are looking to customize it.

Executors

@dagster.executor(name=None, config_schema=None, requirements=None)[source]

Define an executor.

The decorated function should accept an InitExecutorContext and return an instance of Executor.

Parameters
  • name (Optional[str]) – The name of the executor.

  • config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.executor_config. If not set, Dagster will accept any config provided.

  • requirements (Optional[List[ExecutorRequirement]]) – Any requirements that must be met in order for the executor to be usable for a particular pipeline execution.

class dagster.ExecutorDefinition(name, config_schema=None, requirements=None, executor_creation_fn=None, description=None)[source]
Parameters
  • name (str) – The name of the executor.

  • config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.executor_config. If not set, Dagster will accept any config provided.

  • requirements (Optional[List[ExecutorRequirement]]) – Any requirements that must be met in order for the executor to be usable for a particular pipeline execution.

  • executor_creation_fn (Optional[Callable]) – Should accept an InitExecutorContext and return an instance of Executor

  • required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the executor.

configured(config_or_config_fn, name=None, config_schema=None, description=None)[source]

Wraps this object in an object of the same type that provides configuration to the inner object.

Parameters
  • config_or_config_fn (Union[Any, Callable[[Any], Any]]) – Either (1) Run configuration that fully satisfies this object’s config schema or (2) A function that accepts run configuration and returns run configuration that fully satisfies this object’s config schema. In the latter case, config_schema must be specified. When passing a function, it’s easiest to use configured().

  • name (Optional[str]) – Name of the new definition. If not provided, the emitted definition will inherit the name of the ExecutorDefinition upon which this function is called.

  • config_schema (Optional[ConfigSchema]) – If config_or_config_fn is a function, the config schema that its input must satisfy. If not set, Dagster will accept any config provided.

  • description (Optional[str]) – Description of the new definition. If not specified, inherits the description of the definition being configured.

Returns (ConfigurableDefinition): A configured version of this object.
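The two forms of config_or_config_fn described above can be sketched with a small stand-in class. This is only an illustration of the wrapping behavior, not Dagster's implementation: SketchDefinition, its create() method, and the config keys are hypothetical names, and no config schema validation is modeled.

```python
from typing import Any, Callable, Optional, Union


class SketchDefinition:
    """Hypothetical stand-in for a configurable definition (not a Dagster class)."""

    def __init__(self, name: str, creation_fn: Callable[[Any], Any],
                 description: Optional[str] = None):
        self.name = name
        self.description = description
        self._creation_fn = creation_fn

    def create(self, config: Any = None) -> Any:
        return self._creation_fn(config)

    def configured(self, config_or_config_fn: Union[Any, Callable[[Any], Any]],
                   name: Optional[str] = None,
                   description: Optional[str] = None) -> "SketchDefinition":
        def creation_fn(outer_config: Any) -> Any:
            if callable(config_or_config_fn):
                # Form (2): translate the outer config into the inner config.
                inner_config = config_or_config_fn(outer_config)
            else:
                # Form (1): the supplied value already satisfies the inner schema.
                inner_config = config_or_config_fn
            return self._creation_fn(inner_config)

        # Name and description are inherited unless overridden.
        return SketchDefinition(
            name or self.name, creation_fn, description or self.description
        )


base = SketchDefinition(
    "multiprocess_sketch",
    lambda config: {"max_concurrent": config["max_concurrent"]},
    description="base",
)
fixed = base.configured({"max_concurrent": 4}, name="four_workers")
derived = base.configured(lambda config: {"max_concurrent": config["cores"] * 2})
```

Here fixed needs no further config, while derived exposes a different, smaller config surface (cores) and maps it onto the inner definition's config.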

class dagster.InitExecutorContext(job, executor_def, executor_config, instance)[source]

Executor-specific initialization context.

job

The job to be executed.

Type

IPipeline

executor_def

The definition of the executor currently being constructed.

Type

ExecutorDefinition

executor_config

The parsed config passed to the executor.

Type

dict

instance

The current instance.

Type

DagsterInstance

class dagster.Executor[source]
abstract execute(plan_context, execution_plan)[source]

For the given context and execution plan, orchestrate a series of sub plan executions in a way that satisfies the whole plan being executed.

Parameters
  • plan_context (PlanOrchestrationContext) – The plan’s orchestration context.

  • execution_plan (ExecutionPlan) – The plan to execute.

Returns

A stream of dagster events.

abstract property retries

Whether retries are enabled or disabled for this instance of the executor.

Executors should allow this to be controlled via configuration if possible.

Returns: RetryMode
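As a rough picture of the shape of this interface, the sketch below defines a stand-in abstract class with an execute() generator and a retries property. It does not subclass dagster.Executor: SketchExecutor, InProcessSketchExecutor, the dict used as the plan context, the list used as the execution plan, and the event strings are all assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict, Iterator, List


class SketchExecutor(ABC):
    """Stand-in mirroring the Executor interface (not dagster.Executor)."""

    @abstractmethod
    def execute(self, plan_context: Dict[str, Callable[[], None]],
                execution_plan: List[str]) -> Iterator[str]:
        """Yield a stream of events while running every step in the plan."""

    @property
    @abstractmethod
    def retries(self) -> str:
        """Retry mode; a plain string stands in for RetryMode here."""


class InProcessSketchExecutor(SketchExecutor):
    def __init__(self, retries: str = "enabled"):
        # The retry mode is passed in so that it can be driven by configuration.
        self._retries = retries

    @property
    def retries(self) -> str:
        return self._retries

    def execute(self, plan_context, execution_plan):
        # Run each step in plan order, emitting events around it.
        for step_key in execution_plan:
            yield f"STEP_START:{step_key}"
            plan_context[step_key]()
            yield f"STEP_SUCCESS:{step_key}"


ran = []
executor = InProcessSketchExecutor(retries="disabled")
events = list(
    executor.execute(
        {"a": lambda: ran.append("a"), "b": lambda: ran.append("b")},
        ["a", "b"],
    )
)
```

Because execute() is a generator, the caller consumes events as steps run rather than waiting for the whole plan to finish.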


File Manager

class dagster.core.storage.file_manager.FileManager[source]

Base class for all file managers in dagster.

The file manager is an interface that can be implemented by resources to provide abstract access to a file system such as local disk, S3, or other cloud storage.

For examples of usage, see the documentation of the concrete file manager implementations.

abstract copy_handle_to_local_temp(file_handle)[source]

Copy a file represented by a file handle to a temp file.

In an implementation built around an object store such as S3, this method would be expected to download the file from S3 to local filesystem in a location assigned by the standard library’s tempfile module.

Temp files returned by this method are not guaranteed to be reusable across solid boundaries. For files that must be available across solid boundaries, use the read(), read_data(), write(), and write_data() methods.

Parameters

file_handle (FileHandle) – The handle to the file to make available as a local temp file.

Returns

Path to the local temp file.

Return type

str

abstract delete_local_temp()[source]

Delete all local temporary files created by previous calls to copy_handle_to_local_temp().

Should typically only be called by framework implementors.

abstract read(file_handle, mode='rb')[source]

Return a file-like stream for the file handle.

This may incur an expensive network call for file managers backed by object stores such as S3.

Parameters
  • file_handle (FileHandle) – The file handle to make available as a stream.

  • mode (str) – The mode in which to open the file. Default: "rb".

Returns

A file-like stream.

Return type

Union[TextIO, BinaryIO]

abstract read_data(file_handle)[source]

Return the bytes for a given file handle. This may incur an expensive network call for file managers backed by object stores such as S3.

Parameters

file_handle (FileHandle) – The file handle for which to return bytes.

Returns

Bytes for a given file handle.

Return type

bytes

abstract write(file_obj, mode='wb', ext=None)[source]

Write the bytes contained within the given file object into the file manager.

Parameters
  • file_obj (Union[TextIO, StringIO]) – A file-like object.

  • mode (Optional[str]) – The mode in which to write the file into the file manager. Default: "wb".

  • ext (Optional[str]) – For file managers that support file extensions, the extension with which to write the file. Default: None.

Returns

A handle to the newly created file.

Return type

FileHandle

abstract write_data(data, ext=None)[source]

Write raw bytes into the file manager.

Parameters
  • data (bytes) – The bytes to write into the file manager.

  • ext (Optional[str]) – For file managers that support file extensions, the extension with which to write the file. Default: None.

Returns

A handle to the newly created file.

Return type

FileHandle
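To make the method contract above concrete, here is a minimal in-memory stand-in that mirrors the six method names. It deliberately does not subclass FileManager or use Dagster's FileHandle: InMemoryFileManager, InMemoryFileHandle, the key format, and the handling of ext are assumptions for illustration only.

```python
import io
import os
import tempfile
import uuid
from typing import Dict, List, Optional


class InMemoryFileHandle:
    """Hypothetical handle type identifying one stored blob."""

    def __init__(self, key: str):
        self.key = key


class InMemoryFileManager:
    """Stand-in mirroring the FileManager method names (not a subclass)."""

    def __init__(self):
        self._blobs: Dict[str, bytes] = {}
        self._temp_paths: List[str] = []

    def write_data(self, data: bytes, ext: Optional[str] = None) -> InMemoryFileHandle:
        key = uuid.uuid4().hex + (f".{ext}" if ext else "")
        self._blobs[key] = data
        return InMemoryFileHandle(key)

    def write(self, file_obj, mode: str = "wb", ext: Optional[str] = None) -> InMemoryFileHandle:
        content = file_obj.read()
        if isinstance(content, str):
            # Text streams are stored as UTF-8 bytes in this sketch.
            content = content.encode("utf-8")
        return self.write_data(content, ext=ext)

    def read_data(self, file_handle: InMemoryFileHandle) -> bytes:
        return self._blobs[file_handle.key]

    def read(self, file_handle: InMemoryFileHandle, mode: str = "rb"):
        data = self.read_data(file_handle)
        return io.BytesIO(data) if "b" in mode else io.StringIO(data.decode("utf-8"))

    def copy_handle_to_local_temp(self, file_handle: InMemoryFileHandle) -> str:
        # The location is assigned by the standard library's tempfile module.
        fd, path = tempfile.mkstemp()
        with os.fdopen(fd, "wb") as temp_file:
            temp_file.write(self.read_data(file_handle))
        self._temp_paths.append(path)
        return path

    def delete_local_temp(self) -> None:
        for path in self._temp_paths:
            if os.path.exists(path):
                os.remove(path)
        self._temp_paths.clear()


manager = InMemoryFileManager()
handle = manager.write(io.StringIO("bar"), mode="w", ext="txt")
text = manager.read(handle, mode="r").read()
temp_path = manager.copy_handle_to_local_temp(handle)
with open(temp_path, "rb") as temp_file:
    temp_bytes = temp_file.read()
manager.delete_local_temp()
```

The handle, rather than a local path, is what survives between steps; the temp file exists only until delete_local_temp() is called.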

dagster.local_file_manager ResourceDefinition[source]

FileManager that provides abstract access to a local filesystem.

By default, files will be stored in <local_artifact_storage>/storage/file_manager where <local_artifact_storage> can be configured in the dagster.yaml file in $DAGSTER_HOME.

Implements the FileManager API.

Examples:

import tempfile

from dagster import ModeDefinition, local_file_manager, pipeline, solid


@solid(required_resource_keys={"file_manager"})
def write_files(context):
    fh_1 = context.resources.file_manager.write_data(b"foo")

    with tempfile.NamedTemporaryFile("w+") as fd:
        fd.write("bar")
        fd.seek(0)
        fh_2 = context.resources.file_manager.write(fd, mode="w", ext=".txt")

    return (fh_1, fh_2)


@solid(required_resource_keys={"file_manager"})
def read_files(context, file_handles):
    fh_1, fh_2 = file_handles
    assert context.resources.file_manager.read_data(fh_1) == b"foo"
    fd = context.resources.file_manager.read(fh_2, mode="r")
    assert fd.read() == "bar"
    fd.close()


@pipeline(mode_defs=[ModeDefinition(resource_defs={"file_manager": local_file_manager})])
def files_pipeline():
    read_files(write_files())

Or to specify the file directory:

@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={
                "file_manager": local_file_manager.configured({"base_dir": "/my/base/dir"})
            }
        )
    ]
)
def files_pipeline():
    read_files(write_files())

Instance

class dagster.DagsterInstance(instance_type, local_artifact_storage, run_storage, event_storage, compute_log_manager, run_coordinator, run_launcher, scheduler=None, schedule_storage=None, settings=None, ref=None)[source]

Core abstraction for managing Dagster’s access to storage and other resources.

Use DagsterInstance.get() to grab the current DagsterInstance which will load based on the values in the dagster.yaml file in $DAGSTER_HOME.

Alternatively, DagsterInstance.ephemeral() can be used, which provides a set of transient in-memory components.

Configuration of this class should be done by setting values in $DAGSTER_HOME/dagster.yaml. For example, to use Postgres for dagster storage, you can write a dagster.yaml such as the following:

dagster.yaml
storage:
  postgres:
    postgres_db:
      username: my_username
      password: my_password
      hostname: my_hostname
      db_name: my_database
      port: 5432
Parameters
  • instance_type (InstanceType) – Indicates whether the instance is ephemeral or persistent. Users should not attempt to set this value directly or in their dagster.yaml files.

  • local_artifact_storage (LocalArtifactStorage) – The local artifact storage is used to configure storage for any artifacts that require a local disk, such as schedules, or when using the filesystem system storage to manage files and intermediates. By default, this will be a dagster.core.storage.root.LocalArtifactStorage. Configurable in dagster.yaml using the ConfigurableClass machinery.

  • run_storage (RunStorage) – The run storage is used to store metadata about ongoing and past pipeline runs. By default, this will be a dagster.core.storage.runs.SqliteRunStorage. Configurable in dagster.yaml using the ConfigurableClass machinery.

  • event_storage (EventLogStorage) – Used to store the structured event logs generated by pipeline runs. By default, this will be a dagster.core.storage.event_log.SqliteEventLogStorage. Configurable in dagster.yaml using the ConfigurableClass machinery.

  • compute_log_manager (ComputeLogManager) – The compute log manager handles stdout and stderr logging for solid compute functions. By default, this will be a dagster.core.storage.local_compute_log_manager.LocalComputeLogManager. Configurable in dagster.yaml using the ConfigurableClass machinery.

  • run_coordinator (RunCoordinator) – A runs coordinator may be used to manage the execution of pipeline runs.

  • run_launcher (Optional[RunLauncher]) – Optionally, a run launcher may be used to enable a Dagster instance to launch pipeline runs, e.g. on a remote Kubernetes cluster, in addition to running them locally.

  • settings (Optional[Dict]) – Specifies certain per-instance settings, such as feature flags. These are set in the dagster.yaml under a set of whitelisted keys.

  • ref (Optional[InstanceRef]) – Used by internal machinery to pass instances across process boundaries.

add_daemon_heartbeat(daemon_heartbeat)[source]

Called on a regular interval by the daemon

get_daemon_heartbeats()[source]

Latest heartbeats of all daemon types

launch_run(run_id, workspace)[source]

Launch a pipeline run.

This method is typically called using instance.submit_run rather than being invoked directly. This method delegates to the RunLauncher, if any, configured on the instance, and will call its implementation of RunLauncher.launch_run() to begin the execution of the specified run. Runs should be created in the instance (e.g., by calling DagsterInstance.create_run()) before this method is called, and should be in the PipelineRunStatus.NOT_STARTED state.

Parameters

run_id (str) – The id of the run to launch.

report_dagster_event(dagster_event, run_id, log_level=20)[source]

Takes a DagsterEvent and stores it in persistent storage for the corresponding PipelineRun

report_engine_event(message, pipeline_run=None, engine_event_data=None, cls=None, step_key=None, pipeline_name=None, run_id=None)[source]

Report an EngineEvent that occurred outside of a pipeline execution context.

resume_run(run_id, workspace, attempt_number)[source]

Resume a pipeline run.

This method should be called on runs which have already been launched, but whose run workers have died.

Parameters

run_id (str) – The id of the run to resume.

property should_start_background_run_thread

Gate on an experimental feature to start a thread that monitors whether the run should be canceled.

submit_run(run_id, workspace)[source]

Submit a pipeline run to the coordinator.

This method delegates to the RunCoordinator, configured on the instance, and will call its implementation of RunCoordinator.submit_run() to send the run to the coordinator for execution. Runs should be created in the instance (e.g., by calling DagsterInstance.create_run()) before this method is called, and should be in the PipelineRunStatus.NOT_STARTED state. They also must have a non-null ExternalPipelineOrigin.

Parameters

run_id (str) – The id of the run.
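The delegation chain described for submit_run and launch_run (instance to coordinator to launcher) can be pictured with the stand-in classes below. None of these are Dagster classes: SketchInstance, ImmediateCoordinator, SketchLauncher, the simplified signatures (no workspace argument), and the status written on launch are assumptions for illustration.

```python
class SketchLauncher:
    """Stand-in run launcher that only records which runs it was asked to start."""

    def __init__(self):
        self.launched = []

    def launch_run(self, run_id):
        self.launched.append(run_id)


class ImmediateCoordinator:
    """Stand-in coordinator that hands every run straight back for launching."""

    def submit_run(self, instance, run_id):
        instance.launch_run(run_id)


class SketchInstance:
    def __init__(self, coordinator, launcher):
        self.coordinator = coordinator
        self.launcher = launcher
        self.statuses = {}

    def create_run(self, run_id):
        # Runs must exist, in the NOT_STARTED state, before being submitted.
        self.statuses[run_id] = "NOT_STARTED"

    def submit_run(self, run_id):
        if self.statuses.get(run_id) != "NOT_STARTED":
            raise ValueError(f"run {run_id} must be created and NOT_STARTED")
        self.coordinator.submit_run(self, run_id)

    def launch_run(self, run_id):
        # Assumed status transition, chosen for this sketch only.
        self.statuses[run_id] = "STARTING"
        self.launcher.launch_run(run_id)


launcher = SketchLauncher()
instance = SketchInstance(ImmediateCoordinator(), launcher)
instance.create_run("run_1")
instance.submit_run("run_1")

try:
    instance.submit_run("missing_run")
    rejected = False
except ValueError:
    rejected = True
```

A queueing coordinator would differ only in its submit_run: it would record the run and leave the call to launch_run to a separate process.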

class dagster.core.instance.InstanceRef(local_artifact_storage_data, compute_logs_data, scheduler_data, run_coordinator_data, run_launcher_data, settings, run_storage_data, event_storage_data, schedule_storage_data, custom_instance_class_data=None, storage_data=None)[source]

Serializable representation of a DagsterInstance.

Users should not instantiate this class directly.

class dagster.serdes.ConfigurableClass[source]

Abstract mixin for classes that can be loaded from config.

This supports a powerful plugin pattern which avoids both a) a lengthy, hard-to-synchronize list of conditional imports / optional extras_requires in dagster core and b) a magic directory or file in which third parties can place plugin packages. Instead, the intention is to make, e.g., run storage, pluggable with a config chunk like:

run_storage:
    module: very_cool_package.run_storage
    class: SplendidRunStorage
    config:
        magic_word: "quux"

This same pattern should eventually be viable for other system components, e.g. engines.

The ConfigurableClass mixin provides the necessary hooks for classes to be instantiated from an instance of ConfigurableClassData.

Pieces of the Dagster system which we wish to make pluggable in this way should consume a config type such as:

{'module': str, 'class': str, 'config': Field(Permissive())}
abstract classmethod config_type()[source]

dagster.ConfigType: The config type against which to validate a config yaml fragment serialized in an instance of ConfigurableClassData.

abstract static from_config_value(inst_data, config_value)[source]

New up an instance of the ConfigurableClass from a validated config value.

Called by ConfigurableClassData.rehydrate.

Parameters

config_value (dict) – The validated config value to use. Typically this should be the value attribute of an EvaluateValueResult.

A common pattern is for the implementation to align the config_value with the signature of the ConfigurableClass’s constructor:

@staticmethod
def from_config_value(inst_data, config_value):
    return MyConfigurableClass(inst_data=inst_data, **config_value)
abstract property inst_data

Subclass must be able to return the inst_data as a property if it has been constructed through the from_config_value code path.
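The plug-in pattern above, a module name, a class name, and a config fragment turned into an instance, can be sketched with the standard library alone. This is not how ConfigurableClassData.rehydrate is implemented: the rehydrate() helper, the very_cool_package_sketch module name, and the use of the raw config chunk as inst_data are assumptions, and validation against config_type() is skipped.

```python
import importlib
import sys
import types


class SplendidRunStorage:
    """Hypothetical plug-in class following the from_config_value pattern."""

    def __init__(self, magic_word, inst_data=None):
        self.magic_word = magic_word
        self._inst_data = inst_data

    @property
    def inst_data(self):
        return self._inst_data

    @staticmethod
    def from_config_value(inst_data, config_value):
        # Align the validated config with the constructor signature.
        return SplendidRunStorage(inst_data=inst_data, **config_value)


# Register the class under an importable module name so the sketch is
# self-contained; a real plug-in would live in an installed package.
plugin_module = types.ModuleType("very_cool_package_sketch")
plugin_module.SplendidRunStorage = SplendidRunStorage
sys.modules[plugin_module.__name__] = plugin_module


def rehydrate(config_chunk):
    """Import the named class and build it from its config fragment."""
    module = importlib.import_module(config_chunk["module"])
    klass = getattr(module, config_chunk["class"])
    return klass.from_config_value(config_chunk, config_chunk.get("config", {}))


chunk = {
    "module": "very_cool_package_sketch",
    "class": "SplendidRunStorage",
    "config": {"magic_word": "quux"},
}
storage = rehydrate(chunk)
```

Keeping inst_data on the instance is what lets the object later report the configuration it was built from.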

class dagster.serdes.ConfigurableClassData(module_name, class_name, config_yaml)[source]

Serializable tuple describing where to find a class and the config fragment that should be used to instantiate it.

Users should not instantiate this class directly.

Classes intended to be serialized in this way should implement the dagster.serdes.ConfigurableClass mixin.

class dagster.core.storage.root.LocalArtifactStorage(base_dir, inst_data=None)[source]
classmethod config_type()[source]

dagster.ConfigType: The config type against which to validate a config yaml fragment serialized in an instance of ConfigurableClassData.

static from_config_value(inst_data, config_value)[source]

New up an instance of the ConfigurableClass from a validated config value.

Called by ConfigurableClassData.rehydrate.

Parameters

config_value (dict) – The validated config value to use. Typically this should be the value attribute of an EvaluateValueResult.

A common pattern is for the implementation to align the config_value with the signature of the ConfigurableClass’s constructor:

@staticmethod
def from_config_value(inst_data, config_value):
    return MyConfigurableClass(inst_data=inst_data, **config_value)
property inst_data

Subclass must be able to return the inst_data as a property if it has been constructed through the from_config_value code path.


Storage

class dagster.core.storage.base_storage.DagsterStorage[source]

Abstract base class for Dagster persistent storage, for reading and writing data for runs, events, and schedule/sensor state.

Users should not directly instantiate concrete subclasses of this class; they are instantiated by internal machinery when dagit and dagster-daemon load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of concrete subclasses of this class should be done by setting values in that file.


Run storage

class dagster.PipelineRun(pipeline_name, run_id=None, run_config=None, mode=None, asset_selection=None, solid_selection=None, solids_to_execute=None, step_keys_to_execute=None, status=None, tags=None, root_run_id=None, parent_run_id=None, pipeline_snapshot_id=None, execution_plan_snapshot_id=None, external_pipeline_origin=None, pipeline_code_origin=None)[source]

Serializable internal representation of a pipeline run, as stored in a RunStorage.

class dagster.DagsterRunStatus(value)[source]

The status of run execution.

CANCELED = 'CANCELED'
CANCELING = 'CANCELING'
FAILURE = 'FAILURE'
MANAGED = 'MANAGED'
NOT_STARTED = 'NOT_STARTED'
QUEUED = 'QUEUED'
STARTED = 'STARTED'
STARTING = 'STARTING'
SUCCESS = 'SUCCESS'
dagster.PipelineRunStatus

alias of dagster.core.storage.pipeline_run.DagsterRunStatus

class dagster.core.storage.runs.RunStorage[source]

Abstract base class for storing pipeline run history.

Note that run storages using SQL databases as backing stores should implement SqlRunStorage.

Users should not directly instantiate concrete subclasses of this class; they are instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of concrete subclasses of this class should be done by setting values in that file.

class dagster.core.storage.runs.SqlRunStorage[source]

Base class for SQL based run storages

class dagster.core.storage.runs.SqliteRunStorage(conn_string, inst_data=None)[source]

SQLite-backed run storage.

Users should not directly instantiate this class; it is instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of this class should be done by setting values in that file.

This is the default run storage when none is specified in the dagster.yaml.

To explicitly specify SQLite for run storage, you can add a block such as the following to your dagster.yaml:

run_storage:
  module: dagster.core.storage.runs
  class: SqliteRunStorage
  config:
    base_dir: /path/to/dir

The base_dir param tells the run storage where on disk to store the database.

See also: dagster_postgres.PostgresRunStorage and dagster_mysql.MySQLRunStorage.


Event log storage

class dagster.core.storage.event_log.EventLogEntry(error_info, level, user_message, run_id, timestamp, step_key=None, pipeline_name=None, dagster_event=None, job_name=None)[source]

Entries in the event log.

These entries may originate from the logging machinery (DagsterLogManager/context.log), from framework events (e.g. EngineEvent), or they may correspond to events yielded by user code (e.g. Output).

Parameters
  • error_info (Optional[SerializableErrorInfo]) – Error info for an associated exception, if any, as generated by serializable_error_info_from_exc_info and friends.

  • level (Union[str, int]) – The Python log level at which to log this event. Note that framework and user code events are also logged to Python logging. This value may be an integer or a (case-insensitive) string member of PYTHON_LOGGING_LEVELS_NAMES.

  • user_message (str) – For log messages, this is the user-generated message.

  • run_id (str) – The id of the run which generated this event.

  • timestamp (float) – The Unix timestamp of this event.

  • step_key (Optional[str]) – The step key for the step which generated this event. Some events are generated outside of a step context.

  • job_name (Optional[str]) – The job which generated this event. Some events are generated outside of a job context.

  • dagster_event (Optional[DagsterEvent]) – For framework and user events, the associated structured event.

  • pipeline_name (Optional[str]) – (legacy) The pipeline which generated this event. Some events are generated outside of a pipeline context.

class dagster.core.storage.event_log.EventLogRecord(storage_id, event_log_entry)[source]

Internal representation of an event record, as stored in an EventLogStorage.

class dagster.core.storage.event_log.EventRecordsFilter(event_type, asset_key=None, asset_partitions=None, after_cursor=None, before_cursor=None, after_timestamp=None, before_timestamp=None)[source]

Defines a set of filter fields for fetching a set of event log entries or event log records.

Parameters
  • event_type (DagsterEventType) – Filter argument for dagster event type

  • asset_key (Optional[AssetKey]) – Asset key for which to get asset materialization event entries / records.

  • asset_partitions (Optional[List[str]]) – Filter parameter such that only asset materialization events with a partition value matching one of the provided values are returned. Only valid when the asset_key parameter is provided.

  • after_cursor (Optional[Union[int, RunShardedEventsCursor]]) – Filter parameter such that only records with storage_id greater than the provided value are returned. Using a run-sharded events cursor will result in a significant performance gain when run against a SqliteEventLogStorage implementation (which is run-sharded).

  • before_cursor (Optional[Union[int, RunShardedEventsCursor]]) – Filter parameter such that only records with storage_id less than the provided value are returned. Using a run-sharded events cursor will result in a significant performance gain when run against a SqliteEventLogStorage implementation (which is run-sharded).

  • after_timestamp (Optional[float]) – Filter parameter such that only event records for events with timestamp greater than the provided value are returned.

  • before_timestamp (Optional[float]) – Filter parameter such that only event records for events with timestamp less than the provided value are returned.
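The cursor and timestamp bounds above are all strict comparisons. The sketch below applies them to an in-memory list to show how they combine. SketchRecord, SketchFilter, and fetch_records are hypothetical stand-ins rather than Dagster types, and only integer cursors are modeled (no RunShardedEventsCursor, asset_key, or asset_partitions).

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchRecord:
    storage_id: int
    event_type: str
    timestamp: float


@dataclass
class SketchFilter:
    event_type: str
    after_cursor: Optional[int] = None
    before_cursor: Optional[int] = None
    after_timestamp: Optional[float] = None
    before_timestamp: Optional[float] = None


def fetch_records(records: List[SketchRecord], record_filter: SketchFilter) -> List[SketchRecord]:
    selected = []
    for record in records:
        if record.event_type != record_filter.event_type:
            continue
        # Each bound is exclusive: "greater than" / "less than" the given value.
        if record_filter.after_cursor is not None and record.storage_id <= record_filter.after_cursor:
            continue
        if record_filter.before_cursor is not None and record.storage_id >= record_filter.before_cursor:
            continue
        if record_filter.after_timestamp is not None and record.timestamp <= record_filter.after_timestamp:
            continue
        if record_filter.before_timestamp is not None and record.timestamp >= record_filter.before_timestamp:
            continue
        selected.append(record)
    return selected


records = [
    SketchRecord(1, "ASSET_MATERIALIZATION", 100.0),
    SketchRecord(2, "STEP_SUCCESS", 110.0),
    SketchRecord(3, "ASSET_MATERIALIZATION", 120.0),
    SketchRecord(4, "ASSET_MATERIALIZATION", 130.0),
]
newer = fetch_records(records, SketchFilter("ASSET_MATERIALIZATION", after_cursor=1))
window = fetch_records(
    records,
    SketchFilter("ASSET_MATERIALIZATION", after_timestamp=100.0, before_timestamp=130.0),
)
```

Passing the last seen storage_id as after_cursor is the usual way to poll for new records without re-reading old ones.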

class dagster.core.storage.event_log.RunShardedEventsCursor(id, run_updated_after)[source]

Pairs an id-based event log cursor with a timestamp-based run cursor, for improved performance on run-sharded event log storages (e.g. the default SqliteEventLogStorage). For run-sharded storages, the id field is ignored, since ids may not be unique across shards.

class dagster.core.storage.event_log.EventLogStorage[source]

Abstract base class for storing structured event logs from pipeline runs.

Note that event log storages using SQL databases as backing stores should implement SqlEventLogStorage.

Users should not directly instantiate concrete subclasses of this class; they are instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of concrete subclasses of this class should be done by setting values in that file.

class dagster.core.storage.event_log.SqlEventLogStorage[source]

Base class for SQL backed event log storages.

Distinguishes between run-based connections and index connections in order to support run-level sharding, while maintaining the ability to do cross-run queries

class dagster.core.storage.event_log.SqliteEventLogStorage(base_dir, inst_data=None)[source]

SQLite-backed event log storage.

Users should not directly instantiate this class; it is instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of this class should be done by setting values in that file.

This is the default event log storage when none is specified in the dagster.yaml.

To explicitly specify SQLite for event log storage, you can add a block such as the following to your dagster.yaml:

event_log_storage:
  module: dagster.core.storage.event_log
  class: SqliteEventLogStorage
  config:
    base_dir: /path/to/dir

The base_dir param tells the event log storage where on disk to store the databases. To improve concurrent performance, event logs are stored in a separate SQLite database for each run.
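The one-database-per-run layout can be illustrated with the standard sqlite3 module. This is a sketch of the sharding idea only: the file naming, the event_logs table, and the helper functions are assumptions rather than Dagster's actual schema, and no cross-run index is modeled.

```python
import os
import sqlite3
import tempfile


def shard_path(base_dir, run_id):
    # One SQLite file per run, all under the same base_dir.
    return os.path.join(base_dir, f"{run_id}.db")


def store_event(base_dir, run_id, message):
    conn = sqlite3.connect(shard_path(base_dir, run_id))
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS event_logs "
            "(id INTEGER PRIMARY KEY AUTOINCREMENT, message TEXT)"
        )
        conn.execute("INSERT INTO event_logs (message) VALUES (?)", (message,))
        conn.commit()
    finally:
        conn.close()


def load_events(base_dir, run_id):
    conn = sqlite3.connect(shard_path(base_dir, run_id))
    try:
        return conn.execute(
            "SELECT id, message FROM event_logs ORDER BY id"
        ).fetchall()
    finally:
        conn.close()


with tempfile.TemporaryDirectory() as base_dir:
    store_event(base_dir, "run_a", "start")
    store_event(base_dir, "run_a", "finish")
    store_event(base_dir, "run_b", "start")
    run_a_events = load_events(base_dir, "run_a")
    run_b_events = load_events(base_dir, "run_b")
    shard_files = sorted(os.listdir(base_dir))
```

Writers for different runs never contend for the same database file, but row ids restart in every shard, so an id alone does not identify an event across runs.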

class dagster.core.storage.event_log.ConsolidatedSqliteEventLogStorage(base_dir, inst_data=None)[source]

SQLite-backed consolidated event log storage intended for test cases only.

Users should not directly instantiate this class; it is instantiated by internal machinery when dagit and dagster-graphql load, based on the values in the dagster.yaml file in $DAGSTER_HOME. Configuration of this class should be done by setting values in that file.

To explicitly specify the consolidated SQLite for event log storage, you can add a block such as the following to your dagster.yaml:

event_log_storage:
  module: dagster.core.storage.event_log
  class: ConsolidatedSqliteEventLogStorage
  config:
    base_dir: /path/to/dir

The base_dir param tells the event log storage where on disk to store the database.

See also: dagster_postgres.PostgresEventLogStorage and dagster_mysql.MySQLEventLogStorage.


Compute log manager

class dagster.core.storage.compute_log_manager.ComputeLogManager[source]

Abstract base class for storing unstructured compute logs (stdout/stderr) from the compute steps of pipeline solids.

class dagster.core.storage.local_compute_log_manager.LocalComputeLogManager(base_dir, polling_timeout=None, inst_data=None)[source]

Stores copies of stdout & stderr for each compute step locally on disk.

See also: dagster_aws.S3ComputeLogManager.


Run launcher

class dagster.core.launcher.RunLauncher[source]
class dagster.core.launcher.DefaultRunLauncher(inst_data=None, wait_for_processes=False)[source]

Launches runs against running GRPC servers.


Run coordinator

class dagster.core.run_coordinator.DefaultRunCoordinator(inst_data=None)[source]

Immediately send runs to the run launcher.

dagster.core.run_coordinator.QueuedRunCoordinator RunCoordinator[source]

Config Schema:
max_concurrent_runs (dagster.IntSource, optional)

The maximum number of runs that are allowed to be in progress at once. Defaults to 10. Set to -1 to disable the limit. Set to 0 to stop any runs from launching. Any other negative values are disallowed.

tag_concurrency_limits (Union[List[strict dict], None], optional)

A set of limits that are applied to runs with particular tags. If a value is set, the limit is applied to only that key-value pair. If no value is set, the limit is applied across all values of that key. If the value is set to a dict with applyLimitPerUniqueValue: true, the limit will apply to the number of unique values for that key.

dequeue_interval_seconds (dagster.IntSource, optional)

The interval in seconds at which the Dagster Daemon should periodically check the run queue for new runs to launch.

Enqueues runs via the run storage, to be dequeued by the Dagster Daemon process. Requires the Dagster Daemon process to be alive in order for runs to be launched.
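One way to read the max_concurrent_runs and tag_concurrency_limits settings is as a check applied to each queued run before it is launched. The function below is a sketch of that reading, not the daemon's implementation: can_launch and its arguments are hypothetical, runs are reduced to their tag dicts, and the applyLimitPerUniqueValue option is not modeled.

```python
from typing import Dict, List, Optional


def can_launch(
    candidate_tags: Dict[str, str],
    in_progress_tags: List[Dict[str, str]],
    max_concurrent_runs: int = 10,
    tag_concurrency_limits: Optional[List[dict]] = None,
) -> bool:
    # -1 disables the global limit; 0 blocks every launch.
    if max_concurrent_runs != -1 and len(in_progress_tags) >= max_concurrent_runs:
        return False

    for limit in tag_concurrency_limits or []:
        key, value = limit["key"], limit.get("value")
        if key not in candidate_tags:
            continue
        if value is not None and candidate_tags[key] != value:
            continue
        # With a value, count only that key-value pair; without one,
        # count every in-progress run carrying the key.
        matching = sum(
            1
            for tags in in_progress_tags
            if key in tags and (value is None or tags[key] == value)
        )
        if matching >= limit["limit"]:
            return False
    return True


limits = [
    {"key": "database", "value": "redshift", "limit": 1},
    {"key": "team", "limit": 2},
]
in_progress = [{"database": "redshift"}, {"team": "a"}]
```

With these limits, a second run tagged database=redshift stays queued, while runs tagged database=postgres or team=b may proceed.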


Scheduling

class dagster.core.scheduler.Scheduler[source]

Abstract base class for a scheduler. This component is responsible for interfacing with an external system such as cron to ensure scheduled, repeated execution.

class dagster.core.storage.schedules.ScheduleStorage[source]

Abstract class for managing persistence of scheduler artifacts

class dagster.core.storage.schedules.SqlScheduleStorage[source]

Base class for SQL backed schedule storage

class dagster.core.storage.schedules.SqliteScheduleStorage(conn_string, inst_data=None)[source]

Local SQLite backed schedule storage

See also: dagster_postgres.PostgresScheduleStorage and dagster_mysql.MySQLScheduleStorage.


Exception handling

dagster.core.errors.user_code_error_boundary(error_cls, msg_fn, log_manager=None, **kwargs)[source]

Wraps the execution of user-space code in an error boundary. This places a uniform policy around any user code invoked by the framework. This ensures that all user errors are wrapped in an exception derived from DagsterUserCodeExecutionError, and that the original stack trace of the user error is preserved, so that it can be reported without confusing framework code in the stack trace, if a tool author wishes to do so.

Examples:

with user_code_error_boundary(
    # Pass a class that inherits from DagsterUserCodeExecutionError
    DagsterExecutionStepExecutionError,
    # Pass a function that produces a message
    lambda: "Error occurred during step execution",
):
    call_user_provided_function()
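The general shape of such a boundary can be sketched with contextlib. This is an illustration of the wrapping idea, not Dagster's implementation: sketch_error_boundary and SketchUserCodeError are hypothetical names, and the sketch relies on Python's exception chaining (raise ... from) to keep the original error and its traceback reachable.

```python
from contextlib import contextmanager


class SketchUserCodeError(Exception):
    """Stand-in for an error class derived from DagsterUserCodeExecutionError."""


@contextmanager
def sketch_error_boundary(error_cls, msg_fn):
    try:
        yield
    except Exception as user_error:
        # msg_fn is only called on failure, and the original exception is
        # attached as __cause__ so its stack trace is preserved.
        raise error_cls(msg_fn()) from user_error


def call_user_provided_function():
    raise ValueError("bad input")


caught = None
try:
    with sketch_error_boundary(
        SketchUserCodeError, lambda: "Error occurred during step execution"
    ):
        call_user_provided_function()
except SketchUserCodeError as wrapped:
    caught = wrapped
```

Callers can then catch a single framework error type and still inspect caught.__cause__ to report the user's own exception.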