Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1/n][dagster-fivetran] Scaffold FivetranWorkspace for rework #25750

Merged
merged 4 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)
from dagster_fivetran.resources import (
FivetranResource as FivetranResource,
FivetranWorkspace as FivetranWorkspace,
fivetran_resource as fivetran_resource,
)
from dagster_fivetran.types import FivetranOutput as FivetranOutput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import time
from enum import Enum
from typing import Any, Mapping, Optional, Sequence, Tuple
from urllib.parse import urljoin

Expand All @@ -16,11 +17,14 @@
get_dagster_logger,
resource,
)
from dagster._annotations import experimental
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
from pydantic import Field
from pydantic import Field, PrivateAttr
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException

Expand Down Expand Up @@ -436,3 +440,142 @@ def my_fivetran_job():

"""
return FivetranResource.from_resource_context(context)


# ------------------
# Reworked resources
# ------------------
class FivetranContentType(Enum):
    """Kinds of objects that make up Fivetran's ontology.

    Each member's value is the lowercase string name of the object kind.
    """

    # A Fivetran connector.
    CONNECTOR = "connector"
    # A Fivetran destination.
    DESTINATION = "destination"


@whitelist_for_serdes
@record
class FivetranContentData:
    """Record describing a single piece of content in a Fivetran workspace.

    Pairs the kind of object with the object's data as returned from the API.
    """

    # Which kind of Fivetran object this record describes.
    content_type: FivetranContentType
    # The object's data as returned from the API.
    properties: Mapping[str, Any]


@record
class FivetranWorkspaceData:
    """Record holding all of the content found in a Fivetran workspace.

    Handed to the translator as context so that dependencies between pieces
    of content can be resolved.
    """

    # Connector content records, keyed by id.
    connectors_by_id: Mapping[str, FivetranContentData]
    # Destination content records, keyed by id.
    destinations_by_id: Mapping[str, FivetranContentData]

    @classmethod
    def from_content_data(
        cls,
        content_data: Sequence[FivetranContentData],
    ) -> "FivetranWorkspaceData":
        """Alternate constructor taking a sequence of content records.

        Not implemented in this scaffold.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()


@experimental
class FivetranClient:
    """Client exposing methods on top of the Fivetran REST API.

    This is a scaffold: apart from the constructor and the logger accessor,
    every member currently raises ``NotImplementedError``.
    """

    def __init__(
        self,
        api_key: str,
        api_secret: str,
        request_max_retries: int,
        request_retry_delay: float,
    ):
        """Store the credentials and retry settings on the instance.

        Args:
            api_key: The Fivetran API key.
            api_secret: The Fivetran API secret.
            request_max_retries: Value stored as ``request_max_retries``.
            request_retry_delay: Value stored as ``request_retry_delay``.
        """
        # Credentials.
        self.api_key = api_key
        self.api_secret = api_secret
        # Retry settings.
        self.request_max_retries = request_max_retries
        self.request_retry_delay = request_retry_delay

    @property
    def _auth(self) -> HTTPBasicAuth:
        """Authentication object for requests. Not implemented yet."""
        raise NotImplementedError()

    @property
    @cached_method
    def _log(self) -> logging.Logger:
        """Logger obtained from ``get_dagster_logger``."""
        return get_dagster_logger()

    @property
    def api_base_url(self) -> str:
        """Base URL of the API. Not implemented yet."""
        raise NotImplementedError()

    @property
    def api_connector_url(self) -> str:
        """URL of the connector endpoints. Not implemented yet."""
        raise NotImplementedError()

    def make_connector_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[str] = None,
    ) -> Mapping[str, Any]:
        """Issue a connector request. Not implemented yet.

        Args:
            method: The request method.
            endpoint: The endpoint to call.
            data: Optional request payload.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()

    def make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[str] = None,
    ) -> Mapping[str, Any]:
        """Issue a request. Not implemented yet.

        Args:
            method: The request method.
            endpoint: The endpoint to call.
            data: Optional request payload.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()

    def get_connector_details(self, connector_id: str) -> Mapping[str, Any]:
        """Fetch the details of one connector from the Fivetran API.

        Not implemented yet; always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]:
        """Fetch every connector belonging to a group from the Fivetran API.

        Not implemented yet; always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
        """Fetch the details of one destination from the Fivetran API.

        Not implemented yet; always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    def get_groups(self) -> Mapping[str, Any]:
        """Fetch every group from the Fivetran API.

        Not implemented yet; always raises ``NotImplementedError``.
        """
        raise NotImplementedError()


@experimental
class FivetranWorkspace(ConfigurableResource):
    """This class represents a Fivetran workspace and provides utilities
    to interact with Fivetran APIs.
    """

    # --- Credentials ---
    api_key: str = Field(description="The Fivetran API key to use for this resource.")
    api_secret: str = Field(description="The Fivetran API secret to use for this resource.")

    # --- Retry settings, passed through to the client ---
    request_max_retries: int = Field(
        default=3,
        description=(
            "The maximum number of times requests to the Fivetran API should be retried "
            "before failing."
        ),
    )
    request_retry_delay: float = Field(
        default=0.25,
        description="Time (in seconds) to wait between each request retry.",
    )

    # NOTE(review): declared but not read or written anywhere in this class yet.
    _client: FivetranClient = PrivateAttr(default=None)

    def get_client(self) -> FivetranClient:
        """Build a ``FivetranClient`` from this resource's configuration.

        A new client is constructed on every call.

        Returns:
            FivetranClient: A client configured with this resource's
            credentials and retry settings.
        """
        client = FivetranClient(
            api_key=self.api_key,
            api_secret=self.api_secret,
            request_max_retries=self.request_max_retries,
            request_retry_delay=self.request_retry_delay,
        )
        return client

    def fetch_fivetran_workspace_data(self) -> FivetranWorkspaceData:
        """Retrieve all Fivetran content from the workspace as a FivetranWorkspaceData object.

        Future work will cache this data to avoid repeated calls to the Fivetran API.
        Not implemented in this scaffold.

        Returns:
            FivetranWorkspaceData: A snapshot of the Fivetran workspace's content.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()