Skip to content

Commit

Permalink
[dagster-airlift] [api-docs] mapping functions
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 7, 2024
1 parent 6d3bcf7 commit d78ecda
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 77 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
17 changes: 14 additions & 3 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,37 @@ AirflowInstance

.. autoclass:: AirflowBasicAuthBackend


Assets & Definitions
^^^^^^^^^^^^^^^^^^^^

.. autofunction:: build_defs_from_airflow_instance

Mapping Dagster assets to Airflow tasks/dags:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autofunction:: assets_with_task_mappings

.. autofunction:: assets_with_dag_mappings

.. autofunction:: assets_with_multiple_task_mappings

Annotations for customizable components:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autoclass:: DagSelectorFn

.. autoclass:: DagsterEventTransformerFn

.. autoclass:: TaskHandleDict

Objects for retrieving information about the Airflow/Dagster mapping:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autoclass:: DagInfo

.. autoclass:: AirflowDefinitionsData



.. currentmodule:: dagster_airlift.mwaa

MWAA (dagster_airlift.mwaa)
---------------------------
Expand All @@ -47,3 +56,5 @@ MWAA (dagster_airlift.mwaa)
.. autoclass:: MwaaSessionAuthBackend




Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
build_airflow_mapped_defs as build_airflow_mapped_defs,
build_defs_from_airflow_instance as build_defs_from_airflow_instance,
)
from .multiple_tasks import (
TaskHandleDict as TaskHandleDict,
assets_with_multiple_task_mappings as assets_with_multiple_task_mappings,
)
from .sensor.event_translation import (
AssetEvent as AssetEvent,
DagsterEventTransformerFn as DagsterEventTransformerFn,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, Optional
from typing import Callable, Iterable, Iterator, Optional, Union

from dagster import (
AssetsDefinition,
Expand Down Expand Up @@ -308,7 +308,9 @@ def _apply_airflow_data_to_specs(
yield assets_def.map_asset_specs(get_airflow_data_to_spec_mapper(serialized_data))


def replace_assets_in_defs(defs: Definitions, assets: Iterable[AssetsDefinition]) -> Definitions:
def replace_assets_in_defs(
defs: Definitions, assets: Iterable[Union[AssetSpec, AssetsDefinition]]
) -> Definitions:
return Definitions(
assets=list(assets),
asset_checks=defs.asset_checks,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from typing import List, TypedDict
from typing import List, Sequence, TypedDict, Union, cast

from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.definitions_class import Definitions

from dagster_airlift.constants import TASK_MAPPING_METADATA_KEY
from dagster_airlift.core import (
build_defs_from_airflow_instance as build_defs_from_airflow_instance,
)
from dagster_airlift.core.load_defs import assets_def_of_defs, replace_assets_in_defs
from dagster_airlift.core.load_defs import replace_assets_in_defs
from dagster_airlift.core.top_level_dag_def_api import spec_with_metadata


Expand All @@ -15,47 +17,69 @@ class TaskHandleDict(TypedDict):
task_id: str


def targeted_by_multiple_tasks(
defs: Definitions, task_handles: List[TaskHandleDict]
) -> Definitions:
def assets_with_multiple_task_mappings(
assets: Sequence[Union[AssetSpec, AssetsDefinition]], task_handles: List[TaskHandleDict]
) -> Sequence[Union[AssetSpec, AssetsDefinition]]:
"""Given an asset or assets definition, return a new asset or assets definition with metadata
that indicates that it is targeted by multiple airflow tasks. An example of this would
be a separate weekly and daily dag that contains a task that targets a single asset.
.. code-block:: python
from dagster import Definitions, AssetSpec, asset
from dagster_airlift import build_defs_from_airflow_instance, dag_defs, task_defs, targeted_by_multiple_tasks
@asset
def scheduled_twice(): ...
defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance
defs=Definitions.merge(
dag_defs(
"other_dag",
task_defs(
"task1",
Definitions(assets=[other_asset]),
),
from dagster import Definitions, AssetSpec, asset
from dagster_airlift import (
build_defs_from_airflow_instance,
targeted_by_multiple_tasks,
assets_with_task_mappings,
)
# Asset maps to a single task.
@asset
def other_asset(): ...
# Asset maps to a physical entity which is produced by two different airflow tasks.
@asset
def scheduled_twice(): ...
defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance,
defs=Definitions(
assets=[
*assets_with_task_mappings(
dag_id="other_dag",
task_mappings={
"task1": [other_asset]
},
),
*assets_with_multiple_task_mappings(
assets=[scheduled_twice],
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
),
]
),
targeted_by_multiple_tasks(
Definitions([scheduled_twice]),
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
)
),
)
)
"""
return [
asset.map_asset_specs(
lambda spec: spec_with_metadata(spec, {TASK_MAPPING_METADATA_KEY: task_handles})
)
if isinstance(asset, AssetsDefinition)
else spec_with_metadata(asset, {TASK_MAPPING_METADATA_KEY: task_handles})
for asset in assets
]


def targeted_by_multiple_tasks(
defs: Definitions, task_handles: List[TaskHandleDict]
) -> Definitions:
return replace_assets_in_defs(
defs,
[
assets_def.map_asset_specs(
lambda spec: spec_with_metadata(spec, {TASK_MAPPING_METADATA_KEY: task_handles})
)
for assets_def in assets_def_of_defs(defs)
],
assets_with_multiple_task_mappings(
cast(Sequence[Union[AssetSpec, AssetsDefinition]], defs.assets),
task_handles=task_handles,
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,24 @@ def assets_with_task_mappings(
this information. It is a list of dictionaries with keys "dag_id" and "task_id".
Example:
.. code-block:: python
from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_task_mappings
@asset
def asset_one() -> None: ...
.. code-block:: python
defs = Definitions(
assets=assets_with_task_mappings(
dag_id="dag_one",
task_mappings={
"task_one": [asset_one],
"task_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")],
},
from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_task_mappings
@asset
def asset_one() -> None: ...
defs = Definitions(
assets=assets_with_task_mappings(
dag_id="dag_one",
task_mappings={
"task_one": [asset_one],
"task_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")],
},
)
)
)
"""
assets_list = []
for task_id, assets in task_mappings.items():
Expand Down Expand Up @@ -124,7 +126,9 @@ def assets_with_dag_mappings(
this information. It is a list of strings, where each string is a dag_id which the asset is associated with.
Example:
.. code-block:: python
from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_dag_mappings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
task_defs,
)
from dagster_airlift.core.load_defs import build_full_automapped_dags_from_airflow_instance
from dagster_airlift.core.multiple_tasks import targeted_by_multiple_tasks
from dagster_airlift.core.multiple_tasks import assets_with_multiple_task_mappings
from dagster_airlift.core.serialization.compute import (
build_airlift_metadata_mapping_info,
compute_serialized_data,
Expand Down Expand Up @@ -512,11 +512,15 @@ def scheduled_twice() -> None: ...
Definitions(assets=[other_asset]),
),
),
targeted_by_multiple_tasks(
Definitions([scheduled_twice]),
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
Definitions(
assets=[
*assets_with_multiple_task_mappings(
assets=[scheduled_twice],
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
)
],
),
),
Expand Down Expand Up @@ -544,12 +548,14 @@ def double_targeted_asset() -> None: ...
Definitions(assets=[single_targeted_asset]),
),
),
targeted_by_multiple_tasks(
Definitions([double_targeted_asset]),
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
Definitions(
assets_with_multiple_task_mappings(
assets=[double_targeted_asset],
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
)
),
),
)
Expand Down Expand Up @@ -591,12 +597,16 @@ def double_targeted_asset() -> None: ...
}
),
defs=Definitions.merge(
targeted_by_multiple_tasks(
Definitions([double_targeted_asset]),
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
Definitions(
assets=[
*assets_with_multiple_task_mappings(
assets=[double_targeted_asset],
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
)
]
),
dag_defs(
"weekly_dag",
Expand Down Expand Up @@ -643,12 +653,14 @@ def double_targeted_asset() -> None: ...
}
),
defs=Definitions.merge(
targeted_by_multiple_tasks(
Definitions([double_targeted_asset]),
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
Definitions(
assets=assets_with_multiple_task_mappings(
assets=[double_targeted_asset],
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
)
),
dag_defs(
"weekly_dag",
Expand Down

0 comments on commit d78ecda

Please sign in to comment.