Skip to content

Commit

Permalink
[dagster-airlift] [api-docs] loader
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 7, 2024
1 parent 5173d32 commit e504c7a
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 2 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
10 changes: 8 additions & 2 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ Airlift (dagster-airlift)
=========================


.. currentmodule:: dagster_airlift.core

Core (dagster_airlift.core)
---------------------------

.. currentmodule:: dagster_airlift.core

AirflowInstance
^^^^^^^^^^^^^^^^^

Expand All @@ -16,10 +17,15 @@ AirflowInstance

.. autoclass:: AirflowBasicAuthBackend

.. currentmodule:: dagster_airlift.mwaa

Assets & Definitions
^^^^^^^^^^^^^^^^^^^^

.. autofunction:: build_defs_from_airflow_instance

MWAA (dagster_airlift.mwaa)
---------------------------
.. currentmodule:: dagster_airlift.mwaa

.. autoclass:: MwaaSessionAuthBackend

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from dagster_airlift.core.serialization.serialized_data import DagInfo as DagInfo

from .airflow_defs_data import AirflowDefinitionsData as AirflowDefinitionsData
from .basic_auth import (
AirflowAuthBackend as AirflowAuthBackend,
AirflowBasicAuthBackend as AirflowBasicAuthBackend,
)
from .load_defs import (
AirflowInstance as AirflowInstance,
DagSelectorFn as DagSelectorFn,
build_airflow_mapped_defs as build_airflow_mapped_defs,
build_defs_from_airflow_instance as build_defs_from_airflow_instance,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,140 @@ def build_defs_from_airflow_instance(
event_transformer_fn: DagsterEventTransformerFn = default_event_transformer,
dag_selector_fn: Optional[DagSelectorFn] = None,
) -> Definitions:
"""Builds a :py:class:`dagster.Definitions` object from an Airflow instance.
For every DAG in the Airflow instance, this function will create a Dagster asset for the DAG
with an asset key instance_name/dag/dag_id. It will also create a sensor that polls the Airflow
instance for DAG runs and emits Dagster events for each successful run.
An optional `defs` argument can be provided, where the user can pass in a :py:class:`dagster.Definitions`
object containing assets which are mapped to Airflow DAGs and tasks. These assets will be enriched with
metadata from the Airflow instance, and placed upstream of the automatically generated DAG assets.
An optional `event_transformer_fn` can be provided, which allows the user to modify the Dagster events
produced by the sensor. The function takes the Dagster events produced by the sensor and returns a sequence
of Dagster events.
An optional `dag_selector_fn` can be provided, which allows the user to filter which DAGs assets are created for.
The function takes a :py:class:`dagster_airlift.core.serialization.serialized_data.DagInfo` object and returns a
boolean indicating whether the DAG should be included.
Args:
airflow_instance (AirflowInstance): The Airflow instance to build assets and the sensor from.
defs: Optional[Definitions]: A :py:class:`dagster.Definitions` object containing assets that are
mapped to Airflow DAGs and tasks.
sensor_minimum_interval_seconds (int): The minimum interval in seconds between sensor runs.
event_transformer_fn (DagsterEventTransformerFn): A function that allows for modifying the Dagster events
produced by the sensor.
dag_selector_fn (Optional[DagSelectorFn]): A function that allows for filtering which DAGs assets are created for.
Returns:
Definitions: A :py:class:`dagster.Definitions` object containing the assets and sensor.
Examples:
Building a :py:class:`dagster.Definitions` object from an Airflow instance.
.. code-block:: python
from dagster_airlift.core import (
AirflowInstance,
BasicAirflowAuthBackend,
build_defs_from_airflow_instance,
)
from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME
airflow_instance = AirflowInstance(
auth_backend=BasicAirflowAuthBackend(
webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
),
name=AIRFLOW_INSTANCE_NAME,
)
defs = build_defs_from_airflow_instance(airflow_instance=airflow_instance)
Providing task-mapped assets to the function.
.. code-block:: python
from dagster import Definitions
from dagster_airlift.core import (
AirflowInstance,
BasicAirflowAuthBackend,
assets_with_task_mappings,
build_defs_from_airflow_instance,
)
...
defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance, # same as above
defs=Definitions(
assets=assets_with_task_mappings(
dag_id="rebuild_iris_models",
task_mappings={
"my_task": [AssetSpec("my_first_asset"), AssetSpec("my_second_asset")],
},
),
),
)
Providing a custom event transformer function.
.. code-block:: python
from typing import Sequence
from dagster import Definitions, SensorEvaluationContext
from dagster_airlift.core import (
AirflowInstance,
BasicAirflowAuthBackend,
AssetEvent,
assets_with_task_mappings,
build_defs_from_airflow_instance,
AirflowDefinitionsData,
)
...
def add_tags_to_events(
context: SensorEvaluationContext,
defs_data: AirflowDefinitionsData,
events: Sequence[AssetEvent]
) -> Sequence[AssetEvent]:
altered_events = []
for event in events:
altered_events.append(event._replace(tags={"my_tag": "my_value"}))
return altered_events
defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance, # same as above
event_transformer_fn=add_tags_to_events,
)
Filtering which DAGs assets are created for.
.. code-block:: python
from dagster import Definitions
from dagster_airlift.core import (
AirflowInstance,
BasicAirflowAuthBackend,
AssetEvent,
assets_with_task_mappings,
build_defs_from_airflow_instance,
DagInfo,
)
...
def only_include_dag(dag_info: DagInfo) -> bool:
return dag_info.dag_id == "my_dag_id"
defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance, # same as above
dag_selector_fn=only_include_dag,
)
"""
mapped_defs = build_airflow_mapped_defs(
airflow_instance=airflow_instance, defs=defs, dag_selector_fn=dag_selector_fn
)
Expand Down

0 comments on commit e504c7a

Please sign in to comment.