diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py index 6e65515e20f45..2f0716a2e6c54 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py @@ -11,6 +11,7 @@ ) from dagster_fivetran.resources import ( FivetranResource as FivetranResource, + FivetranWorkspace as FivetranWorkspace, fivetran_resource as fivetran_resource, ) from dagster_fivetran.types import FivetranOutput as FivetranOutput diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 2009bdea1ec37..5bd61ffb1b86d 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -3,6 +3,7 @@ import logging import os import time +from enum import Enum from typing import Any, Mapping, Optional, Sequence, Tuple from urllib.parse import urljoin @@ -16,11 +17,14 @@ get_dagster_logger, resource, ) +from dagster._annotations import experimental from dagster._config.pythonic_config import ConfigurableResource from dagster._core.definitions.resource_definition import dagster_maintained_resource +from dagster._record import record +from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser -from pydantic import Field +from pydantic import Field, PrivateAttr from requests.auth import HTTPBasicAuth from requests.exceptions import RequestException @@ -436,3 +440,142 @@ def my_fivetran_job(): """ return FivetranResource.from_resource_context(context) + + +# ------------------ +# Reworked resources +# ------------------ +class FivetranContentType(Enum): + """Enum representing each object in Fivetran's ontology.""" + + CONNECTOR = "connector" + DESTINATION = "destination" + + +@whitelist_for_serdes +@record +class FivetranContentData: + """A record representing a piece of content in a Fivetran workspace. + Includes the object's type and data as returned from the API. + """ + + content_type: FivetranContentType + properties: Mapping[str, Any] + + +@record +class FivetranWorkspaceData: + """A record representing all content in a Fivetran workspace. + Provided as context for the translator so that it can resolve dependencies between content. + """ + + connectors_by_id: Mapping[str, FivetranContentData] + destinations_by_id: Mapping[str, FivetranContentData] + + @classmethod + def from_content_data( + cls, content_data: Sequence[FivetranContentData] + ) -> "FivetranWorkspaceData": + raise NotImplementedError() + + +@experimental +class FivetranClient: + """This class exposes methods on top of the Fivetran REST API.""" + + def __init__( + self, + api_key: str, + api_secret: str, + request_max_retries: int, + request_retry_delay: float, + ): + self.api_key = api_key + self.api_secret = api_secret + self.request_max_retries = request_max_retries + self.request_retry_delay = request_retry_delay + + @property + def _auth(self) -> HTTPBasicAuth: + raise NotImplementedError() + + @property + @cached_method + def _log(self) -> logging.Logger: + return get_dagster_logger() + + @property + def api_base_url(self) -> str: + raise NotImplementedError() + + @property + def api_connector_url(self) -> str: + raise NotImplementedError() + + def make_connector_request( + self, method: str, endpoint: str, data: Optional[str] = None + ) -> Mapping[str, Any]: + raise NotImplementedError() + + def make_request( + self, method: str, endpoint: str, data: Optional[str] = None + ) -> Mapping[str, Any]: + raise NotImplementedError() + + def get_connector_details(self, connector_id: str) -> Mapping[str, Any]: + """Fetches details about a given connector from the Fivetran API.""" + raise NotImplementedError() + + def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]: + """Fetches all connectors for a given group from the Fivetran API.""" + raise NotImplementedError() + + def get_destination_details(self, destination_id: str) -> Mapping[str, Any]: + """Fetches details about a given destination from the Fivetran API.""" + raise NotImplementedError() + + def get_groups(self) -> Mapping[str, Any]: + """Fetches all groups from the Fivetran API.""" + raise NotImplementedError() + + +@experimental +class FivetranWorkspace(ConfigurableResource): + """This class represents a Fivetran workspace and provides utilities + to interact with Fivetran APIs. + """ + + api_key: str = Field(description="The Fivetran API key to use for this resource.") + api_secret: str = Field(description="The Fivetran API secret to use for this resource.") + request_max_retries: int = Field( + default=3, + description=( + "The maximum number of times requests to the Fivetran API should be retried " + "before failing." + ), + ) + request_retry_delay: float = Field( + default=0.25, + description="Time (in seconds) to wait between each request retry.", + ) + + _client: FivetranClient = PrivateAttr(default=None) + + def get_client(self) -> FivetranClient: + return FivetranClient( + api_key=self.api_key, + api_secret=self.api_secret, + request_max_retries=self.request_max_retries, + request_retry_delay=self.request_retry_delay, + ) + + def fetch_fivetran_workspace_data( + self, + ) -> FivetranWorkspaceData: + """Retrieves all Fivetran content from the workspace and returns it as a FivetranWorkspaceData object. + Future work will cache this data to avoid repeated calls to the Fivetran API. + + Returns: + FivetranWorkspaceData: A snapshot of the Fivetran workspace's content. + """ + raise NotImplementedError()