diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz index 319f452621996..4bba47792ed20 100644 Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz index f310199a8e33a..b9fb041069083 100644 Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz index 67841339ac077..8381094b7baf3 100644 Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv index e47af63c8760b..83eb0b74981bf 100644 Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst index 575922a624666..09370193177c3 100644 --- a/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst +++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst @@ -17,12 +17,20 @@ AirflowInstance .. autoclass:: AirflowBasicAuthBackend - Assets & Definitions ^^^^^^^^^^^^^^^^^^^^ .. autofunction:: build_defs_from_airflow_instance +Mapping Dagster assets to Airflow tasks/dags: +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. autofunction:: assets_with_task_mappings + +.. autofunction:: assets_with_dag_mappings + +.. autofunction:: targeted_by_multiple_tasks + Annotations for customizable components: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -30,6 +38,8 @@ Annotations for customizable components: .. autoclass:: DagsterEventTransformerFn +.. autoclass:: TaskHandleDict + Objects for retrieving information about the Airflow/Dagster mapping: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -37,8 +47,7 @@ Objects for retrieving information about the Airflow/Dagster mapping: .. autoclass:: AirflowDefinitionsData - - +.. currentmodule:: dagster_airlift.mwaa MWAA (dagster_airlift.mwaa) --------------------------- @@ -47,3 +56,5 @@ MWAA (dagster_airlift.mwaa) .. autoclass:: MwaaSessionAuthBackend + + diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py b/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py index bc50d3885b397..4356e95a9a150 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py @@ -11,6 +11,10 @@ build_airflow_mapped_defs as build_airflow_mapped_defs, build_defs_from_airflow_instance as build_defs_from_airflow_instance, ) +from .multiple_tasks import ( + TaskHandleDict as TaskHandleDict, + targeted_by_multiple_tasks as targeted_by_multiple_tasks, +) from .sensor.event_translation import ( AssetEvent as AssetEvent, DagsterEventTransformerFn as DagsterEventTransformerFn, diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py index ffc6cf2464ddd..c1a395ae06cfb 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py @@ -113,7 +113,6 @@ def build_defs_from_airflow_instance( Definitions: A :py:class:`dagster.Definitions` object containing the assets and sensor. Examples: - Building a :py:class:`dagster.Definitions` object from an Airflow instance. .. code-block:: python @@ -214,7 +213,7 @@ def only_include_dag(dag_info: DagInfo) -> bool: airflow_instance=airflow_instance, # same as above dag_selector_fn=only_include_dag, ) - + """ mapped_defs = build_airflow_mapped_defs( airflow_instance=airflow_instance, defs=defs, dag_selector_fn=dag_selector_fn diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/multiple_tasks.py b/examples/experimental/dagster-airlift/dagster_airlift/core/multiple_tasks.py index e68eecff320b8..9971c3b597116 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/multiple_tasks.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/multiple_tasks.py @@ -24,31 +24,32 @@ def targeted_by_multiple_tasks( .. code-block:: python - from dagster import Definitions, AssetSpec, asset - from dagster_airlift import build_defs_from_airflow_instance, dag_defs, task_defs, targeted_by_multiple_tasks - - @asset - def scheduled_twice(): ... - - defs = build_defs_from_airflow_instance( - airflow_instance=airflow_instance - defs=Definitions.merge( - dag_defs( - "other_dag", - task_defs( - "task1", - Definitions(assets=[other_asset]), + from dagster import Definitions, AssetSpec, asset + from dagster_airlift import build_defs_from_airflow_instance, dag_defs, task_defs, targeted_by_multiple_tasks + + @asset + def scheduled_twice(): ... + + defs = build_defs_from_airflow_instance( + airflow_instance=airflow_instance, + defs=Definitions.merge( + dag_defs( + "other_dag", + task_defs( + "task1", + Definitions(assets=[other_asset]), + ), ), + targeted_by_multiple_tasks( + Definitions([scheduled_twice]), + task_handles=[ + {"dag_id": "weekly_dag", "task_id": "task1"}, + {"dag_id": "daily_dag", "task_id": "task1"}, + ], + ) ), - targeted_by_multiple_tasks( - Definitions([scheduled_twice]), - task_handles=[ - {"dag_id": "weekly_dag", "task_id": "task1"}, - {"dag_id": "daily_dag", "task_id": "task1"}, - ], - ) - ), - ) + ) + """ return replace_assets_in_defs( defs, diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/top_level_dag_def_api.py b/examples/experimental/dagster-airlift/dagster_airlift/core/top_level_dag_def_api.py index edf645b96857f..e48604b7c199c 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/top_level_dag_def_api.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/top_level_dag_def_api.py @@ -78,22 +78,24 @@ def assets_with_task_mappings( this information. It is a list of dictionaries with keys "dag_id" and "task_id". Example: - .. code-block:: python - from dagster import AssetSpec, Definitions, asset - from dagster_airlift.core import assets_with_task_mappings - @asset - def asset_one() -> None: ... + .. code-block:: python - defs = Definitions( - assets=assets_with_task_mappings( - dag_id="dag_one", - task_mappings={ - "task_one": [asset_one], - "task_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")], - }, + from dagster import AssetSpec, Definitions, asset + from dagster_airlift.core import assets_with_task_mappings + + @asset + def asset_one() -> None: ... + + defs = Definitions( + assets=assets_with_task_mappings( + dag_id="dag_one", + task_mappings={ + "task_one": [asset_one], + "task_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")], + }, + ) ) - ) """ assets_list = [] for task_id, assets in task_mappings.items(): @@ -124,7 +126,9 @@ def assets_with_dag_mappings( this information. It is a list of strings, where each string is a dag_id which the asset is associated with. Example: + .. code-block:: python + from dagster import AssetSpec, Definitions, asset from dagster_airlift.core import assets_with_dag_mappings