diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz index df13ca73a217b..319f452621996 100644 Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz index 2a7d247a32664..f310199a8e33a 100644 Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz index 4e6c64e2b0b3a..67841339ac077 100644 Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv index b1e1faccf5d4b..e47af63c8760b 100644 Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst index 1890ee1d7c2d4..575922a624666 100644 --- a/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst +++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst @@ -23,6 +23,23 @@ Assets & Definitions .. autofunction:: build_defs_from_airflow_instance +Annotations for customizable components: +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. autoclass:: DagSelectorFn + +.. autoclass:: DagsterEventTransformerFn + +Objects for retrieving information about the Airflow/Dagster mapping: +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. autoclass:: DagInfo + +.. autoclass:: AirflowDefinitionsData + + + + MWAA (dagster_airlift.mwaa) --------------------------- .. currentmodule:: dagster_airlift.mwaa diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py index e3c489f626378..ecb835df01065 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/airflow_defs_data.py @@ -3,6 +3,7 @@ from typing import AbstractSet, Mapping, Set from dagster import AssetKey, AssetSpec, Definitions +from dagster._annotations import public from dagster._record import record from dagster_airlift.core.airflow_instance import AirflowInstance @@ -23,11 +24,20 @@ @record class AirflowDefinitionsData: + """A class that holds data about the assets that are mapped to Airflow dags and tasks, and + provides methods for retrieving information about the mappings. + The user should not instantiate this class directly. It is provided when customizing the events + that are generated by the Airflow sensor using the `event_transformer_fn` argument of + :py:func:`build_defs_from_airflow_instance`. + """ + airflow_instance: AirflowInstance mapped_defs: Definitions + @public @property def instance_name(self) -> str: + """The name of the Airflow instance.""" return self.airflow_instance.name @cached_property @@ -38,7 +48,13 @@ def mapping_info(self) -> AirliftMetadataMappingInfo: def all_asset_specs_by_key(self) -> Mapping[AssetKey, AssetSpec]: return {spec.key: spec for spec in self.mapped_defs.get_all_asset_specs()} + @public def task_ids_in_dag(self, dag_id: str) -> Set[str]: + """Returns the task ids within the given dag_id. + + Args: + dag_id (str): The dag id. + """ return self.mapping_info.task_id_map[dag_id] @property @@ -75,5 +91,12 @@ def peered_dag_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[ asset_keys_per_handle[dag_handle].add(spec.key) return asset_keys_per_handle + @public def asset_keys_in_task(self, dag_id: str, task_id: str) -> AbstractSet[AssetKey]: + """Returns the asset keys that are mapped to the given task. + + Args: + dag_id (str): The dag id. + task_id (str): The task id. + """ return self.mapped_asset_keys_by_task_handle[TaskHandle(dag_id=dag_id, task_id=task_id)] diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/serialized_data.py b/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/serialized_data.py index 57405217c2558..0c3dc612149c1 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/serialized_data.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/serialized_data.py @@ -5,6 +5,7 @@ AssetKey, _check as check, ) +from dagster._annotations import PublicAttr from dagster._record import record from dagster._serdes import whitelist_for_serdes @@ -29,9 +30,19 @@ def downstream_task_ids(self) -> List[str]: @whitelist_for_serdes @record class DagInfo: + """A record containing information about a given airflow dag. + + Users should not instantiate this class directly. It is provided when customizing which DAGs are included + in the generated definitions using the `dag_selector_fn` argument of :py:func:`build_defs_from_airflow_instance`. + + Attributes: + metadata (Dict[str, Any]): The metadata associated with the dag, retrieved by the Airflow REST API: + https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dags + """ + webserver_url: str dag_id: str - metadata: Dict[str, Any] + metadata: PublicAttr[Dict[str, Any]] @property def url(self) -> str: