Skip to content

Commit

Permalink
[11/n][dagster-fivetran] Implement materialization method in Fivetran…
Browse files Browse the repository at this point in the history
…Workspace
  • Loading branch information
maximearmstrong committed Nov 22, 2024
1 parent b7c21f9 commit 3150e3e
Showing 1 changed file with 144 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union
Expand All @@ -10,11 +11,13 @@
import requests
from dagster import (
AssetExecutionContext,
AssetMaterialization,
Definitions,
Failure,
InitResourceContext,
MetadataValue,
OpExecutionContext,
Output,
__version__,
_check as check,
get_dagster_logger,
Expand All @@ -24,7 +27,11 @@
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._core.errors import DagsterStepOutputNotFoundError
from dagster._core.utils import imap
from dagster._record import record
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
Expand All @@ -37,21 +44,31 @@
FivetranConnector,
FivetranConnectorScheduleType,
FivetranDestination,
FivetranMetadataSet,
FivetranSchemaConfig,
FivetranWorkspaceData,
)
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url
from dagster_fivetran.utils import (
generate_materializations,
get_fivetran_connector_url,
get_fivetran_logs_url,
)

FIVETRAN_API_BASE = "https://api.fivetran.com"
FIVETRAN_API_VERSION = "v1"
FIVETRAN_CONNECTOR_ENDPOINT = "connectors"
FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/"
FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"

DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY = "dagster-fivetran/fetch_column_metadata"
DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY = "dagster-fivetran/infer_missing_tables"

# default polling interval (in seconds)
DEFAULT_POLL_INTERVAL = 10

DEFAULT_MAX_THREADPOOL_WORKERS = 10

FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata"


Expand Down Expand Up @@ -577,6 +594,25 @@ def get_schema_config_for_connector(self, connector_id: str) -> Mapping[str, Any
"""
return self._make_request("GET", f"connectors/{connector_id}/schemas")

def get_columns_for_table(
self, connector_id: str, schema_name: str, table_name: str
) -> Mapping[str, Any]:
"""Fetches the connector schema config for a given connector from the Fivetran API.
Args:
connector_id (str): The Fivetran Connector ID.
schema_name (str): The Fivetran Schema name.
table_name (str): The Fivetran Table name.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
self._make_connector_request(
method="GET",
endpoint=f"{connector_id}/schemas/{schema_name}/tables/{table_name}/columns",
)
return self._make_request("GET", f"connectors/{connector_id}/schemas")

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Fivetran API.
Expand Down Expand Up @@ -832,6 +868,7 @@ class FivetranWorkspace(ConfigurableResource):

_client: FivetranClient = PrivateAttr(default=None)

@cached_method
def get_client(self) -> FivetranClient:
return FivetranClient(
api_key=self.api_key,
Expand Down Expand Up @@ -927,10 +964,115 @@ def load_asset_specs(
workspace=self, dagster_fivetran_translator=dagster_fivetran_translator
)

def _fetch_and_attach_col_metadata(
self, connector_id: str, materialization: AssetMaterialization
) -> AssetMaterialization:
"""Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the
materialization.
"""
try:
schema_source_name = materialization.metadata["schema_source_name"].value
table_source_name = materialization.metadata["table_source_name"].value

table_conn_data = self.get_client().get_columns_for_table(
connector_id=connector_id,
schema_name=schema_source_name,
table_name=table_source_name,
)
columns = check.dict_elem(table_conn_data, "columns")
table_columns = sorted(
[
TableColumn(name=col["name_in_destination"], type="")
for col in columns.values()
if "name_in_destination" in col and col.get("enabled")
],
key=lambda col: col.name,
)
return materialization.with_metadata(
{
**materialization.metadata,
**TableMetadataSet(column_schema=TableSchema(table_columns)),
}
)
except Exception as e:
self._log.warning(
"An error occurred while fetching column metadata for table %s",
f"Exception: {e}",
exc_info=True,
)
return materialization

def sync_and_poll(
self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None
):
raise NotImplementedError()
# TODO: Add docstrings
assets_def = context.assets_def

# TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory
fetch_column_metadata = context.op.tags.get(DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY)
infer_missing_tables = context.op.tags.get(
DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY
)

connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in assets_def.specs
)

client = self.get_client()
fivetran_output = client.sync_and_poll(
connector_id=connector_id,
)

materialized_asset_keys = set()

_map_fn: Callable[[AssetMaterialization], AssetMaterialization] = (
lambda materialization: self._fetch_and_attach_col_metadata(
connector_id, materialization
)
if fetch_column_metadata
else materialization
)
with ThreadPoolExecutor(
max_workers=DEFAULT_MAX_THREADPOOL_WORKERS,
thread_name_prefix=f"fivetran_{connector_id}",
) as executor:
for materialization in imap(
executor=executor,
# TODO: Create new asset materialization fn with assets and not asset key prefix
iterable=generate_materializations(
fivetran_output,
asset_key_prefix=[],
),
func=_map_fn,
):
# scan through all tables actually created, if it was expected then emit an Output.
# otherwise, emit a runtime AssetMaterialization
if materialization.asset_key in context.selected_asset_keys:
yield Output(
value=None,
output_name=materialization.asset_key.to_python_identifier(),
metadata=materialization.metadata,
)
materialized_asset_keys.add(materialization.asset_key)

else:
yield materialization

unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
if infer_missing_tables:
for asset_key in unmaterialized_asset_keys:
yield Output(value=None, output_name=asset_key.to_python_identifier())
else:
if unmaterialized_asset_keys:
asset_key = next(iter(unmaterialized_asset_keys))
output_name = "_".join(asset_key.path)
raise DagsterStepOutputNotFoundError(
f"Core compute for {context.op_def.name} did not return an output for"
f' non-optional output "{output_name}".',
step_key=context.get_step_execution_context().step.key,
output_name=output_name,
)


@experimental
Expand Down

0 comments on commit 3150e3e

Please sign in to comment.