Skip to content

Commit

Permalink
[dagster-airlift] Python operator docs
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 24, 2024
1 parent a564411 commit 7f90cc5
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 0 deletions.
8 changes: 8 additions & 0 deletions docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,14 @@
{
"title": "Migrating a KubernetesPodOperator",
"path": "/integrations/airlift/operator-migration/kubernetes-pod-operator"
},
{
"title": "Migrating a PythonOperator",
"path": "/integrations/airlift/operator-migration/python-operator"
},
{
"title": "Migrating a BashOperator (general)",
"path": "/integrations/airlift/operator-migration/bash-operator-general"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Operator migration guides: Migrating generalized usage of `BashOperator`

In this page, we'll explain migrating an Airflow `BashOperator` to Dagster.

<Note>
If using the `BashOperator` to execute dbt commands, see the [dbt migration
guide](/integrations/airlift/operator-migration/bash-operator-dbt).
</Note>

### Background

The Airflow `BashOperator` is a common operator used to execute bash commands as part of a data pipeline.

```python file=/integrations/airlift/operator_migration/bash_operator_general.py
from airflow.operators.bash import BashOperator

execute_script = BashOperator(
task_id="execute_script",
bash_command="python /path/to/script.py",
)
```

The `BashOperator`'s functionality is very general since it can be used to run any bash command, and there exist richer integrations in Dagster for many common BashOperator use cases. We'll explain how 1-1 migration of the BashOperator to execute a bash command in Dagster, and how to use the `dagster-airlift` library to proxy the execution of the original task to Dagster. We'll also provide a reference for richer integrations in Dagster for common BashOperator use cases.

### Dagster equivalent

The direct Dagster equivalent to the `BashOperator` is to use the <PyObject object="PipesSubprocessClient" module="dagster"/> to execute a bash command in a subprocess.

### Migrating the operator

Migrating the operator breaks down into a few steps:

1. Ensure that the resources necessary for your bash command are available to both your Airflow and Dagster deployments.
2. Write an <PyObject object="asset" module="dagster"/> that executes the bash command using the <PyObject object="PipesSubprocessClient" module="dagster"/>.
3. Use `dagster-airlift` to proxy execution of the original task to Dagster.
4. \[Optional] Implement a richer integration for common BashOperator use cases.

### Step 1: Ensure shared bash command access

First, you'll need to ensure that the bash command you're running is available for use in both your Airflow and Dagster deployments. What this entails will vary depending on the command you're running. For example, if you're running a python script, it's as simple as ensuring the python script exists in a shared location accessible to both Airflow and Dagster, and all necessary env vars are set in both environments.

### Step 2: Writing an `@asset`-decorated function

You can write a Dagster <PyObject object="asset" module="dagster"/>-decorated function that runs your bash command. This is quite straightforward using the <PyObject object="PipesSubprocessClient" module="dagster"/>.

```python file=/integrations/airlift/operator_migration/using_pipes_subprocess.py
from dagster import AssetExecutionContext, PipesSubprocessClient, asset


@asset
def script_result(context: AssetExecutionContext):
return (
PipesSubprocessClient()
.run(
context=context.op_execution_context,
command="python /path/to/script.py",
)
.get_results()
)
```

### Step 3: Using `dagster-airlift` to proxy execution

Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process.

### Step 4: Implementing richer integrations

For many of the use cases that you might be using the BashOperator for, Dagster might have better options. We'll detail some of those here.

#### Running a python script

As mentioned above, you can use the <PyObject object="PipesSubprocessClient" module="dagster"/> to run a python script in a subprocess. But you can also modify this script to send additional information and logging back to Dagster. See the [Dagster Pipes tutorial](/concepts/dagster-pipes/subprocess) for more information.

#### Running a dbt command

We have a whole guide for switching from the `BashOperator` to the `dbt` integration in Dagster. See the [dbt migration guide](/integrations/airlift/operator-migration/bash-operator-dbt) for more information.

#### Running S3 Sync or other AWS CLI commands

Dagster has a rich set of integrations for AWS services. For example, you can use the <PyObject object="S3Resource" module="dagster_aws.s3"/> to interact with S3 directly.
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@ This page contains a collection of reference materials for migrating usage of co
title="Python Operator"
href="/integrations/airlift/operator-migration/python-operator"
></ArticleListItem>
<ArticleListItem
title="Bash Operator (general)"
href="/integrations/airlift/operator-migration/bash-operator-general"
></ArticleListItem>
</ArticleList>
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Migrating the PythonOperator to Dagster

In this page, we'll explain migrating an Airflow `PythonOperator` to Dagster.

### Background

In Airflow, the `PythonOperator` runs arbitrary python functions. For example, you might have a task that runs function `write_to_db`, which combs a directory for files, and writes each one to a db table.

```python file=/integrations/airlift/operator_migration/python_operator.py startafter=start_op endbefore=end_op
from airflow.operators.python import PythonOperator


def write_to_db() -> None:
for raw_file in RAW_DATA_DIR.iterdir():
df = contents_as_df(raw_file)
upload_to_db(df)


PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)
```

### Dagster equivalent

The Dagster equivalent is instead to construct a <PyObject object="asset" module="dagster"/> or <PyObject object="multi_asset" module="dagster"/>-decorated function, which materializes assets corresponding to what your python function is doing.

```python file=/integrations/airlift/operator_migration/pyop_multi_asset_complete.py startafter=start_asset endbefore=end_asset
from dagster import asset


@asset(key=TABLE_URI)
def write_to_db() -> None:
for raw_file in RAW_DATA_DIR.iterdir():
df = contents_as_df(raw_file)
upload_to_db(df)
```

### Migrating the operator

Migrating the operator breaks down into a few steps:

1. Make a shared library available to both Airflow and Dagster with your python function.
2. Writing an `@asset`-decorated function which runs the python function shared between both modules.
3. Using `dagster-airlift` to proxy execution of the original task to Dagster.

### Step 1: Building a shared library

We recommend a monorepo setup for migration; this allows you to keep all your code in one place and easily share code between Airflow and Dagster, without complex CI/CD coordination.

First, we recommend factoring out a shared package to be available to both the Dagster runtime and the Airflow runtime which contains your python function. The process is as follows:

1. Scaffold out a new python project which will contain your shared infrastructure.
2. Ensure that the shared library is available to both your Airflow and Dagster deployments. This can be done by adding an editable requirement to your `setup.py` or `pyproject.toml` file in your Airflow/Dagster package.
3. Include the python dependencies relevant to your particular function in your new package. Write your python function in the shared package, and change your Airflow code to import the function from the shared library.

The reason we recommend using a separate `shared` package is to help ensure that there aren't dependency conflicts between Airflow and Dagster as you migrate. Airflow has very complex dependency management, and migrating to Dagster gives you an opportunity to clean up and isolate your dependencies. You can do this with a series of shared packages in the monorepo, which will eventually be isolated code locations in Dagster.

### Step 2: Writing an `@asset`-decorated function

Next, you can write a Dagster <PyObject object="asset" module="dagster"/> or <PyObject object="multi_asset" module="dagster"/>-decorated function that runs your python function. This will generally be pretty straightforward for a `PythonOperator` migration, as you can generally just invoke the shared function into the `asset` function.

```python file=/integrations/airlift/operator_migration/pyop_asset_shared.py
# This would be the python code living in a shared module.
from dagster import asset

from .shared import my_shared_python_callable


@asset
def my_shared_asset():
return my_shared_python_callable()
```

### Step 3: Using `dagster-airlift` to proxy execution

Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from airflow.operators.bash import BashOperator

execute_script = BashOperator(
task_id="execute_script",
bash_command="python /path/to/script.py",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# This would be the python code living in a shared module.
from dagster import asset

from .shared import my_shared_python_callable


@asset
def my_shared_asset():
return my_shared_python_callable()
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from pathlib import Path
from typing import Any

RAW_DATA_DIR = Path("path")
TABLE_URI = "blah"


def contents_as_df(path: Path) -> Any:
pass


def upload_to_db(df):
pass


# start_asset
from dagster import asset


@asset(key=TABLE_URI)
def write_to_db() -> None:
for raw_file in RAW_DATA_DIR.iterdir():
df = contents_as_df(raw_file)
upload_to_db(df)


# end_asset
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from pathlib import Path
from typing import Any

RAW_DATA_DIR = Path("path")


def contents_as_df(path: Path) -> Any:
pass


def upload_to_db(df: Any):
pass


# start_op
from airflow.operators.python import PythonOperator


def write_to_db() -> None:
for raw_file in RAW_DATA_DIR.iterdir():
df = contents_as_df(raw_file)
upload_to_db(df)


PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)

# end_op
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def my_shared_python_callable():
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dagster import AssetExecutionContext, PipesSubprocessClient, asset


@asset
def script_result(context: AssetExecutionContext):
return (
PipesSubprocessClient()
.run(context=context, command="python /path/to/script.py")
.get_results()
)

0 comments on commit 7f90cc5

Please sign in to comment.