diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst index 0035d8e2d3079..60230ba9300f6 100644 --- a/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst +++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst @@ -36,13 +36,32 @@ Objects for retrieving information about the Airflow/Dagster mapping: .. autoclass:: DagInfo .. autoclass:: AirflowDefinitionsData -.. currentmodule:: dagster_airlift.mwaa MWAA (dagster_airlift.mwaa) --------------------------- +.. currentmodule:: dagster_airlift.mwaa .. autoclass:: MwaaSessionAuthBackend +In Airflow (dagster_airlift.in_airflow) +--------------------------------------- + +.. currentmodule:: dagster_airlift.in_airflow + +Proxying Function +.. autofunction:: proxying_to_dagster + +Operators + +.. autoclass:: BaseDagsterAssetsOperator + +Task-level proxying +.. autoclass:: BaseProxyTaskToDagsterOperator +.. autoclass:: DefaultProxyTaskToDagsterOperator + +Dag-level proxying +.. autoclass:: BaseProxyDAGToDagsterOperator +.. autoclass:: DefaultProxyDAGToDagsterOperator diff --git a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/__init__.py b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/__init__.py index 94dd6399f204e..e9c39ba660d3b 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/__init__.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/__init__.py @@ -16,6 +16,10 @@ def ensure_airflow_installed() -> None: BaseProxyDAGToDagsterOperator as BaseProxyDAGToDagsterOperator, DefaultProxyDAGToDagsterOperator as DefaultProxyDAGToDagsterOperator, ) +from .proxied_state import ( + AirflowProxiedState as AirflowProxiedState, + load_proxied_state_from_yaml as load_proxied_state_from_yaml, +) from .proxying_fn import proxying_to_dagster as proxying_to_dagster from .task_proxy_operator import ( BaseProxyTaskToDagsterOperator as BaseProxyTaskToDagsterOperator, diff --git a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_asset_operator.py b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_asset_operator.py index ad836716062b5..809191e679b9a 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_asset_operator.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/base_asset_operator.py @@ -29,7 +29,19 @@ class BaseDagsterAssetsOperator(BaseOperator, ABC): - """Interface for an operator which materializes dagster assets.""" + """Interface for an operator which materializes dagster assets. + + This operator needs to implement the following methods: + - get_dagster_session: Returns a requests session that can be used to make requests to the Dagster API. + This is where any additional authentication can be added. + - get_dagster_url: Returns the URL for the Dagster instance. + - filter_asset_nodes: Filters asset nodes (which are returned from Dagster's graphql API) to only include those + that should be triggered by the current task. + + Optionally, these methods can be overridden as well: + - get_partition_key: Determines the partition key to use to trigger the dagster run. This method will only be + called if the underlying asset is partitioned. + """ def __init__( self, diff --git a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/dag_proxy_operator.py b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/dag_proxy_operator.py index deb340d1ddee9..eb0a51bb05d69 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/dag_proxy_operator.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/dag_proxy_operator.py @@ -12,7 +12,23 @@ class BaseProxyDAGToDagsterOperator(BaseDagsterAssetsOperator): - """An operator that proxies task execution to Dagster assets with metadata that map to this task's dag ID and task ID.""" + """An operator base class that proxies the entire DAG's execution to Dagster assets with + metadata that map to the DAG id used by this task. + + For the Dag ID that this operator proxies, it expects there to be corresponding assets + in the linked Dagster deployment that have metadata entries with the key `dagster-airlift/dag-mapping` that + map to this Dag ID. This metadata is typically set using the + :py:func:`dagster_airlift.core.assets_with_dag_mappings` function. + + The following methods must be implemented by subclasses: + - :py:meth:`get_dagster_session` (inherited from :py:class:`BaseDagsterAssetsOperator`) + - :py:meth:`get_dagster_url` (inherited from :py:class:`BaseDagsterAssetsOperator`) + - :py:meth:`build_from_dag` A class method which takes the DAG to be proxied, and constructs + an instance of this operator from it. + + There is a default implementation of this operator, :py:class:`DefaultProxyDAGToDagsterOperator`, + which is used by :py:func:`proxying_to_dagster` if no override operator is provided. + """ def filter_asset_nodes( self, context: Context, asset_nodes: Sequence[Mapping[str, Any]] @@ -24,12 +40,15 @@ def filter_asset_nodes( @classmethod @abstractmethod def build_from_dag(cls, dag: DAG) -> "BaseProxyDAGToDagsterOperator": - """Builds a proxy operator from a DAG.""" + """Builds a proxy operator from the passed-in DAG.""" class DefaultProxyDAGToDagsterOperator(BaseProxyDAGToDagsterOperator): """The default task proxying operator - which opens a blank session and expects the dagster URL to be set in the environment. The dagster url is expected to be set in the environment as DAGSTER_URL. + + This operator should not be instantiated directly - it is instantiated by :py:func:`proxying_to_dagster` if no + override operator is provided. """ def get_dagster_session(self, context: Context) -> requests.Session: diff --git a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/proxying_fn.py b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/proxying_fn.py index 4ce39780599c1..1728ecc229281 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/proxying_fn.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/proxying_fn.py @@ -27,9 +27,16 @@ def proxying_to_dagster( [DAG], BaseProxyDAGToDagsterOperator ] = DefaultProxyDAGToDagsterOperator.build_from_dag, ) -> None: - """Uses passed-in dictionary to alter dags and tasks to proxy to dagster. - Uses a proxied dictionary to determine the proxied status for each task within each dag. - Should only ever be the last line in a dag file. + """Proxies tasks and dags to Dagster based on provided proxied state. + Expects a dictionary of in-scope global variables to be provided (typically retrieved with `globals()`), and a proxied state dictionary + (typically retrieved with :py:func:`load_proxied_state_from_yaml`) for dags in that global state. This function will modify in-place the + dictionary of global variables to replace proxied tasks with appropriate Dagster operators. + + In the case of task-level proxying, the proxied tasks will be replaced with new operators that are constructed by the provided `build_from_task_fn`. + A default implementation of this function is provided in `DefaultProxyTaskToDagsterOperator`. + In the case of dag-level proxying, the entire dag structure will be replaced with a single task that is constructed by the provided `build_from_dag_fn`. + A default implementation of this function is provided in `DefaultProxyDAGToDagsterOperator`. + Args: global_vars (Dict[str, Any]): The global variables in the current context. In most cases, retrieved with `globals()` (no import required). @@ -37,6 +44,92 @@ def proxying_to_dagster( https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#loading-dags proxied_state (AirflowMigrationState): The proxied state for the dags. logger (Optional[logging.Logger]): The logger to use. Defaults to logging.getLogger("dagster_airlift"). + + + Examples: + Typical usage of this function is to be called at the end of a dag file, retrieving proxied_state from an accompanying `proxied_state` path. + .. code-block:: python + from pathlib import Path + + from airflow import DAG + from airflow.operators.python import PythonOperator + from dagster._time import get_current_datetime_midnight + from dagster_airlift.in_airflow import proxying_to_dagster + from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml + + + with DAG( + dag_id="daily_interval_dag", + ..., + ) as minute_dag: + PythonOperator(task_id="my_task", python_callable=...) + + # At the end of the dag file, so we can ensure dags are loaded into globals. + proxying_to_dagster( + proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"), + global_vars=globals(), + ) + + You can also provide custom implementations of the `build_from_task_fn` function to customize the behavior of task-level proxying. + .. code-block:: python + from dagster_airlift.in_airflow import proxying_to_dagster, BaseProxyTaskToDagsterOperator + from airflow.models.operator import BaseOperator + + ... # Dag code here + + class CustomAuthTaskProxyOperator(BaseProxyTaskToDagsterOperator): + def get_dagster_session(self, context: Context) -> requests.Session: + # Add custom headers to the session + return requests.Session(headers={"Authorization": "Bearer my_token"}) + + def get_dagster_url(self, context: Context) -> str: + # Use a custom environment variable for the dagster url + return os.environ["CUSTOM_DAGSTER_URL"] + + @classmethod + def build_from_task(cls, task: BaseOperator) -> "CustomAuthDAGProxyOperator": + # Custom logic to build the operator from the task (task_id should remain the same) + if task.task_id == "my_task_needs_more_retries": + return CustomAuthDAGProxyOperator(task_id=task_id, retries=3) + else: + return CustomAuthDAGProxyOperator(task_id=task_id) + + proxying_to_dagster( + proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"), + global_vars=globals(), + build_from_task_fn=custom_build_from_task_fn, + ) + + You can do the same for dag-level proxying by providing a custom implementation of the `build_from_dag_fn` function. + .. code-block:: python + from dagster_airlift.in_airflow import proxying_to_dagster, BaseProxyDAGToDagsterOperator + from airflow.models.dag import DAG + + ... # Dag code here + + class CustomAuthDAGProxyOperator(BaseProxyDAGToDagsterOperator): + def get_dagster_session(self, context: Context) -> requests.Session: + # Add custom headers to the session + return requests.Session(headers={"Authorization": "Bearer my_token"}) + + def get_dagster_url(self, context: Context) -> str: + # Use a custom environment variable for the dagster url + return os.environ["CUSTOM_DAGSTER_URL"] + + @classmethod + def build_from_dag(cls, dag: DAG) -> "CustomAuthDAGProxyOperator": + # Custom logic to build the operator from the dag (DAG id should remain the same) + if dag.dag_id == "my_dag_needs_more_retries": + return CustomAuthDAGProxyOperator(task_id="custom override", retries=3, dag=dag) + else: + return CustomAuthDAGProxyOperator(task_id="basic_override", dag=dag) + + proxying_to_dagster( + proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"), + global_vars=globals(), + build_from_dag_fn=custom_build_from_dag_fn, + ) + """ caller_module = global_vars.get("__module__") suffix = f" in module `{caller_module}`" if caller_module else "" diff --git a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/task_proxy_operator.py b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/task_proxy_operator.py index a205c0e8755dd..d75d05ed76776 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/task_proxy_operator.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/in_airflow/task_proxy_operator.py @@ -12,7 +12,22 @@ class BaseProxyTaskToDagsterOperator(BaseDagsterAssetsOperator): - """An operator that proxies task execution to Dagster assets with metadata that map to this task's dag ID and task ID.""" + """An operator that proxies task execution to Dagster assets with metadata that map to this task's dag ID and task ID. + + For the DAG ID and task ID that this operator proxies, it expects there to be corresponding assets + in the linked Dagster deployment that have metadata entries with the key `dagster-airlift/task-mapping` that + map to this DAG ID and task ID. This metadata is typically set using the + :py:func:`dagster_airlift.core.assets_with_task_mappings` function. + + The following methods must be implemented by subclasses: + - :py:meth:`get_dagster_session` (inherited from :py:class:`BaseDagsterAssetsOperator`) + - :py:meth:`get_dagster_url` (inherited from :py:class:`BaseDagsterAssetsOperator`) + - :py:meth:`build_from_task` A class method which takes the task to be proxied, and constructs + an instance of this operator from it. + + There is a default implementation of this operator, :py:class:`DefaultProxyTaskToDagsterOperator`, + which is used by :py:func:`proxying_to_dagster` if no override operator is provided. + """ def filter_asset_nodes( self, context: Context, asset_nodes: Sequence[Mapping[str, Any]] @@ -31,6 +46,9 @@ def build_from_task(cls, task: BaseOperator) -> "BaseProxyTaskToDagsterOperator" class DefaultProxyTaskToDagsterOperator(BaseProxyTaskToDagsterOperator): """The default task proxying operator - which opens a blank session and expects the dagster URL to be set in the environment. The dagster url is expected to be set in the environment as DAGSTER_URL. + + This operator should not be instantiated directly - it is instantiated by :py:func:`proxying_to_dagster` if no + override operator is provided. """ def get_dagster_session(self, context: Context) -> requests.Session: