Software-defined assets can depend on other software-defined assets. An asset dependency means that the contents of an "upstream" asset are used to compute the contents of the "downstream" asset.
Why split up code into multiple assets? There are a few reasons:
Having defined a dataset of cereals, we'll define a downstream asset that contains only the cereals that are manufactured by Nabisco.

import csv

import requests
from dagster import asset


@asset
def cereals():
    # Download the CSV and parse it into one dict per cereal
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@asset
def nabisco_cereals(cereals):
    """Cereals manufactured by Nabisco"""
    return [row for row in cereals if row["mfr"] == "N"]

We've defined our new asset, nabisco_cereals, with an argument, cereals.
Dagster offers a few ways of specifying asset dependencies, but the easiest is to include the upstream asset's name as an argument to the decorated function. When it's time to materialize the nabisco_cereals asset, the contents of the cereals asset are passed as the value of the cereals argument to its compute function.
So:
- cereals doesn't depend on any other asset
- nabisco_cereals depends on cereals
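
To make the name-matching idea concrete, here is a toy sketch in plain Python. It is not how Dagster is implemented; it only shows that a function's parameter names are enough to recover the dependency list. The helper infer_dependencies and the placeholder function bodies are hypothetical, introduced purely for illustration.

```python
import inspect


def infer_dependencies(*functions):
    """Map each function name to the names of its parameters.

    Hypothetical helper: a toy stand-in for name-based dependency
    wiring, not part of Dagster.
    """
    return {
        fn.__name__: list(inspect.signature(fn).parameters)
        for fn in functions
    }


# Undecorated stand-ins with the same signatures as the assets above.
def cereals():
    return []


def nabisco_cereals(cereals):
    return [row for row in cereals if row["mfr"] == "N"]


dependencies = infer_dependencies(cereals, nabisco_cereals)
print(dependencies)
# {'cereals': [], 'nabisco_cereals': ['cereals']}
```

The printed mapping mirrors the two statements above: cereals has no upstream names, and nabisco_cereals lists cereals.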
Let's visualize these assets in Dagit:
dagit -f serial_asset_graph.py
Navigate to http://127.0.0.1:3000. From the Assets page, click an asset and then the Lineage tab.
To materialize the assets, click Materialize all.
Assets don't need to be wired together serially. An asset can depend on and be depended on by any number of other assets.
Here, we're interested in which of Nabisco's cereals has the most protein. We define four assets:
- cereals and nabisco_cereals assets, same as above
- cereal_protein_fractions asset, which records each cereal's protein content as a fraction of its total mass
- highest_protein_nabisco_cereal, which is the name of the Nabisco cereal that has the highest protein content

import csv
import requests
from dagster import asset


@asset
def cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@asset
def nabisco_cereals(cereals):
    """Cereals manufactured by Nabisco"""
    return [row for row in cereals if row["mfr"] == "N"]


@asset
def cereal_protein_fractions(cereals):
    """
    For each cereal, records its protein content as a fraction of its total mass.
    """
    result = {}
    for cereal in cereals:
        # 28.35 is grams per ounce, so this converts the weight to grams
        total_grams = float(cereal["weight"]) * 28.35
        result[cereal["name"]] = float(cereal["protein"]) / total_grams
    return result


@asset
def highest_protein_nabisco_cereal(nabisco_cereals, cereal_protein_fractions):
    """
    The name of the Nabisco cereal that has the highest protein content.
    """
    # Sort ascending by protein fraction; the last element has the highest
    sorted_by_protein = sorted(
        nabisco_cereals, key=lambda cereal: cereal_protein_fractions[cereal["name"]]
    )
    return sorted_by_protein[-1]["name"]

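
Because each asset body is ordinary Python, the selection logic can be checked on hand-made rows without fetching the CSV. The sketch below repeats the computation as plain functions, with no @asset decorator and with max in place of sorting. The function names protein_fractions and highest_protein are introduced here, and the three rows are invented for illustration rather than taken from the real dataset.

```python
OUNCE_IN_GRAMS = 28.35  # same conversion factor as in the asset above

# Invented sample rows; the real CSV has more columns and different values.
sample_cereals = [
    {"name": "Alpha Flakes", "mfr": "N", "protein": "2", "weight": "1"},
    {"name": "Beta Bran", "mfr": "N", "protein": "4", "weight": "1"},
    {"name": "Gamma Puffs", "mfr": "K", "protein": "6", "weight": "1"},
]


def protein_fractions(rows):
    """Protein grams divided by serving mass in grams, keyed by cereal name."""
    return {
        row["name"]: float(row["protein"]) / (float(row["weight"]) * OUNCE_IN_GRAMS)
        for row in rows
    }


def highest_protein(rows, fractions):
    """Name of the row with the largest protein fraction."""
    return max(rows, key=lambda row: fractions[row["name"]])["name"]


nabisco_rows = [row for row in sample_cereals if row["mfr"] == "N"]
fractions = protein_fractions(sample_cereals)
print(highest_protein(nabisco_rows, fractions))  # Beta Bran
```

Gamma Puffs has the largest fraction overall but is filtered out because its mfr is not "N". One difference worth noting: on ties, max returns the first of the tied rows, whereas sorting and taking the last element returns the last.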
Let's visualize these assets in Dagit:
dagit -f complex_asset_graph.py
Navigate to http://127.0.0.1:3000. From the Assets page, click an asset and then the Lineage tab.
If you click Materialize all, you'll see that cereals executes first, followed by nabisco_cereals and cereal_protein_fractions, which execute in parallel because neither depends on the other's output. highest_protein_nabisco_cereal executes last, only after both nabisco_cereals and cereal_protein_fractions have finished.
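
The ordering described above follows from the dependency graph alone. As a rough illustration, using Python's standard graphlib module (Python 3.9+) rather than anything from Dagster's own scheduler, the sketch below groups the four assets into batches whose members have no unmet dependencies. Assets in the same batch could run in parallel. The graph dictionary and the batches variable are names introduced here.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of assets it depends on.
graph = {
    "cereals": set(),
    "nabisco_cereals": {"cereals"},
    "cereal_protein_fractions": {"cereals"},
    "highest_protein_nabisco_cereal": {
        "nabisco_cereals",
        "cereal_protein_fractions",
    },
}

sorter = TopologicalSorter(graph)
sorter.prepare()

batches = []
while sorter.is_active():
    # Everything returned by get_ready() has all of its dependencies done.
    ready = sorted(sorter.get_ready())  # sorted only to make the output stable
    batches.append(ready)
    sorter.done(*ready)

for batch in batches:
    print(batch)
# ['cereals']
# ['cereal_protein_fractions', 'nabisco_cereals']
# ['highest_protein_nabisco_cereal']
```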