[dagster-airlift] Python operator docs
Showing 10 changed files with 245 additions and 0 deletions.
77 changes: 77 additions & 0 deletions
docs/content/integrations/airlift/operator-migration/bash-operator-general.mdx
@@ -0,0 +1,77 @@
# Operator migration guides: Migrating generalized usage of `BashOperator`

In this page, we'll explain migrating an Airflow `BashOperator` to Dagster.

<Note>
  If using the `BashOperator` to execute dbt commands, see the [dbt migration
  guide](/integrations/airlift/operator-migration/bash-operator-dbt).
</Note>

### Background

The Airflow `BashOperator` is a common operator used to execute bash commands as part of a data pipeline.

```python file=/integrations/airlift/operator_migration/bash_operator_general.py
from airflow.operators.bash import BashOperator

execute_script = BashOperator(
    task_id="execute_script",
    bash_command="python /path/to/script.py",
)
```

The `BashOperator`'s functionality is very general since it can be used to run any bash command, and Dagster offers richer integrations for many common `BashOperator` use cases. We'll explain how to do a 1-1 migration of a `BashOperator`-executed bash command to Dagster, how to use the `dagster-airlift` library to proxy execution of the original task to Dagster, and provide a reference for richer Dagster integrations that cover common `BashOperator` use cases.

### Dagster equivalent

The direct Dagster equivalent to the `BashOperator` is to use the <PyObject object="PipesSubprocessClient" module="dagster"/> to execute a bash command in a subprocess.

### Migrating the operator

Migrating the operator breaks down into a few steps:

1. Ensure that the resources necessary for your bash command are available to both your Airflow and Dagster deployments.
2. Write an <PyObject object="asset" module="dagster"/> that executes the bash command using the <PyObject object="PipesSubprocessClient" module="dagster"/>.
3. Use `dagster-airlift` to proxy execution of the original task to Dagster.
4. \[Optional] Implement a richer integration for common `BashOperator` use cases.

### Step 1: Ensure shared bash command access

First, you'll need to ensure that the bash command you're running is available for use in both your Airflow and Dagster deployments. What this entails will vary depending on the command you're running. For example, if you're running a python script, it's as simple as ensuring the script exists in a shared location accessible to both Airflow and Dagster, and that all necessary env vars are set in both environments.

### Step 2: Writing an `@asset`-decorated function

You can write a Dagster <PyObject object="asset" module="dagster"/>-decorated function that runs your bash command. This is quite straightforward using the <PyObject object="PipesSubprocessClient" module="dagster"/>.

```python file=/integrations/airlift/operator_migration/using_pipes_subprocess.py
from dagster import AssetExecutionContext, PipesSubprocessClient, asset


@asset
def script_result(context: AssetExecutionContext):
    return (
        PipesSubprocessClient()
        .run(context=context, command="python /path/to/script.py")
        .get_results()
    )
```

### Step 3: Using `dagster-airlift` to proxy execution

Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process.

### Step 4: Implementing richer integrations

For many of the use cases you might be using the `BashOperator` for, Dagster offers better options. We'll detail some of those here.

#### Running a python script

As mentioned above, you can use the <PyObject object="PipesSubprocessClient" module="dagster"/> to run a python script in a subprocess. But you can also modify this script to send additional information and logging back to Dagster. See the [Dagster Pipes tutorial](/concepts/dagster-pipes/subprocess) for more information.

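To give a sense of what that can look like, here's a minimal sketch of the external script reporting back over Dagster Pipes, assuming the `dagster-pipes` package is installed in the script's environment (the script path and metadata key are placeholders):

```python
# /path/to/script.py -- hypothetical script launched by PipesSubprocessClient
from dagster_pipes import open_dagster_pipes


def main() -> None:
    # Opens a Pipes connection back to the orchestrating Dagster process.
    with open_dagster_pipes() as pipes:
        pipes.log.info("Starting work in the external script.")
        rows_written = 42  # placeholder for the script's real work
        # Attach metadata to the materialization reported for the asset.
        pipes.report_asset_materialization(metadata={"rows_written": rows_written})


if __name__ == "__main__":
    main()
```

The `PipesSubprocessClient` invocation in the asset above picks these messages up automatically and surfaces them in Dagster as logs and materialization metadata.
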
#### Running a dbt command

We have a whole guide for switching from the `BashOperator` to the `dbt` integration in Dagster. See the [dbt migration guide](/integrations/airlift/operator-migration/bash-operator-dbt) for more information.

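To give a taste of what you're switching to, here's a minimal sketch using the `dagster-dbt` integration; the manifest path is an assumption about your dbt project layout, and the guide above covers the full migration:

```python
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


# Assumes a dbt project whose manifest has already been compiled to this path.
@dbt_assets(manifest=Path("target/manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Streams dbt events back to Dagster as each model materializes.
    yield from dbt.cli(["build"], context=context).stream()
```

The `dbt` argument is a `DbtCliResource` pointed at your dbt project, supplied through your `Definitions`.
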
#### Running S3 Sync or other AWS CLI commands

Dagster has a rich set of integrations for AWS services. For example, you can use the <PyObject object="S3Resource" module="dagster_aws.s3"/> to interact with S3 directly.
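
For example, a task that previously shelled out to the AWS CLI to copy a file could become an asset that uses the resource's boto3 client directly. A minimal sketch, with the bucket, key, and local path as placeholders:

```python
from dagster import Definitions, asset
from dagster_aws.s3 import S3Resource


@asset
def uploaded_report(s3: S3Resource) -> None:
    # get_client() returns a boto3 S3 client; bucket and paths are placeholders.
    s3.get_client().upload_file("/path/to/report.csv", "my-bucket", "reports/report.csv")


defs = Definitions(
    assets=[uploaded_report],
    resources={"s3": S3Resource(region_name="us-east-1")},
)
```

This replaces shelling out to `aws s3 cp`/`aws s3 sync` with a resource whose credentials and configuration are managed by Dagster.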
75 changes: 75 additions & 0 deletions
docs/content/integrations/airlift/operator-migration/python-operator.mdx
@@ -0,0 +1,75 @@
# Migrating the PythonOperator to Dagster

In this page, we'll explain migrating an Airflow `PythonOperator` to Dagster.

### Background

In Airflow, the `PythonOperator` runs arbitrary python functions. For example, you might have a task that runs the function `write_to_db`, which combs a directory for files and writes each one to a db table.

```python file=/integrations/airlift/operator_migration/python_operator.py startafter=start_op endbefore=end_op
from airflow.operators.python import PythonOperator


def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)


PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)
```

### Dagster equivalent

The Dagster equivalent is to construct an <PyObject object="asset" module="dagster"/> or <PyObject object="multi_asset" module="dagster"/>-decorated function that materializes the assets corresponding to what your python function is doing.

```python file=/integrations/airlift/operator_migration/pyop_multi_asset_complete.py startafter=start_asset endbefore=end_asset
from dagster import asset


@asset(key=TABLE_URI)
def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)
```

### Migrating the operator

Migrating the operator breaks down into a few steps:

1. Make a shared library available to both Airflow and Dagster that contains your python function.
2. Write an `@asset`-decorated function which runs the python function shared between both modules.
3. Use `dagster-airlift` to proxy execution of the original task to Dagster.

### Step 1: Building a shared library

We recommend a monorepo setup for migration; this allows you to keep all your code in one place and easily share code between Airflow and Dagster, without complex CI/CD coordination.

First, we recommend factoring out a shared package, containing your python function, that is available to both the Airflow and Dagster runtimes. The process is as follows:

1. Scaffold out a new python project which will contain your shared infrastructure (a minimal sketch of such a package follows this list).
2. Ensure that the shared library is available to both your Airflow and Dagster deployments. This can be done by adding an editable requirement to the `setup.py` or `pyproject.toml` file of your Airflow/Dagster package.
3. Include the python dependencies relevant to your particular function in your new package. Write your python function in the shared package, and change your Airflow code to import the function from the shared library.
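
As a rough sketch, the shared package's `setup.py` could look like the following; the package name and the `pandas` dependency are hypothetical stand-ins for your function's actual requirements:

```python
# shared/setup.py -- hypothetical scaffold for the shared package
from setuptools import find_packages, setup

setup(
    name="shared",
    packages=find_packages(),
    # Only the dependencies your python function itself needs;
    # Airflow and Dagster are deliberately not dependencies of this package.
    install_requires=["pandas"],
)
```

Both the Airflow and Dagster packages can then depend on it with an editable install (for example, `pip install -e path/to/shared`), and your DAG code imports the function from `shared` instead of defining it inline.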

The reason we recommend using a separate `shared` package is to help ensure that there aren't dependency conflicts between Airflow and Dagster as you migrate. Airflow has very complex dependency management, and migrating to Dagster gives you an opportunity to clean up and isolate your dependencies. You can do this with a series of shared packages in the monorepo, which will eventually be isolated code locations in Dagster.

### Step 2: Writing an `@asset`-decorated function

Next, you can write a Dagster <PyObject object="asset" module="dagster"/> or <PyObject object="multi_asset" module="dagster"/>-decorated function that runs your python function. This will generally be pretty straightforward for a `PythonOperator` migration, as you can usually just invoke the shared function from within the `asset`-decorated function.

```python file=/integrations/airlift/operator_migration/pyop_asset_shared.py
# This would be the python code living in your Dagster project, importing from the shared module.
from dagster import asset

from .shared import my_shared_python_callable


@asset
def my_shared_asset():
    return my_shared_python_callable()
```

### Step 3: Using `dagster-airlift` to proxy execution

Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process.
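
As a rough sketch of what that looks like (based on the `dagster-airlift` tutorial; treat the exact import paths and the `proxied_state` YAML layout as assumptions to confirm against that guide), you add a proxying call at the bottom of the Airflow DAG file:

```python
# dags/my_dag.py -- hypothetical DAG file; assumes a proxied_state/my_dag.yaml
# next to it that marks the db_upload task as proxied to Dagster.
from pathlib import Path

from airflow import DAG
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

dag = DAG("my_dag")

# ... task definitions, such as the PythonOperator shown above ...

proxying_to_dagster(
    global_vars=globals(),
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
)
```

Once a task is marked as proxied in the YAML file, Airflow hands off its execution to the corresponding Dagster asset at runtime.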
6 changes: 6 additions & 0 deletions
...s_snippets/docs_snippets/integrations/airlift/operator_migration/bash_operator_general.py
@@ -0,0 +1,6 @@
from airflow.operators.bash import BashOperator

execute_script = BashOperator(
    task_id="execute_script",
    bash_command="python /path/to/script.py",
)
9 changes: 9 additions & 0 deletions
.../docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_asset_shared.py
@@ -0,0 +1,9 @@
# This would be the python code living in your Dagster project, importing from the shared module.
from dagster import asset

from .shared import my_shared_python_callable


@asset
def my_shared_asset():
    return my_shared_python_callable()
27 changes: 27 additions & 0 deletions
...ippets/docs_snippets/integrations/airlift/operator_migration/pyop_multi_asset_complete.py
@@ -0,0 +1,27 @@
from pathlib import Path
from typing import Any

RAW_DATA_DIR = Path("path")
TABLE_URI = "blah"


def contents_as_df(path: Path) -> Any:
    pass


def upload_to_db(df):
    pass


# start_asset
from dagster import asset


@asset(key=TABLE_URI)
def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)


# end_asset
27 changes: 27 additions & 0 deletions
...es/docs_snippets/docs_snippets/integrations/airlift/operator_migration/python_operator.py
@@ -0,0 +1,27 @@
from pathlib import Path
from typing import Any

RAW_DATA_DIR = Path("path")


def contents_as_df(path: Path) -> Any:
    pass


def upload_to_db(df: Any):
    pass


# start_op
from airflow.operators.python import PythonOperator


def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)


PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)

# end_op
2 changes: 2 additions & 0 deletions
examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/shared.py
@@ -0,0 +1,2 @@
def my_shared_python_callable():
    pass
10 changes: 10 additions & 0 deletions
..._snippets/docs_snippets/integrations/airlift/operator_migration/using_pipes_subprocess.py
@@ -0,0 +1,10 @@
from dagster import AssetExecutionContext, PipesSubprocessClient, asset


@asset
def script_result(context: AssetExecutionContext):
    return (
        PipesSubprocessClient()
        .run(context=context, command="python /path/to/script.py")
        .get_results()
    )