From b84bce9cb6b8c7ec804fe49cce9337e556bb76ea Mon Sep 17 00:00:00 2001
From: Chris DeCarolis
Date: Thu, 21 Nov 2024 19:52:19 -0800
Subject: [PATCH] [dagster-airlift] Python operator docs

---
 docs/content/_navigation.json                 |  8 ++
 .../bash-operator-general.mdx                 | 77 +++++++++++++++++++
 .../airlift/operator-migration/overview.mdx   |  4 +
 .../operator-migration/python-operator.mdx    | 75 ++++++++++++++++++
 .../bash_operator_general.py                  |  6 ++
 .../operator_migration/pyop_asset_shared.py   |  9 +++
 .../pyop_multi_asset_complete.py              | 27 +++++++
 .../operator_migration/python_operator.py     | 27 +++++++
 .../airlift/operator_migration/shared.py      |  2 +
 .../using_pipes_subprocess.py                 | 10 +++
 10 files changed, 245 insertions(+)
 create mode 100644 docs/content/integrations/airlift/operator-migration/bash-operator-general.mdx
 create mode 100644 docs/content/integrations/airlift/operator-migration/python-operator.mdx
 create mode 100644 examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/bash_operator_general.py
 create mode 100644 examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_asset_shared.py
 create mode 100644 examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_multi_asset_complete.py
 create mode 100644 examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/python_operator.py
 create mode 100644 examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/shared.py
 create mode 100644 examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_pipes_subprocess.py

diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json
index ecee42cc452d9..685f1e6c98689 100644
--- a/docs/content/_navigation.json
+++ b/docs/content/_navigation.json
@@ -983,6 +983,14 @@
           {
             "title": "Migrating a KubernetesPodOperator",
             "path": "/integrations/airlift/operator-migration/kubernetes-pod-operator"
+          },
+          {
+            "title": "Migrating a PythonOperator",
+            "path": "/integrations/airlift/operator-migration/python-operator"
+          },
+          {
+            "title": "Migrating a BashOperator (general)",
+            "path": "/integrations/airlift/operator-migration/bash-operator-general"
           }
         ]
       }
diff --git a/docs/content/integrations/airlift/operator-migration/bash-operator-general.mdx b/docs/content/integrations/airlift/operator-migration/bash-operator-general.mdx
new file mode 100644
index 0000000000000..7576550d47b0f
--- /dev/null
+++ b/docs/content/integrations/airlift/operator-migration/bash-operator-general.mdx
@@ -0,0 +1,77 @@
# Operator migration guides: Migrating generalized usage of `BashOperator`

In this page, we'll explain how to migrate an Airflow `BashOperator` to Dagster.

<Note>
  If using the `BashOperator` to execute dbt commands, see the [dbt migration
  guide](/integrations/airlift/operator-migration/bash-operator-dbt).
</Note>

### Background

The Airflow `BashOperator` is a common operator used to execute bash commands as part of a data pipeline.

```python file=/integrations/airlift/operator_migration/bash_operator_general.py
from airflow.operators.bash import BashOperator

execute_script = BashOperator(
    task_id="execute_script",
    bash_command="python /path/to/script.py",
)
```

The `BashOperator` is very general-purpose, since it can run any bash command; at the same time, Dagster offers richer integrations for many common `BashOperator` use cases.
We'll explain how to do a 1-1 migration of a `BashOperator` that executes a bash command to Dagster, and how to use the `dagster-airlift` library to proxy execution of the original task to Dagster. We'll also provide a reference for richer Dagster integrations covering common `BashOperator` use cases.

### Dagster equivalent

The direct Dagster equivalent to the `BashOperator` is to use the `PipesSubprocessClient` to execute a bash command in a subprocess.

### Migrating the operator

Migrating the operator breaks down into a few steps:

1. Ensure that the resources necessary for your bash command are available to both your Airflow and Dagster deployments.
2. Write an `@asset`-decorated function that executes the bash command using the `PipesSubprocessClient`.
3. Use `dagster-airlift` to proxy execution of the original task to Dagster.
4. \[Optional] Implement a richer integration for common `BashOperator` use cases.

### Step 1: Ensure shared bash command access

First, you'll need to ensure that the bash command you're running is available for use in both your Airflow and Dagster deployments. What this entails will vary depending on the command you're running. For example, if you're running a Python script, it's as simple as ensuring the script exists in a shared location accessible to both Airflow and Dagster, and that all necessary environment variables are set in both environments.

### Step 2: Writing an `@asset`-decorated function

You can write a Dagster `@asset`-decorated function that runs your bash command. This is quite straightforward using the `PipesSubprocessClient`.

```python file=/integrations/airlift/operator_migration/using_pipes_subprocess.py
from dagster import AssetExecutionContext, PipesSubprocessClient, asset


@asset
def script_result(context: AssetExecutionContext):
    return (
        PipesSubprocessClient()
        .run(context=context, command="python /path/to/script.py")
        .get_results()
    )
```

### Step 3: Using `dagster-airlift` to proxy execution

Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process.

### Step 4: Implementing richer integrations

For many of the use cases that the `BashOperator` covers, Dagster has better options. We'll detail some of them here.

#### Running a Python script

As mentioned above, you can use the `PipesSubprocessClient` to run a Python script in a subprocess. You can also modify the script to send additional information and logging back to Dagster. See the [Dagster Pipes tutorial](/concepts/dagster-pipes/subprocess) for more information.

#### Running a dbt command

We have a dedicated guide for switching from the `BashOperator` to the dbt integration in Dagster. See the [dbt migration guide](/integrations/airlift/operator-migration/bash-operator-dbt) for more information.

#### Running S3 Sync or other AWS CLI commands

Dagster has a rich set of integrations for AWS services. For example, you can use the `S3Resource` from the `dagster-aws` package to interact with S3 directly.
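
As a minimal sketch, an asset that uploads a file to S3 using the `S3Resource` might look like the following. The local path, bucket, and key here are hypothetical placeholders, and the resource would need to be supplied to your `Definitions` with the appropriate credentials:

```python
from dagster import asset
from dagster_aws.s3 import S3Resource


@asset
def synced_file(s3: S3Resource) -> None:
    # Upload a local file to S3 via the boto3 client wrapped by the resource.
    # Replace the file path, bucket, and key with your own values.
    s3.get_client().upload_file(
        Filename="/path/to/local/file",
        Bucket="my-bucket",
        Key="path/to/remote/file",
    )
```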
diff --git a/docs/content/integrations/airlift/operator-migration/overview.mdx b/docs/content/integrations/airlift/operator-migration/overview.mdx
index dc3fde15f8998..2829d757d3e4f 100644
--- a/docs/content/integrations/airlift/operator-migration/overview.mdx
+++ b/docs/content/integrations/airlift/operator-migration/overview.mdx
@@ -17,4 +17,8 @@ This page contains a collection of reference materials for migrating usage of co
   title="Python Operator"
   href="/integrations/airlift/operator-migration/python-operator"
 >
+
diff --git a/docs/content/integrations/airlift/operator-migration/python-operator.mdx b/docs/content/integrations/airlift/operator-migration/python-operator.mdx
new file mode 100644
index 0000000000000..c4582184ad501
--- /dev/null
+++ b/docs/content/integrations/airlift/operator-migration/python-operator.mdx
@@ -0,0 +1,75 @@
# Migrating the PythonOperator to Dagster

In this page, we'll explain how to migrate an Airflow `PythonOperator` to Dagster.

### Background

In Airflow, the `PythonOperator` runs arbitrary Python functions. For example, you might have a task that runs the function `write_to_db`, which combs through a directory for files and writes each one to a database table.

```python file=/integrations/airlift/operator_migration/python_operator.py startafter=start_op endbefore=end_op
from airflow.operators.python import PythonOperator


def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)


PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)
```

### Dagster equivalent

The Dagster equivalent is instead to construct an `@asset`- or `@multi_asset`-decorated function, which materializes assets corresponding to what your Python function is doing.

```python file=/integrations/airlift/operator_migration/pyop_multi_asset_complete.py startafter=start_asset endbefore=end_asset
from dagster import asset


@asset(key=TABLE_URI)
def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)
```

### Migrating the operator

Migrating the operator breaks down into a few steps:

1. Make a shared library containing your Python function available to both Airflow and Dagster.
2. Write an `@asset`-decorated function that runs the Python function shared between both modules.
3. Use `dagster-airlift` to proxy execution of the original task to Dagster.

### Step 1: Building a shared library

We recommend a monorepo setup for migration; this allows you to keep all your code in one place and easily share code between Airflow and Dagster, without complex CI/CD coordination.

First, we recommend factoring your Python function out into a shared package that is available to both the Airflow and Dagster runtimes. The process is as follows:

1. Scaffold out a new Python project which will contain your shared infrastructure.
2. Ensure that the shared library is available to both your Airflow and Dagster deployments. This can be done by adding an editable requirement to the `setup.py` or `pyproject.toml` file of your Airflow/Dagster package, as in the sketch below.
3. Include the Python dependencies relevant to your particular function in your new package. Write your Python function in the shared package, and change your Airflow code to import the function from the shared library.
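
Here's a minimal sketch of what that might look like; the package names used here, `my_airflow_project` and `shared`, are hypothetical placeholders:

```python
# setup.py for the Airflow package; the Dagster package would declare the same
# dependency on the shared package. All package names here are placeholders.
from setuptools import find_packages, setup

setup(
    name="my_airflow_project",
    packages=find_packages(),
    install_requires=[
        "apache-airflow",
        "shared",  # the factored-out package containing your Python function
    ],
)
```

During development in the monorepo, you can install the shared package editably into both environments, for example with `pip install -e ./shared`, so that changes to the shared code are picked up without reinstalling.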
The reason we recommend using a separate `shared` package is to help ensure that there aren't dependency conflicts between Airflow and Dagster as you migrate. Airflow has very complex dependency management, and migrating to Dagster gives you an opportunity to clean up and isolate your dependencies. You can do this with a series of shared packages in the monorepo, which will eventually become isolated code locations in Dagster.

### Step 2: Writing an `@asset`-decorated function

Next, you can write a Dagster `@asset`- or `@multi_asset`-decorated function that runs your Python function. This is generally straightforward for a `PythonOperator` migration, as you can simply invoke the shared function from within the asset-decorated function.

```python file=/integrations/airlift/operator_migration/pyop_asset_shared.py
# This would be the Python code living in a shared module.
from dagster import asset

from .shared import my_shared_python_callable


@asset
def my_shared_asset():
    return my_shared_python_callable()
```

### Step 3: Using `dagster-airlift` to proxy execution

Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process.
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/bash_operator_general.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/bash_operator_general.py
new file mode 100644
index 0000000000000..1a45e386c8567
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/bash_operator_general.py
@@ -0,0 +1,6 @@
+from airflow.operators.bash import BashOperator
+
+execute_script = BashOperator(
+    task_id="execute_script",
+    bash_command="python /path/to/script.py",
+)
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_asset_shared.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_asset_shared.py
new file mode 100644
index 0000000000000..0e5574401ede4
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_asset_shared.py
@@ -0,0 +1,9 @@
+# This would be the Python code living in a shared module.
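+# Because both the Airflow and Dagster packages depend on the shared package,
+# the asset below runs the same logic the PythonOperator previously ran.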
+from dagster import asset
+
+from .shared import my_shared_python_callable
+
+
+@asset
+def my_shared_asset():
+    return my_shared_python_callable()
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_multi_asset_complete.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_multi_asset_complete.py
new file mode 100644
index 0000000000000..91a1768e2dd13
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_multi_asset_complete.py
@@ -0,0 +1,27 @@
+from pathlib import Path
+from typing import Any
+
+RAW_DATA_DIR = Path("path")
+TABLE_URI = "blah"
+
+
+def contents_as_df(path: Path) -> Any:
+    pass
+
+
+def upload_to_db(df):
+    pass
+
+
+# start_asset
+from dagster import asset
+
+
+@asset(key=TABLE_URI)
+def write_to_db() -> None:
+    for raw_file in RAW_DATA_DIR.iterdir():
+        df = contents_as_df(raw_file)
+        upload_to_db(df)
+
+
+# end_asset
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/python_operator.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/python_operator.py
new file mode 100644
index 0000000000000..f5115eb2f4515
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/python_operator.py
@@ -0,0 +1,27 @@
+from pathlib import Path
+from typing import Any
+
+RAW_DATA_DIR = Path("path")
+
+
+def contents_as_df(path: Path) -> Any:
+    pass
+
+
+def upload_to_db(df: Any):
+    pass
+
+
+# start_op
+from airflow.operators.python import PythonOperator
+
+
+def write_to_db() -> None:
+    for raw_file in RAW_DATA_DIR.iterdir():
+        df = contents_as_df(raw_file)
+        upload_to_db(df)
+
+
+PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)
+
+# end_op
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/shared.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/shared.py
new file mode 100644
index 0000000000000..83089bcd9c742
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/shared.py
@@ -0,0 +1,2 @@
+def my_shared_python_callable():
+    pass
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_pipes_subprocess.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_pipes_subprocess.py
new file mode 100644
index 0000000000000..29418f3aa6b38
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_pipes_subprocess.py
@@ -0,0 +1,10 @@
+from dagster import AssetExecutionContext, PipesSubprocessClient, asset
+
+
+@asset
+def script_result(context: AssetExecutionContext):
+    return (
+        PipesSubprocessClient()
+        .run(context=context, command="python /path/to/script.py")
+        .get_results()
+    )