diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz index 00e7af92d94f8..df13ca73a217b 100644 Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz index a966eb20db0cc..2a7d247a32664 100644 Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz index e85982ee61f9b..4e6c64e2b0b3a 100644 Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv index 27c9bb99cc05c..b1e1faccf5d4b 100644 Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst index b83d928c97205..1890ee1d7c2d4 100644 --- a/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst +++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst @@ -2,11 +2,12 @@ Airlift (dagster-airlift) ========================= -.. currentmodule:: dagster_airlift.core Core (dagster_airlift.core) --------------------------- +.. currentmodule:: dagster_airlift.core + AirflowInstance ^^^^^^^^^^^^^^^^^ @@ -16,10 +17,15 @@ AirflowInstance .. autoclass:: AirflowBasicAuthBackend -.. currentmodule:: dagster_airlift.mwaa + +Assets & Definitions +^^^^^^^^^^^^^^^^^^^^ + +.. autofunction:: build_defs_from_airflow_instance MWAA (dagster_airlift.mwaa) --------------------------- +.. currentmodule:: dagster_airlift.mwaa .. autoclass:: MwaaSessionAuthBackend diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py b/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py index d934cab718768..6e16ea8454ed2 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/__init__.py @@ -1,7 +1,10 @@ +from dagster_airlift.core.serialization.serialized_data import DagInfo as DagInfo + from .airflow_defs_data import AirflowDefinitionsData as AirflowDefinitionsData from .basic_auth import AirflowBasicAuthBackend as AirflowBasicAuthBackend, AirflowAuthBackend as AirflowAuthBackend from .load_defs import ( AirflowInstance as AirflowInstance, + DagSelectorFn as DagSelectorFn, build_airflow_mapped_defs as build_airflow_mapped_defs, build_defs_from_airflow_instance as build_defs_from_airflow_instance, ) diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py index baa0a0ee90809..ffc6cf2464ddd 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py @@ -82,6 +82,140 @@ def build_defs_from_airflow_instance( event_transformer_fn: DagsterEventTransformerFn = default_event_transformer, dag_selector_fn: Optional[DagSelectorFn] = None, ) -> Definitions: + """Builds a :py:class:`dagster.Definitions` object from an Airflow instance. + + For every DAG in the Airflow instance, this function will create a Dagster asset for the DAG + with an asset key instance_name/dag/dag_id. It will also create a sensor that polls the Airflow + instance for DAG runs and emits Dagster events for each successful run. + + An optional `defs` argument can be provided, where the user can pass in a :py:class:`dagster.Definitions` + object containing assets which are mapped to Airflow DAGs and tasks. These assets will be enriched with + metadata from the Airflow instance, and placed upstream of the automatically generated DAG assets. + + An optional `event_transformer_fn` can be provided, which allows the user to modify the Dagster events + produced by the sensor. The function takes the Dagster events produced by the sensor and returns a sequence + of Dagster events. + + An optional `dag_selector_fn` can be provided, which allows the user to filter which DAGs assets are created for. + The function takes a :py:class:`dagster_airlift.core.serialization.serialized_data.DagInfo` object and returns a + boolean indicating whether the DAG should be included. + + Args: + airflow_instance (AirflowInstance): The Airflow instance to build assets and the sensor from. + defs: Optional[Definitions]: A :py:class:`dagster.Definitions` object containing assets that are + mapped to Airflow DAGs and tasks. + sensor_minimum_interval_seconds (int): The minimum interval in seconds between sensor runs. + event_transformer_fn (DagsterEventTransformerFn): A function that allows for modifying the Dagster events + produced by the sensor. + dag_selector_fn (Optional[DagSelectorFn]): A function that allows for filtering which DAGs assets are created for. + + Returns: + Definitions: A :py:class:`dagster.Definitions` object containing the assets and sensor. + + Examples: + + Building a :py:class:`dagster.Definitions` object from an Airflow instance. + + .. code-block:: python + + from dagster_airlift.core import ( + AirflowInstance, + BasicAirflowAuthBackend, + build_defs_from_airflow_instance, + ) + + from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME + + airflow_instance = AirflowInstance( + auth_backend=BasicAirflowAuthBackend( + webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD + ), + name=AIRFLOW_INSTANCE_NAME, + ) + + + defs = build_defs_from_airflow_instance(airflow_instance=airflow_instance) + + Providing task-mapped assets to the function. + + .. code-block:: python + + from dagster import Definitions + from dagster_airlift.core import ( + AirflowInstance, + BasicAirflowAuthBackend, + assets_with_task_mappings, + build_defs_from_airflow_instance, + ) + ... + + + defs = build_defs_from_airflow_instance( + airflow_instance=airflow_instance, # same as above + defs=Definitions( + assets=assets_with_task_mappings( + dag_id="rebuild_iris_models", + task_mappings={ + "my_task": [AssetSpec("my_first_asset"), AssetSpec("my_second_asset")], + }, + ), + ), + ) + + Providing a custom event transformer function. + + .. code-block:: python + + from typing import Sequence + from dagster import Definitions + from dagster_airlift.core import ( + AirflowInstance, + BasicAirflowAuthBackend, + AssetEvent, + assets_with_task_mappings, + build_defs_from_airflow_instance, + ) + ... + + def add_tags_to_events( + context: SensorEvaluationContext, + defs_data: AirflowDefinitionsData, + events: Sequence[AssetEvent] + ) -> Sequence[AssetEvent]: + altered_events = [] + for event in events: + altered_events.append(event._replace(tags={"my_tag": "my_value"})) + return altered_events + + defs = build_defs_from_airflow_instance( + airflow_instance=airflow_instance, # same as above + event_transformer_fn=add_tags_to_events, + ) + + Filtering which DAGs assets are created for. + + .. code-block:: python + + from dagster import Definitions + from dagster_airlift.core import ( + AirflowInstance, + BasicAirflowAuthBackend, + AssetEvent, + assets_with_task_mappings, + build_defs_from_airflow_instance, + DagInfo, + ) + ... + + def only_include_dag(dag_info: DagInfo) -> bool: + return dag_info.dag_id == "my_dag_id" + + defs = build_defs_from_airflow_instance( + airflow_instance=airflow_instance, # same as above + dag_selector_fn=only_include_dag, + ) + + """ mapped_defs = build_airflow_mapped_defs( airflow_instance=airflow_instance, defs=defs, dag_selector_fn=dag_selector_fn )