Skip to content

Commit

Permalink
[dagster-airlift] [api-docs] mapping functions
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 6, 2024
1 parent 4b609db commit d66d4d1
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 44 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
17 changes: 14 additions & 3 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,37 @@ AirflowInstance

.. autoclass:: AirflowBasicAuthBackend


Assets & Definitions
^^^^^^^^^^^^^^^^^^^^

.. autofunction:: build_defs_from_airflow_instance

Mapping Dagster assets to Airflow tasks/dags:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autofunction:: assets_with_task_mappings

.. autofunction:: assets_with_dag_mappings

.. autofunction:: targeted_by_multiple_tasks

Annotations for customizable components:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autoclass:: DagSelectorFn

.. autoclass:: DagsterEventTransformerFn

.. autoclass:: TaskHandleDict

Objects for retrieving information about the Airflow/Dagster mapping:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autoclass:: DagInfo

.. autoclass:: AirflowDefinitionsData



.. currentmodule:: dagster_airlift.mwaa

MWAA (dagster_airlift.mwaa)
---------------------------
Expand All @@ -47,3 +56,5 @@ MWAA (dagster_airlift.mwaa)
.. autoclass:: MwaaSessionAuthBackend




Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
from dagster_airlift.core.serialization.serialized_data import DagInfo as DagInfo

from .airflow_defs_data import AirflowDefinitionsData as AirflowDefinitionsData
from .basic_auth import AirflowBasicAuthBackend as AirflowBasicAuthBackend, AirflowAuthBackend as AirflowAuthBackend
from .basic_auth import (
AirflowAuthBackend as AirflowAuthBackend,
AirflowBasicAuthBackend as AirflowBasicAuthBackend,
)
from .load_defs import (
AirflowInstance as AirflowInstance,
DagSelectorFn as DagSelectorFn,
build_airflow_mapped_defs as build_airflow_mapped_defs,
build_defs_from_airflow_instance as build_defs_from_airflow_instance,
)
from .multiple_tasks import (
TaskHandleDict as TaskHandleDict,
targeted_by_multiple_tasks as targeted_by_multiple_tasks,
)
from .sensor.event_translation import (
AssetEvent as AssetEvent,
DagsterEventTransformerFn as DagsterEventTransformerFn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class AirflowBasicAuthBackend(AirflowAuthBackend):
password (str): The password to authenticate with.
Examples:
Creating a :py:class:`AirflowInstance` using this backend.
.. code-block:: python
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ def build_defs_from_airflow_instance(
Definitions: A :py:class:`dagster.Definitions` object containing the assets and sensor.
Examples:
Building a :py:class:`dagster.Definitions` object from an Airflow instance.
.. code-block:: python
Expand Down Expand Up @@ -214,7 +213,7 @@ def only_include_dag(dag_info: DagInfo) -> bool:
airflow_instance=airflow_instance, # same as above
dag_selector_fn=only_include_dag,
)
"""
mapped_defs = build_airflow_mapped_defs(
airflow_instance=airflow_instance, defs=defs, dag_selector_fn=dag_selector_fn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,32 @@ def targeted_by_multiple_tasks(
.. code-block:: python
from dagster import Definitions, AssetSpec, asset
from dagster_airlift import build_defs_from_airflow_instance, dag_defs, task_defs, targeted_by_multiple_tasks
@asset
def scheduled_twice(): ...
defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance
defs=Definitions.merge(
dag_defs(
"other_dag",
task_defs(
"task1",
Definitions(assets=[other_asset]),
from dagster import Definitions, AssetSpec, asset
from dagster_airlift import build_defs_from_airflow_instance, dag_defs, task_defs, targeted_by_multiple_tasks
@asset
def scheduled_twice(): ...
defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance,
defs=Definitions.merge(
dag_defs(
"other_dag",
task_defs(
"task1",
Definitions(assets=[other_asset]),
),
),
targeted_by_multiple_tasks(
Definitions([scheduled_twice]),
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
)
),
targeted_by_multiple_tasks(
Definitions([scheduled_twice]),
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
)
),
)
)
"""
return replace_assets_in_defs(
defs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,24 @@ def assets_with_task_mappings(
this information. It is a list of dictionaries with keys "dag_id" and "task_id".
Example:
.. code-block:: python
from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_task_mappings
@asset
def asset_one() -> None: ...
.. code-block:: python
defs = Definitions(
assets=assets_with_task_mappings(
dag_id="dag_one",
task_mappings={
"task_one": [asset_one],
"task_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")],
},
from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_task_mappings
@asset
def asset_one() -> None: ...
defs = Definitions(
assets=assets_with_task_mappings(
dag_id="dag_one",
task_mappings={
"task_one": [asset_one],
"task_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")],
},
)
)
)
"""
assets_list = []
for task_id, assets in task_mappings.items():
Expand Down Expand Up @@ -124,7 +126,9 @@ def assets_with_dag_mappings(
this information. It is a list of strings, where each string is a dag_id which the asset is associated with.
Example:
.. code-block:: python
from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_dag_mappings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class MwaaSessionAuthBackend(AirflowAuthBackend):
env_name (str): The name of the MWAA environment
Examples:
Creating an AirflowInstance pointed at a MWAA environment.
.. code-block:: python
Expand Down

0 comments on commit d66d4d1

Please sign in to comment.