[dagster-airlift] Airflow 1 content (#26089)
## Summary & Motivation
This page adds content targeted at users on Airflow 1. Essentially, the
[in-airflow] submodule should still work with Airflow 1; you just don't
get the nice metadata.
dpeng817 authored Dec 10, 2024
1 parent d5c901b commit ca9dcc6
Showing 37 changed files with 1,944 additions and 0 deletions.
30 changes: 30 additions & 0 deletions docs/content/_navigation.json
@@ -934,6 +934,10 @@
"title": "Airlift",
"path": "/integrations/airlift",
"children": [
{
"title": "Mapping Airflow Concepts to Dagster",
"path": "/integrations/airlift/airflow-dagster-equivalents"
},
{
"title": "Airflow Migration Tutorial",
"path": "/integrations/airlift/tutorial/overview",
@@ -985,6 +989,32 @@
{
"title": "DAG-level migration",
"path": "/integrations/airlift/full_dag"
},
{
"title": "Migration with Airflow 1",
"path": "/integrations/airlift/airflow-1-migration"
},
{
"title": "Operator Migration Reference",
"path": "/integrations/airlift/operator-migration/overview",
"children": [
{
"title": "Migrating a BashOperator for dbt",
"path": "/integrations/airlift/operator-migration/bash-operator-dbt"
},
{
"title": "Migrating a KubernetesPodOperator",
"path": "/integrations/airlift/operator-migration/kubernetes-pod-operator"
},
{
"title": "Migrating a PythonOperator",
"path": "/integrations/airlift/operator-migration/python-operator"
},
{
"title": "Migrating a BashOperator (general)",
"path": "/integrations/airlift/operator-migration/bash-operator-general"
}
]
}
]
},
10 changes: 10 additions & 0 deletions docs/content/integrations/airlift.mdx
@@ -54,6 +54,12 @@ In this tutorial, we'll use `dagster-airlift` to observe DAGs from multiple Airf

[Click here to get started](/integrations/airlift/federation-tutorial/overview).

## Airlift Operator Migration Reference

In this reference, we'll explain how to migrate common Airflow operators to Dagster.

[Click here to get started](/integrations/airlift/operator-migration/overview).

## References

<ArticleList>
@@ -69,4 +75,8 @@ In this tutorial, we'll use `dagster-airlift` to observe DAGs from multiple Airf
title="Additional Airlift Functionality"
href="/integrations/airlift/reference"
></ArticleListItem>
<ArticleListItem
title="Migrating Airflow 1 to Dagster"
href="/integrations/airlift/airflow-1-migration"
></ArticleListItem>
</ArticleList>
278 changes: 278 additions & 0 deletions docs/content/integrations/airlift/airflow-1-migration.mdx
@@ -0,0 +1,278 @@
# Airflow 1.X Migration Tutorial

## Overview

This guide covers using `dagster-airlift` to migrate an Airflow DAG to Dagster when running `apache-airflow` versions below 2.0.

Many APIs within the `dagster-airlift` package make use of Airflow's stable REST API, which was added in Airflow 2.0. However, we still enable a migration process for Airflow 1.x users.
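
If you're unsure which major version a given environment is running, a quick check like the following can help (an illustrative snippet, not part of the tutorial code; the printed guidance simply mirrors this guide):

```python
# Illustrative version check: Airlift's REST-API-based features need Airflow >= 2.0.
import airflow

major_version = int(airflow.__version__.split(".")[0])
if major_version >= 2:
    print("Airflow 2+: the full peer/observe/migrate tutorial applies.")
else:
    print("Airflow 1.x: follow this guide and proxy execution directly.")
```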

This guide will cover the migration process using the same base example as the [tutorial](/integrations/airlift/tutorial/overview).

We recommend following the tutorial in order to understand the concepts and steps involved in the migration process, and then using this guide to apply those steps to an Airflow 1.x environment.

## Prerequisites

Before continuing, you should have:

- Basic familiarity with Dagster concepts such as [assets](/concepts/assets/software-defined-assets) and [code locations](/concepts/code-locations).
- Skimmed the [Airlift migration tutorial](/integrations/airlift/tutorial/overview) to understand the general process of migration.

### Setup

<TabGroup>
<TabItem name="if airlift tutorial">
If you previously ran the Airlift tutorial, you can follow along by doing the following:

- Clear `tutorial_example/dagster_defs/definitions.py`, and mark all tasks as unproxied in the proxied state YAML file.

</TabItem>
<TabItem name="if not airlift tutorial">

Start by following the [setup](/integrations/airlift/tutorial/setup) step of the migration tutorial, and we'll diverge from there.

With Airflow 1.x, we won't [peer](/integrations/airlift/tutorial/peer) or [observe](/integrations/airlift/tutorial/observe) Airflow DAGs first - we'll immediately skip to the migration step and proxy execution to Dagster.

</TabItem>
</TabGroup>

### Scaffolding proxied state

To begin proxying tasks in a DAG, you will first need a file to track proxying state. In your Airflow DAG directory, create a `proxied_state` folder, and in it create a YAML file with the same name as your DAG. The included example at `airflow_dags/proxied_state` is used by `make airflow_run`, and can be used as a template for your own proxied state files.

Given our example DAG `rebuild_customers_list` with three tasks, `load_raw_customers`, `build_dbt_models`, and `export_customers`, `proxied_state/rebuild_customers_list.yaml` should look like the following:

```yaml file=../../airlift-migration-tutorial/tutorial_example/airflow_dags/proxied_state/rebuild_customers_list.yaml
tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
    proxied: False
  - id: export_customers
    proxied: False
```
Next, you will need to modify your Airflow DAG to make it aware of the proxied state. This is already done in the example DAG:
```python file=../../airlift-migration-tutorial/tutorial_example/snippets/dags_truncated.py
# Dags file can be found at tutorial_example/airflow_dags/dags.py
from pathlib import Path

from airflow import DAG
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

dag = DAG("rebuild_customers_list", ...)

...

# Set this to True to begin the proxying process
PROXYING = False

if PROXYING:
    proxying_to_dagster(
        global_vars=globals(),
        proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    )
```
Set `PROXYING` to `True` or eliminate the `if` statement.

The DAG will now display its proxied state in the Airflow UI. (There is some latency as Airflow evaluates the Python file periodically.)

<p align="center">

<Image
alt="Migration state rendering in Airflow UI"
src="/images/integrations/airlift/state_in_airflow.png"
width={528}
height={102}
/>

</p>

### Migrating `build_dbt_models`

We'll now create Dagster assets that correspond to each Airflow task. First, since Dagster provides an out-of-the-box integration with dbt, we'll use some utilities from `dagster-dbt` to create assets for the `build_dbt_models` task. In our `tutorial_example/dagster_defs/definitions.py` file:

```python file=../../airlift-migration-tutorial/snippets/airflow_1_dbt.py startafter=start_dbt_project_assets endbefore=end_dbt_project_assets
import os
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```

We used the <PyObject object="dbt_assets" module="dagster_dbt" /> decorator to create assets from the dbt project. The `manifest` argument points to the manifest file generated by dbt, and the `project` argument points to the dbt project directory. The <PyObject object="DbtCliResource" module="dagster_dbt" /> resource is a wrapper around the dbt CLI, which we use to execute dbt commands.
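
If you don't already have a `manifest.json` in `target/`, one convenient way to generate it during local development is `DbtProject.prepare_if_dev()` from `dagster-dbt` (available in recent versions). A minimal sketch, reusing the `dbt_project_path()` helper defined above:

```python
from dagster_dbt import DbtProject

# Minimal sketch: during `dagster dev`, prepare_if_dev() compiles the dbt project
# and writes target/manifest.json so the @dbt_assets decorator has a manifest to load.
dbt_project = DbtProject(dbt_project_path())
dbt_project.prepare_if_dev()
```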

Now, we'll mark our `dbt_project_assets` as being mapped from Airflow:

```python file=../../airlift-migration-tutorial/snippets/airflow_1_dbt.py startafter=start_mapping endbefore=end_mapping
from dagster_airlift.core import assets_with_task_mappings

mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "build_dbt_models":
        # load rich set of assets from dbt project
        [dbt_project_assets],
    },
)
```

The `assets_with_task_mappings` function attaches mapping metadata to each passed-in asset; when a proxied task runs in Airflow, that metadata is used to determine which assets to execute in Dagster.

We'll provide the mapped assets to a `Definitions` object in our `tutorial_example/dagster_defs/definitions.py` file:

```python file=../../airlift-migration-tutorial/snippets/airflow_1_dbt.py startafter=start_defs endbefore=end_defs
from dagster import Definitions

defs = Definitions(
    assets=mapped_assets, resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}
)
```

Note how this differs from the original migration tutorial; we're not using `build_defs_from_airflow_instance`, which relies on the REST API.
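
For contrast, the REST-API-based flow from the Airflow 2.x tutorial builds its definitions roughly as follows (a sketch only; the webserver URL and credentials are placeholders, and this call will not work against an Airflow 1.x webserver):

```python
from dagster import Definitions
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",  # placeholder
            username="admin",  # placeholder
            password="admin",  # placeholder
        ),
        name="airflow_instance_one",
    ),
    # mapped_assets and dbt_project_path() are the same objects defined earlier in this guide
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
    ),
)
```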

Finally, we'll mark the `build_dbt_models` task as proxied in the proxied state YAML file:

```yaml file=../../airlift-migration-tutorial/tutorial_example/snippets/dbt_proxied.yaml
tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
    proxied: True
  - id: export_customers
    proxied: False
```

**Important**: It may take up to 30 seconds for the proxied state in the Airflow UI to reflect this change. You must subsequently reload the definitions in Dagster via the UI or by restarting `dagster dev`.

You can now run the `rebuild_customers_list` DAG in Airflow, and the `build_dbt_models` task will be executed in a Dagster run:

<p align="center">

<Image
alt="dbt build executing in Dagster"
src="/images/integrations/airlift/proxied_dag.png"
width={1314}
height={178}
/>

</p>

### Completed code

Migrating the other tasks should follow the same pattern as in the [migration tutorial](/integrations/airlift/tutorial/migrate#migrating-the-remaining-custom-operators). When you're done, your code should look like this:

```python file=../../airlift-migration-tutorial/snippets/airflow_1_migrated.py
import os
from pathlib import Path

from dagster import (
    AssetExecutionContext,
    AssetsDefinition,
    AssetSpec,
    DailyPartitionsDefinition,
    Definitions,
    multi_asset,
)
from dagster._time import get_current_datetime_midnight
from dagster_airlift.core import assets_with_task_mappings
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

# Code also invoked from Airflow
from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv
from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb

PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


def airflow_dags_path() -> Path:
    return Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "tutorial_example" / "airflow_dags"


def load_csv_to_duckdb_asset(spec: AssetSpec, args: LoadCsvToDuckDbArgs) -> AssetsDefinition:
    @multi_asset(name=f"load_{args.table_name}", specs=[spec])
    def _multi_asset() -> None:
        load_csv_to_duckdb(args)

    return _multi_asset


def export_duckdb_to_csv_defs(spec: AssetSpec, args: ExportDuckDbToCsvArgs) -> AssetsDefinition:
    @multi_asset(name=f"export_{args.table_name}", specs=[spec])
    def _multi_asset() -> None:
        export_duckdb_to_csv(args)

    return _multi_asset


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
    partitions_def=PARTITIONS_DEF,
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [
            load_csv_to_duckdb_asset(
                AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF),
                LoadCsvToDuckDbArgs(
                    table_name="raw_customers",
                    csv_path=airflow_dags_path() / "raw_customers.csv",
                    duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
                    names=["id", "first_name", "last_name"],
                    duckdb_schema="raw_data",
                    duckdb_database_name="jaffle_shop",
                ),
            )
        ],
        "build_dbt_models":
        # load rich set of assets from dbt project
        [dbt_project_assets],
        "export_customers": [
            export_duckdb_to_csv_defs(
                AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF),
                ExportDuckDbToCsvArgs(
                    table_name="customers",
                    csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv",
                    duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
                    duckdb_database_name="jaffle_shop",
                ),
            )
        ],
    },
)

defs = Definitions(
    assets=mapped_assets,
    resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
)
```

### Conclusion

To recap, we've covered the process of migrating an Airflow 1.x DAG to Dagster using `dagster-airlift`. We've clarified which functionality works with Airflow versions below 2.0 and which does not, shown how to create Dagster assets that correspond to Airflow tasks, and demonstrated how to mark those tasks as proxied in the proxied state YAML file.

1 comment on commit ca9dcc6

@github-actions

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-keqpaytov-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit ca9dcc6.
This pull request is being automatically deployed with vercel-action
