From 1e21c400306b8749126ef4e44ff8320220d89d0c Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Fri, 1 Nov 2024 11:35:09 -0700 Subject: [PATCH] [dagster-airlift] [api-docs] additional stuff for loader --- .../api/apidocs/libraries/dagster-airlift.rst | 11 +++++++++ .../dagster_airlift/core/airflow_defs_data.py | 23 +++++++++++++++++++ .../core/serialization/serialized_data.py | 13 ++++++++++- 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst index b093965a0a062..b386b9fe87934 100644 --- a/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst +++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst @@ -24,6 +24,17 @@ Assets & Definitions .. autofunction:: build_defs_from_airflow_instance +Annotations for customizable components: +.. autoclass:: DagSelectorFn +.. autoclass:: DagsterEventTransformerFn + +Objects for retrieving information about the Airflow/Dagster mapping: +.. autoclass:: DagInfo +.. autoclass:: AirflowDefinitionsData + + + + MWAA (dagster_airlift.mwaa) --------------------------- diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py index e3c489f626378..ecb835df01065 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py @@ -3,6 +3,7 @@ from typing import AbstractSet, Mapping, Set from dagster import AssetKey, AssetSpec, Definitions +from dagster._annotations import public from dagster._record import record from dagster_airlift.core.airflow_instance import AirflowInstance @@ -23,11 +24,20 @@ @record class AirflowDefinitionsData: + """A class that holds data about the assets that are mapped to Airflow dags and tasks, and + provides methods for retrieving information about the mappings. + The user should not instantiate this class directly. It is provided when customizing the events + that are generated by the Airflow sensor using the `event_transformer_fn` argument of + :py:func:`build_defs_from_airflow_instance`. + """ + airflow_instance: AirflowInstance mapped_defs: Definitions + @public @property def instance_name(self) -> str: + """The name of the Airflow instance.""" return self.airflow_instance.name @cached_property @@ -38,7 +48,13 @@ def mapping_info(self) -> AirliftMetadataMappingInfo: def all_asset_specs_by_key(self) -> Mapping[AssetKey, AssetSpec]: return {spec.key: spec for spec in self.mapped_defs.get_all_asset_specs()} + @public def task_ids_in_dag(self, dag_id: str) -> Set[str]: + """Returns the task ids within the given dag_id. + + Args: + dag_id (str): The dag id. + """ return self.mapping_info.task_id_map[dag_id] @property @@ -75,5 +91,12 @@ def peered_dag_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[ asset_keys_per_handle[dag_handle].add(spec.key) return asset_keys_per_handle + @public def asset_keys_in_task(self, dag_id: str, task_id: str) -> AbstractSet[AssetKey]: + """Returns the asset keys that are mapped to the given task. + + Args: + dag_id (str): The dag id. + task_id (str): The task id. + """ return self.mapped_asset_keys_by_task_handle[TaskHandle(dag_id=dag_id, task_id=task_id)] diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/serialized_data.py b/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/serialized_data.py index 57405217c2558..0c3dc612149c1 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/serialized_data.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/serialized_data.py @@ -5,6 +5,7 @@ AssetKey, _check as check, ) +from dagster._annotations import PublicAttr from dagster._record import record from dagster._serdes import whitelist_for_serdes @@ -29,9 +30,19 @@ def downstream_task_ids(self) -> List[str]: @whitelist_for_serdes @record class DagInfo: + """A record containing information about a given airflow dag. + + Users should not instantiate this class directly. It is provided when customizing which DAGs are included + in the generated definitions using the `dag_selector_fn` argument of :py:func:`build_defs_from_airflow_instance`. + + Attributes: + metadata (Dict[str, Any]): The metadata associated with the dag, retrieved by the Airflow REST API: + https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dags + """ + webserver_url: str dag_id: str - metadata: Dict[str, Any] + metadata: PublicAttr[Dict[str, Any]] @property def url(self) -> str: