Dagster's core abstractions are ops and jobs.
Ops are individual units of computation that we wire together to form jobs.
In this section, we'll cover how to define a simple job with a single op, and then execute it.
Our job will operate on a simple but scary CSV dataset, cereal.csv, which contains nutritional facts about 80 breakfast cereals.
Let's write our first Dagster op and save it as hello_cereal.py.
An op is a unit of computation in a job. Typically, you'll define ops by annotating ordinary Python functions with the @op decorator.
Our first op does three things: downloads a CSV of cereal data, reads it into a list of dictionaries which each represent a row in the CSV, and logs the number of rows it finds.
import csv

import requests

from dagster import get_dagster_logger, job, op


@op
def hello_cereal():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    cereals = [row for row in csv.DictReader(lines)]
    get_dagster_logger().info(f"Found {len(cereals)} cereals")
In this simple case, our op takes no arguments, and also returns no outputs. Don't worry, we'll soon encounter ops that are much more dynamic.
To execute our op, we'll embed it in an equally simple job. A job is a set of ops arranged into a DAG of computation. You'll typically define jobs by annotating ordinary Python functions with the @job decorator.
@job
def hello_cereal_job():
    hello_cereal()
Here you'll see that we call hello_cereal(). This call doesn't actually execute the op. Within the bodies of functions decorated with @job, we use function calls to indicate the dependency structure of the ops making up the job. Here, we indicate that the execution of hello_cereal doesn't depend on any other ops by calling it with no arguments.
Assuming you've saved this job as hello_cereal.py, you can execute it via any of three different mechanisms:
To visualize your job (which only has one op) in Dagit, just run the following. Make sure you're in the directory in which you've saved the job file:
dagit -f hello_cereal.py
You'll see output like:
Serving dagit on http://127.0.0.1:3000 in process 70635
You should be able to navigate to http://127.0.0.1:3000 in your web browser and view your job. It isn't very interesting yet, because it only has one op.
Click on the "Launchpad" tab and you'll see the view below.
The large upper left pane is empty here, but, in jobs with parameters, this is where you'll be able to edit job configuration on the fly.
Click the "Launch Run" button on the bottom right to execute this job directly from Dagit. A new window should open, and you'll see a much more structured view of the stream of Dagster events start to appear in the left-hand pane.
If you have pop-up blocking enabled, you may need to tell your browser to allow pop-ups from 127.0.0.1—or, just navigate to the "Runs" tab to see this, and every run of your job.
In this view, you can filter and search through the logs corresponding to your job run.
From the directory in which you've saved the job file, just run:
dagster job execute -f hello_cereal.py
You'll see the full stream of events emitted by Dagster appear in the console, including our call to the logging machinery, which will look like:
2021-02-05 08:50:25 - dagster - INFO - system - ce5d4576-2569-44ff-a14a-51010eea5329 - hello_cereal - Found 77 cereals
Success!
If you'd rather execute your jobs as a script, you can do that without using the Dagster CLI at all. Just add a few lines to hello_cereal.py:
if __name__ == "__main__":
    result = hello_cereal_job.execute_in_process()
Now you can just run:
python hello_cereal.py