Skip to content

Commit

Permalink
[dagster-airlift] [api-docs] additional stuff for loader
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 7, 2024
1 parent e504c7a commit b838ce3
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 1 deletion.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
17 changes: 17 additions & 0 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@ Assets & Definitions

.. autofunction:: build_defs_from_airflow_instance

Annotations for customizable components:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autoclass:: DagSelectorFn

.. autoclass:: DagsterEventTransformerFn

Objects for retrieving information about the Airflow/Dagster mapping:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autoclass:: DagInfo

.. autoclass:: AirflowDefinitionsData




MWAA (dagster_airlift.mwaa)
---------------------------
.. currentmodule:: dagster_airlift.mwaa
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import AbstractSet, Mapping, Set

from dagster import AssetKey, AssetSpec, Definitions
from dagster._annotations import public
from dagster._record import record

from dagster_airlift.core.airflow_instance import AirflowInstance
Expand All @@ -23,11 +24,20 @@

@record
class AirflowDefinitionsData:
"""A class that holds data about the assets that are mapped to Airflow dags and tasks, and
provides methods for retrieving information about the mappings.
The user should not instantiate this class directly. It is provided when customizing the events
that are generated by the Airflow sensor using the `event_transformer_fn` argument of
:py:func:`build_defs_from_airflow_instance`.
"""

airflow_instance: AirflowInstance
mapped_defs: Definitions

@public
@property
def instance_name(self) -> str:
"""The name of the Airflow instance."""
return self.airflow_instance.name

@cached_property
Expand All @@ -38,7 +48,13 @@ def mapping_info(self) -> AirliftMetadataMappingInfo:
def all_asset_specs_by_key(self) -> Mapping[AssetKey, AssetSpec]:
return {spec.key: spec for spec in self.mapped_defs.get_all_asset_specs()}

@public
def task_ids_in_dag(self, dag_id: str) -> Set[str]:
"""Returns the task ids within the given dag_id.
Args:
dag_id (str): The dag id.
"""
return self.mapping_info.task_id_map[dag_id]

@property
Expand Down Expand Up @@ -75,5 +91,12 @@ def peered_dag_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[
asset_keys_per_handle[dag_handle].add(spec.key)
return asset_keys_per_handle

@public
def asset_keys_in_task(self, dag_id: str, task_id: str) -> AbstractSet[AssetKey]:
"""Returns the asset keys that are mapped to the given task.
Args:
dag_id (str): The dag id.
task_id (str): The task id.
"""
return self.mapped_asset_keys_by_task_handle[TaskHandle(dag_id=dag_id, task_id=task_id)]
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
AssetKey,
_check as check,
)
from dagster._annotations import PublicAttr
from dagster._record import record
from dagster._serdes import whitelist_for_serdes

Expand All @@ -29,9 +30,19 @@ def downstream_task_ids(self) -> List[str]:
@whitelist_for_serdes
@record
class DagInfo:
"""A record containing information about a given airflow dag.
Users should not instantiate this class directly. It is provided when customizing which DAGs are included
in the generated definitions using the `dag_selector_fn` argument of :py:func:`build_defs_from_airflow_instance`.
Attributes:
metadata (Dict[str, Any]): The metadata associated with the dag, retrieved by the Airflow REST API:
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dags
"""

webserver_url: str
dag_id: str
metadata: Dict[str, Any]
metadata: PublicAttr[Dict[str, Any]]

@property
def url(self) -> str:
Expand Down

0 comments on commit b838ce3

Please sign in to comment.