Skip to content

Commit

Permalink
[dagster-airlift] [api-docs] auth backends
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 1, 2024
1 parent d348b09 commit f655af7
Show file tree
Hide file tree
Showing 24 changed files with 138 additions and 44 deletions.
21 changes: 19 additions & 2 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
Airlift (dagster-airlift)
=========================


.. currentmodule:: dagster_airlift.core

Airflow Instance
Core (dagster_airlift.core)
---------------------------

AirflowInstance
^^^^^^^^^^^^^^^^^

.. autoclass:: AirflowInstance
:members:
:members:

.. autoclass:: AirflowAuthBackend

.. autoclass:: BasicAirflowAuthBackend

.. currentmodule:: dagster_airlift.mwaa

MWAA (dagster_airlift.mwaa)
---------------------------

.. autoclass:: MwaaSessionAuthBackend


Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .airflow_defs_data import AirflowDefinitionsData as AirflowDefinitionsData
from .basic_auth import BasicAuthBackend as BasicAuthBackend
from .basic_auth import BasicAirflowAuthBackend as BasicAirflowAuthBackend
from .load_defs import (
AirflowInstance as AirflowInstance,
build_airflow_mapped_defs as build_airflow_mapped_defs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@


class AirflowAuthBackend(ABC):
"""An abstract class that represents an authentication backend for an Airflow instance.
Requires two methods to be implemented by subclasses:
- get_session: Returns a requests.Session object that can be used to make requests to the Airflow instance, and handles authentication.
- get_webserver_url: Returns the base URL of the Airflow webserver.
The `dagster-airlift` package provides the following default implementations:
- :py:class:`dagster-airlift.core.AirflowBasicAuthBackend`: An authentication backend that uses Airflow's basic auth to authenticate with the Airflow instance.
- :py:class:`dagster-airlift.mwaa.MwaaSessionAuthBackend`: An authentication backend that uses AWS MWAA's web login token to authenticate with the Airflow instance (requires `dagster-airlift[mwaa]`).
"""

def get_session(self) -> requests.Session:
raise NotImplementedError("This method must be implemented by subclasses.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,30 @@
from .airflow_instance import AirflowAuthBackend


class BasicAuthBackend(AirflowAuthBackend):
class BasicAirflowAuthBackend(AirflowAuthBackend):
"""A :py:class:`dagster_airlift.core.AirflowAuthBackend` that authenticates using basic auth.
Args:
webserver_url (str): The URL of the webserver.
username (str): The username to authenticate with.
password (str): The password to authenticate with.
Examples:
.. code-block:: python
# Creating an AirflowInstance.
from dagster_airlift.core import AirflowInstance, BasicAuthBackend
af_instance = AirflowInstance(
name="my-instance",
auth_backend=BasicAuthBackend(
webserver_url="https://my-webserver-hostname",
username="my-username",
password="my-password"
)
)
"""

def __init__(self, webserver_url: str, username: str, password: str):
self._webserver_url = webserver_url
self.username = username
Expand Down
33 changes: 30 additions & 3 deletions examples/experimental/dagster-airlift/dagster_airlift/mwaa/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,35 @@ def get_session_info(mwaa: Any, env_name: str) -> Tuple[str, str]:


class MwaaSessionAuthBackend(AirflowAuthBackend):
def __init__(self, mwaa_client: Any, env_name: str) -> None:
self.mwaa_client = mwaa_client
"""A :py:class:`dagster_airlift.core.AirflowAuthBackend` that authenticates to AWS MWAA.
Under the hood, this class uses the MWAA boto3 client to request a web login token and then
uses the token to authenticate to the MWAA web server.
Args:
mwaa_client (Any): The boto3 MWAA client
env_name (str): The name of the MWAA environment
Examples:
.. code-block:: python
# Creating an AirflowInstance pointed at a MWAA environment
import boto3
from dagster_airlift.mwaa import MwaaSessionAuthBackend
from dagster_airlift.core import AirflowInstance
boto_session = boto3.Session(profile_name="my_profile", region_name="us-west-2")
af_instance = AirflowInstance(
name="my-mwaa-instance",
auth_backend=MwaaSessionAuthBackend(
mwaa_session=boto_session,
env_name="my-mwaa-env"
)
)
"""

def __init__(self, mwaa_session: Any, env_name: str) -> None:
self.mwaa_client = mwaa_session
self.env_name = env_name
# Session info is generated when we either try to retrieve a session or retrieve the web server url
self._session_info: Optional[Tuple[str, str]] = None
Expand All @@ -40,7 +67,7 @@ def __init__(self, mwaa_client: Any, env_name: str) -> None:
def from_profile(region: str, env_name: str, profile_name: Optional[str] = None):
boto_session = boto3.Session(profile_name=profile_name, region_name=region)
mwaa = boto_session.client("mwaa")
return MwaaSessionAuthBackend(mwaa_client=mwaa, env_name=env_name)
return MwaaSessionAuthBackend(mwaa_session=mwaa, env_name=env_name)

def get_session(self) -> requests.Session:
# Get the session info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest
from dagster._core.errors import DagsterError
from dagster_airlift.core import AirflowInstance, BasicAuthBackend
from dagster_airlift.core import AirflowInstance, BasicAirflowAuthBackend

from .conftest import assert_link_exists

Expand All @@ -13,7 +13,7 @@ def test_airflow_instance(airflow_instance: None) -> None:
Airflow is loaded with one dag (print_dag) which contains two tasks (print_task, downstream_print_task).
"""
instance = AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url="http://localhost:8080", username="admin", password="admin"
),
name="airflow_instance",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def test_mwaa_session_auth_direct_mwaa_client_creation() -> None:
mock_get_session_info.return_value = ("my-webserver-hostname", "my-session-cookie")
boto_session = boto3.Session(region_name="us-west-2")
mwaa = boto_session.client("mwaa")
auth_backend = MwaaSessionAuthBackend(mwaa_client=mwaa, env_name="my-env")
auth_backend = MwaaSessionAuthBackend(mwaa_session=mwaa, env_name="my-env")
session = auth_backend.get_session()
assert session.cookies["session"] == "my-session-cookie"
assert auth_backend.get_webserver_url() == "https://my-webserver-hostname"
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dagster._core.definitions.definitions_class import Definitions
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
BasicAirflowAuthBackend,
assets_with_task_mappings,
build_defs_from_airflow_instance,
)
Expand All @@ -13,7 +13,7 @@
from .jaffle_shop import jaffle_shop_assets, jaffle_shop_resource

airflow_instance = AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
),
name=AIRFLOW_INSTANCE_NAME,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dagster._core.definitions.definitions_class import Definitions
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
BasicAirflowAuthBackend,
assets_with_task_mappings,
build_defs_from_airflow_instance,
)
Expand All @@ -13,7 +13,7 @@
from .jaffle_shop import jaffle_shop_external_assets, jaffle_shop_resource

airflow_instance = AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
),
name=AIRFLOW_INSTANCE_NAME,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dagster import Definitions
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
BasicAirflowAuthBackend,
assets_with_task_mappings,
build_defs_from_airflow_instance,
)
Expand All @@ -13,7 +13,7 @@
from .jaffle_shop import jaffle_shop_external_assets, jaffle_shop_resource

airflow_instance = AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
),
name=AIRFLOW_INSTANCE_NAME,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance
from dagster_airlift.core import (
AirflowInstance,
BasicAirflowAuthBackend,
build_defs_from_airflow_instance,
)

from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME

airflow_instance = AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
),
name=AIRFLOW_INSTANCE_NAME,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from dagster_airlift.core import AirflowInstance, BasicAuthBackend
from dagster_airlift.core import AirflowInstance, BasicAirflowAuthBackend

from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME


def local_airflow_instance() -> AirflowInstance:
return AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
),
name=AIRFLOW_INSTANCE_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dagster._core.definitions.decorators.asset_decorator import multi_asset
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
BasicAirflowAuthBackend,
build_defs_from_airflow_instance,
dag_defs,
task_defs,
Expand All @@ -16,7 +16,7 @@
from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME

airflow_instance = AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
),
name=AIRFLOW_INSTANCE_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dagster._core.definitions.definitions_class import Definitions
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
BasicAirflowAuthBackend,
build_defs_from_airflow_instance,
dag_defs,
task_defs,
Expand All @@ -15,7 +15,7 @@
from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME

airflow_instance = AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
),
name=AIRFLOW_INSTANCE_NAME,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance
from dagster_airlift.core import (
AirflowInstance,
BasicAirflowAuthBackend,
build_defs_from_airflow_instance,
)

from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME

airflow_instance = AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
),
name=AIRFLOW_INSTANCE_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
BasicAirflowAuthBackend,
assets_with_task_mappings,
build_defs_from_airflow_instance,
)
Expand Down Expand Up @@ -92,7 +92,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):

defs = build_defs_from_airflow_instance(
airflow_instance=AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url="http://localhost:8080",
username="admin",
password="admin",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
BasicAirflowAuthBackend,
assets_with_dag_mappings,
build_defs_from_airflow_instance,
)
Expand Down Expand Up @@ -87,7 +87,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):

defs = build_defs_from_airflow_instance(
airflow_instance=AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url="http://localhost:8080",
username="admin",
password="admin",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
)
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
BasicAirflowAuthBackend,
assets_with_task_mappings,
build_defs_from_airflow_instance,
)
Expand Down Expand Up @@ -118,7 +118,7 @@ def validate_exported_csv() -> AssetCheckResult:

defs = build_defs_from_airflow_instance(
airflow_instance=AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url="http://localhost:8080",
username="admin",
password="admin",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dagster import AssetExecutionContext, AssetSpec, Definitions
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
BasicAirflowAuthBackend,
assets_with_task_mappings,
build_defs_from_airflow_instance,
)
Expand Down Expand Up @@ -37,7 +37,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):

defs = build_defs_from_airflow_instance(
airflow_instance=AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url="http://localhost:8080",
username="admin",
password="admin",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dagster import AssetExecutionContext, AssetSpec, Definitions
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
BasicAirflowAuthBackend,
assets_with_dag_mappings,
build_defs_from_airflow_instance,
)
Expand Down Expand Up @@ -39,7 +39,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):

defs = build_defs_from_airflow_instance(
airflow_instance=AirflowInstance(
auth_backend=BasicAuthBackend(
auth_backend=BasicAirflowAuthBackend(
webserver_url="http://localhost:8080",
username="admin",
password="admin",
Expand Down
Loading

0 comments on commit f655af7

Please sign in to comment.