An asset is an object in persistent storage, such as a table, file, or persisted machine learning model. A software-defined asset is a Dagster object that couples an asset to the function and upstream assets that are used to produce its contents.
Software-defined assets enable a declarative approach to data management, in which code is the source of truth on what data assets should exist and how those assets are computed.
A software-defined asset includes the following:
- An `AssetKey`, which is a handle for referring to the asset.
- A set of upstream asset keys, which refer to the assets that the contents of the software-defined asset are derived from.
- An op, which is a function responsible for computing the contents of the asset from its upstream dependencies.
Note: A crucial distinction between software-defined assets and ops is that software-defined assets know about their dependencies, while ops do not. Ops aren't connected to dependencies until they're placed inside a graph.
Materializing an asset is the act of running its op and saving the results to persistent storage. You can initiate materializations from Dagit or by invoking Python APIs. By default, assets are materialized to pickle files on your local filesystem, but materialization behavior is fully customizable using IO managers. It's possible to materialize an asset in multiple storage environments, such as production and staging.
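For example, here's a minimal sketch of materializing an asset with the Python `materialize` API (the asset name is illustrative):

```python
from dagster import asset, materialize

@asset
def numbers():
    return [1, 2, 3]

# launches a run that executes the asset's op and writes the result to
# persistent storage (by default, a pickle file on the local filesystem)
result = materialize([numbers])
assert result.success
```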
| Name | Description |
|---|---|
| `@asset` | A decorator used to define assets. |
| `SourceAsset` | A class that describes an asset, but doesn't define how to compute it. `SourceAsset`s are used to represent assets that other assets depend on, in settings where they can't be materialized themselves. |
The easiest way to create a software-defined asset is with the `@asset` decorator:
```python
from dagster import asset

@asset
def my_asset():
    return [1, 2, 3]
```
By default, the name of the decorated function, `my_asset`, is used as the asset key. The decorated function forms the asset's op: it's responsible for producing the asset's contents. The asset in this example doesn't depend on any other assets.
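If you want the asset key to differ from the function name, you can pass a `name` argument to the decorator. A minimal sketch (the names are illustrative):

```python
from dagster import asset

# this asset's key is "numbers", not "generate_numbers"
@asset(name="numbers")
def generate_numbers():
    return [1, 2, 3]
```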
Software-defined assets can depend on other software-defined assets. The following sections show the different ways these dependencies can be defined.
The easiest way to define an asset dependency is to include an upstream asset name as an argument to the decorated function.
In the following example, `downstream_asset` depends on `upstream_asset`. That means that the contents of `upstream_asset` are provided to the function that computes the contents of `downstream_asset`.
```python
from dagster import asset

@asset
def upstream_asset():
    return [1, 2, 3]

@asset
def downstream_asset(upstream_asset):
    return upstream_asset + [4]
```
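As a quick check, you can materialize both assets in a single run with the `materialize` API; a minimal sketch:

```python
from dagster import materialize

# upstream_asset runs first; its return value is passed to downstream_asset
result = materialize([upstream_asset, downstream_asset])
assert result.output_for_node("downstream_asset") == [1, 2, 3, 4]
```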
If defining dependencies by matching argument names to upstream asset names feels too magical for your tastes, you can also define dependencies in a more explicit way:
```python
from dagster import AssetIn, asset

@asset
def upstream_asset():
    return [1, 2, 3]

@asset(ins={"upstream": AssetIn("upstream_asset")})
def downstream_asset(upstream):
    return upstream + [4]
```
In this case, `ins={"upstream": AssetIn("upstream_asset")}` declares that the contents of the asset with the key `upstream_asset` will be provided to the function argument named `upstream`.
Asset keys can also be provided to `AssetIn` to explicitly identify the asset. For example:
```python
from dagster import AssetIn, AssetKey, asset

# One way of providing explicit asset keys:
@asset(ins={"upstream": AssetIn(asset_key="upstream_asset")})
def downstream_asset(upstream):
    return upstream + [4]

# Another way:
@asset(ins={"upstream": AssetIn(asset_key=AssetKey("upstream_asset"))})
def another_downstream_asset(upstream):
    return upstream + [10]
```
Software-defined assets frequently depend on assets that are generated elsewhere. Using `SourceAsset`, you can include these external assets and allow your other assets to depend on them.
For example:
```python
from dagster import AssetKey, SourceAsset, asset

my_source_asset = SourceAsset(key=AssetKey("a_source_asset"))

@asset
def my_derived_asset(a_source_asset):
    return a_source_asset + [4]
```
Note: The source asset's asset key, not the Python variable name, must be used as the argument to downstream assets. In the previous example, that's `a_source_asset`, not `my_source_asset`.
You can also re-use assets across repositories by including them as source assets:
```python
from dagster import AssetKey, SourceAsset, asset, repository

@asset
def repository_a_asset():
    return 5

@repository
def repository_a():
    return [repository_a_asset]

repository_a_source_asset = SourceAsset(key=AssetKey("repository_a_asset"))

@asset
def repository_b_asset(repository_a_asset):
    return repository_a_asset + 6

@repository
def repository_b():
    return [repository_b_asset, repository_a_source_asset]
```
Using source assets has a few advantages over having the code inside of an asset's op load the data:

- If assets in multiple repositories share the same asset key as a `SourceAsset`, Dagit can represent the asset lineage across those repositories.
- Dagster can use an `IOManager` to load the contents of the source asset, rather than requiring hand-written loading code.

Alternatively, you can define dependencies where data from an upstream asset doesn't need to be loaded by Dagster to compute a downstream asset's output. When used, `non_argument_deps` defines the dependency between assets but doesn't pass data through Dagster.
Consider the following example:

- `upstream_asset` creates a new table (`sugary_cereals`) by selecting records from the `cereals` table
- `downstream_asset` then creates a new table (`shopping_list`) by selecting records from `sugary_cereals`
```python
from dagster import asset

# `execute_query` stands in for however your project runs SQL against its database
@asset
def upstream_asset():
    execute_query("CREATE TABLE sugary_cereals AS SELECT * FROM cereals")

@asset(non_argument_deps={"upstream_asset"})
def downstream_asset():
    execute_query("CREATE TABLE shopping_list AS SELECT * FROM sugary_cereals")
```
In this example, Dagster doesn't need to load data from `upstream_asset` to successfully compute `downstream_asset`. While `downstream_asset` does depend on `upstream_asset`, the key difference with `non_argument_deps` is that data isn't being passed between the functions. Specifically, the data from the `sugary_cereals` table isn't being passed as an argument to `downstream_asset`.
Basic software-defined assets are computed using a single op. If generating an asset involves multiple discrete computations, you can use graph-backed assets by separating each computation into an op and building a graph to combine your computations. This allows you to launch re-executions of runs at the op boundaries but doesn't require you to link each intermediate value to an asset in persistent storage.
Graph-backed assets are useful if you have an existing graph that produces and consumes assets. Wrapping your graph inside a software-defined asset gives you all the benefits of software-defined assets — like cross-job lineage — without requiring you to change the code inside your graph.
To define a graph-backed asset, use the `from_graph` method on the `AssetsDefinition` object:
```python
from pandas import DataFrame

from dagster import AssetsDefinition, graph, op

@op(required_resource_keys={"slack"})
def fetch_files_from_slack(context) -> DataFrame:
    files = context.resources.slack.files_list(channel="#random")
    return DataFrame(
        [
            {
                "id": file.get("id"),
                "created": file.get("created"),
                "title": file.get("title"),
                "permalink": file.get("permalink"),
            }
            for file in files
        ]
    )

@op
def store_files(files):
    # `create_db_connection` stands in for your own database connection helper
    return files.to_sql(name="slack_files", con=create_db_connection())

@graph
def store_slack_files_in_sql():
    store_files(fetch_files_from_slack())

graph_asset = AssetsDefinition.from_graph(store_slack_files_in_sql)
```
Note: All output assets must be selected when using a graph-backed asset to create a job. Dagster selects all graph outputs automatically when creating a job.
The `from_graph` method on the `AssetsDefinition` object infers upstream and downstream asset dependencies from the provided graph definition. In the simplest case, when the graph returns a single output, Dagster uses the name of the graph as the resulting asset key.

In the example below, Dagster creates an asset with key `middle_asset` from the `middle_asset` graph. Just like assets defined via `@asset`, each argument to the decorated graph function is an upstream asset name. `middle_asset` depends on `upstream_asset`, and `downstream_asset` depends on `middle_asset`:
```python
from dagster import AssetsDefinition, asset, graph

@asset
def upstream_asset():
    return 1

@graph
def middle_asset(upstream_asset):
    # `add_one` is an op, defined elsewhere, that increments its input
    return add_one(upstream_asset)

middle_asset = AssetsDefinition.from_graph(middle_asset)

@asset
def downstream_asset(middle_asset):
    return middle_asset + 1
```
When your graph returns multiple outputs, Dagster uses each output name as an asset key. In the example below, `two_assets_graph` accepts `upstream_asset` and outputs two assets, `first_asset` and `second_asset`:
@graph(out={"first_asset": GraphOut(), "second_asset": GraphOut()})
def two_assets_graph(upstream_asset):
one, two = two_outputs(upstream_asset)
return {"first_asset": one, "second_asset": two}
two_assets = AssetsDefinition.from_graph(two_assets_graph)
You can also define dependencies for graph-backed assets explicitly via the `asset_keys_by_input_name` and `asset_keys_by_output_name` arguments to `from_graph`:
@graph(out={"one": GraphOut(), "two": GraphOut()})
def return_one_and_two(zero):
one, two = two_outputs(zero)
return {"one": one, "two": two}
explicit_deps_asset = AssetsDefinition.from_graph(
return_one_and_two,
asset_keys_by_input_name={"zero": AssetKey("upstream_asset")},
asset_keys_by_output_name={
"one": AssetKey("asset_one"),
"two": AssetKey("asset_two"),
},
)
Since a software-defined asset contains an op, all the typical functionality of an op, like the use of resources and configuration, is available to an asset. Supplying the `context` parameter provides access to system information for the op. For example:
```python
from dagster import asset

@asset(required_resource_keys={"api"})
def my_asset(context):
    # fetches the contents of the asset from an external API
    return context.resources.api.fetch_table("my_asset")
```
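If you materialize an asset like this outside of a repository that binds its resources, you need to supply the required resources yourself. A minimal sketch, using a hypothetical stub in place of a real API client:

```python
from dagster import ResourceDefinition, materialize

# hypothetical stand-in for a real API client resource
class StubApi:
    def fetch_table(self, name):
        return [1, 2, 3]

result = materialize(
    [my_asset],
    resources={"api": ResourceDefinition.hardcoded_resource(StubApi())},
)
assert result.success
```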
Like ops, assets support configuration. Configuration is accessible through the asset context at runtime and can be used to specify the asset's behavior; asset configuration behaves the same as configuration for ops.
For example, the following asset queries an API endpoint defined through configuration:
```python
import requests

from dagster import asset

@asset(config_schema={"api_endpoint": str})
def my_configurable_asset(context):
    api_endpoint = context.op_config["api_endpoint"]
    data = requests.get(f"{api_endpoint}/data").json()
    return data
```
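At runtime, the config is supplied under the asset's op name in run config. A sketch of materializing the asset above with an illustrative endpoint:

```python
from dagster import materialize

result = materialize(
    [my_configurable_asset],
    run_config={
        "ops": {
            "my_configurable_asset": {
                "config": {"api_endpoint": "https://example.com/api"}
            }
        }
    },
)
```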
Refer to the Config schema documentation for more configuration info and examples.
Once you've defined a set of assets, you can view them in Dagit, materialize them, and build jobs that target them.
To view and materialize assets in Dagit, you can point it at a module that contains asset definitions or lists of asset definitions as module-level attributes:
```shell
dagit -m module_with_assets
```
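For example, `module_with_assets` might be a file like the following, with asset definitions as module-level attributes (the contents are illustrative):

```python
# module_with_assets.py
from dagster import asset

@asset
def upstream_asset():
    return [1, 2, 3]

@asset
def downstream_asset(upstream_asset):
    return upstream_asset + [4]
```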
If you want Dagit to contain both assets and jobs that target the assets, you can place the assets and jobs together inside a repository.
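A minimal sketch of such a repository, assuming `define_asset_job` (available in recent Dagster versions) and illustrative names:

```python
from dagster import asset, define_asset_job, repository

@asset
def my_asset():
    return [1, 2, 3]

# a job that targets every asset in the repository
all_assets_job = define_asset_job("all_assets_job", selection="*")

@repository
def my_repository():
    return [my_asset, all_assets_job]
```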
To view a list of all your assets, click Assets in the top-right corner of the page. This opens the Assets page. You can view the Asset Details page for an asset by clicking on its name.
You can also view a graph of all assets and their dependencies in Dagit.
On occasion, you might see an upstream changed indicator on an asset in the dependency graph or on the Asset Details page. This occurs when an asset's most recent materialization took place earlier than that of an asset it depends on. Dagit displays this alert to notify you that the contents of an asset may be stale. For example:
- `comments` is upstream of `comment_stories`
- `comment_stories` depends on `comments`
- `comment_stories` was last materialized on February 25 at 5:30 PM
- `comments` was last materialized on February 25 at 7:05 PM

In this case, the contents of `comment_stories` may be outdated, as the most recent data from `comments` wasn't used to compute them.
You can resolve this issue by re-materializing the downstream asset. This re-computes its contents using the most recent data from its upstream dependencies.
Note that there are some scenarios in which the upstream changed indicator currently won't display.
In Dagit, you can launch runs that materialize assets using the Materialize button, available on the Asset Details page and in the asset graph.
Jobs that target assets can materialize a fixed selection of assets each time they run and be placed on schedules and sensors. Refer to the Jobs documentation for more info and examples.
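For instance, here's a sketch of placing an asset job on a daily schedule, assuming `define_asset_job` and illustrative names:

```python
from dagster import ScheduleDefinition, define_asset_job

# a job that materializes all assets, run every day at midnight
daily_refresh_job = define_asset_job("daily_refresh_job", selection="*")
daily_schedule = ScheduleDefinition(job=daily_refresh_job, cron_schedule="0 0 * * *")
```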
To help keep your assets tidy, you can organize them into groups. Grouping assets by project, concept, and so on simplifies keeping track of them in Dagit.
In Dagster, there are two ways to assign assets to groups:

- By using `load_assets_from_package_module` to apply a group to a set of assets (recommended)
- By specifying a `group_name` argument on an individual asset

By default, assets that aren't assigned to a group will be placed in a group named `default`. Use Dagit to view these assets.
This recommended approach constructs a group of assets from a specified module in your project. Using the `load_assets_from_package_module` function, you can import all assets in a module and apply a grouping:
```python
import my_package.cereal as cereal

from dagster import load_assets_from_package_module

cereal_assets = load_assets_from_package_module(
    cereal,
    group_name="cereal_assets",
)
```
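The loaded assets can then be included in a repository alongside any other definitions; a brief sketch, assuming the module above:

```python
from dagster import repository

@repository
def cereal_repository():
    return [*cereal_assets]
```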
Assets can also be given groups on an individual basis by specifying an argument when creating the asset:
```python
from dagster import asset

@asset(group_name="cereal_assets")
def nabisco_cereals():
    return [1, 2, 3]
```
Assets can only be assigned to one group at a time. Attempting to place a grouped asset in a second group will result in an error:
```
Group name already exists on assets [list_of_asset_keys]
```
To view your asset groups in Dagit, open the left navigation by clicking the menu icon in the top-left corner. As asset groups are grouped in repositories, you may need to open a repository to view its asset groups. Click an asset group to open a dependency graph for all assets in the group.
When writing unit tests, you can treat the function decorated by `@asset` as a regular Python function.
Consider a simple asset with no upstream dependencies:
```python
from dagster import asset

@asset
def my_simple_asset():
    return [1, 2, 3]
```
When writing a unit test, you can directly invoke the decorated function:
```python
def test_my_simple_asset():
    result = my_simple_asset()
    assert result == [1, 2, 3]
```
If you have an asset with upstream dependencies:
```python
@asset
def more_complex_asset(my_simple_asset):
    return my_simple_asset + [4, 5, 6]
```
You can manually provide values for those dependencies in your unit test. This allows you to test assets in isolation from one another:
```python
def test_more_complex_asset():
    result = more_complex_asset([0])
    assert result == [0, 4, 5, 6]
```
If you use a context object in your function, `@asset` will provide the correct context during execution. When writing a unit test, you can mock it with `build_op_context`. You can use `build_op_context` to generate the `context` object because, under the hood, the function decorated by `@asset` is an op.
Consider this asset that uses a resource:
```python
from dagster import asset

@asset(required_resource_keys={"foo"})
def uses_context(context):
    return context.resources.foo
```
When writing a unit test, use `build_op_context` to mock the `context` and provide values for testing:
```python
from dagster import build_op_context

def test_uses_context():
    context = build_op_context(resources={"foo": "bar"})
    result = uses_context(context)
    assert result == "bar"
```
Assets are often objects in systems with hierarchical namespaces, like filesystems. Because of this, it often makes sense for an asset key to be a list of strings, instead of just a single string. To define an asset with a multi-part asset key, use the `key_prefix` argument. This can be either a list of strings or a single string with segments delimited by `/`. The full asset key is formed by prepending the `key_prefix` to the asset name (which defaults to the name of the decorated function).
```python
from dagster import AssetIn, asset

@asset(key_prefix=["one", "two", "three"])
def upstream_asset():
    return [1, 2, 3]

@asset(ins={"upstream_asset": AssetIn(key_prefix="one/two/three")})
def downstream_asset(upstream_asset):
    return upstream_asset + [4]
```
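For reference, the full key of `upstream_asset` above combines the prefix segments with the function name. A quick sketch of checking this, assuming the `key` property on a single-asset `AssetsDefinition`:

```python
from dagster import AssetKey

assert upstream_asset.key == AssetKey(["one", "two", "three", "upstream_asset"])
```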
Dagster supports attaching arbitrary metadata to assets. To attach metadata, supply a `metadata` dictionary to the asset:
```python
from dagster import asset

@asset(metadata={"cereal_name": "Sugar Sprinkles"})
def cereal_asset():
    return 5
```
Asset metadata can be viewed in Dagit on the Asset Details page.
Interested in learning more about software-defined assets and working through a more complex example? Check out our guide on software-defined assets and our example project that integrates software-defined assets with other Modern Data Stack tools.
For more examples of software-defined assets, check out the Dagster examples.