diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 40170a8fd5ea7..a839867296fa8 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -2,6 +2,7 @@ import logging import os import time +from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from functools import lru_cache, partial from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union @@ -10,11 +11,13 @@ import requests from dagster import ( AssetExecutionContext, + AssetMaterialization, Definitions, Failure, InitResourceContext, MetadataValue, OpExecutionContext, + Output, __version__, _check as check, get_dagster_logger, @@ -24,7 +27,11 @@ from dagster._config.pythonic_config import ConfigurableResource from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader +from dagster._core.definitions.metadata.metadata_set import TableMetadataSet +from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster._core.definitions.resource_definition import dagster_maintained_resource +from dagster._core.errors import DagsterStepOutputNotFoundError +from dagster._core.utils import imap from dagster._record import record from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser @@ -37,11 +44,16 @@ FivetranConnector, FivetranConnectorScheduleType, FivetranDestination, + FivetranMetadataSet, FivetranSchemaConfig, FivetranWorkspaceData, ) from dagster_fivetran.types import FivetranOutput -from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url +from dagster_fivetran.utils import ( + generate_materializations, + get_fivetran_connector_url, + get_fivetran_logs_url, +) FIVETRAN_API_BASE = "https://api.fivetran.com" FIVETRAN_API_VERSION = "v1" @@ -49,9 +61,14 @@ FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/" FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/" +DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY = "dagster-fivetran/fetch_column_metadata" +DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY = "dagster-fivetran/infer_missing_tables" + # default polling interval (in seconds) DEFAULT_POLL_INTERVAL = 10 +DEFAULT_MAX_THREADPOOL_WORKERS = 10 + FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata" @@ -577,6 +594,25 @@ def get_schema_config_for_connector(self, connector_id: str) -> Mapping[str, Any """ return self._make_request("GET", f"connectors/{connector_id}/schemas") + def get_columns_for_table( + self, connector_id: str, schema_name: str, table_name: str + ) -> Mapping[str, Any]: + """Fetches the connector schema config for a given connector from the Fivetran API. + + Args: + connector_id (str): The Fivetran Connector ID. + schema_name (str): The Fivetran Schema name. + table_name (str): The Fivetran Table name. + + Returns: + Dict[str, Any]: Parsed json data from the response to this request. + """ + self._make_connector_request( + method="GET", + endpoint=f"{connector_id}/schemas/{schema_name}/tables/{table_name}/columns", + ) + return self._make_request("GET", f"connectors/{connector_id}/schemas") + def get_destination_details(self, destination_id: str) -> Mapping[str, Any]: """Fetches details about a given destination from the Fivetran API. @@ -831,6 +867,7 @@ class FivetranWorkspace(ConfigurableResource): _client: FivetranClient = PrivateAttr(default=None) + @cached_method def get_client(self) -> FivetranClient: return FivetranClient( api_key=self.api_key, @@ -891,10 +928,115 @@ def fetch_fivetran_workspace_data( schema_configs_by_connector_id=schema_configs_by_connector_id, ) + def _fetch_and_attach_col_metadata( + self, connector_id: str, materialization: AssetMaterialization + ) -> AssetMaterialization: + """Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the + materialization. + """ + try: + schema_source_name = materialization.metadata["schema_source_name"].value + table_source_name = materialization.metadata["table_source_name"].value + + table_conn_data = self.get_client().get_columns_for_table( + connector_id=connector_id, + schema_name=schema_source_name, + table_name=table_source_name, + ) + columns = check.dict_elem(table_conn_data, "columns") + table_columns = sorted( + [ + TableColumn(name=col["name_in_destination"], type="") + for col in columns.values() + if "name_in_destination" in col and col.get("enabled") + ], + key=lambda col: col.name, + ) + return materialization.with_metadata( + { + **materialization.metadata, + **TableMetadataSet(column_schema=TableSchema(table_columns)), + } + ) + except Exception as e: + self._log.warning( + "An error occurred while fetching column metadata for table %s", + f"Exception: {e}", + exc_info=True, + ) + return materialization + def sync_and_poll( self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None ): - raise NotImplementedError() + # TODO: Add docstrings + assets_def = context.assets_def + + # TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory + fetch_column_metadata = context.op.tags.get(DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY) + infer_missing_tables = context.op.tags.get( + DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY + ) + + connector_id = next( + check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) + for spec in assets_def.specs + ) + + client = self.get_client() + fivetran_output = client.sync_and_poll( + connector_id=connector_id, + ) + + materialized_asset_keys = set() + + _map_fn: Callable[[AssetMaterialization], AssetMaterialization] = ( + lambda materialization: self._fetch_and_attach_col_metadata( + connector_id, materialization + ) + if fetch_column_metadata + else materialization + ) + with ThreadPoolExecutor( + max_workers=DEFAULT_MAX_THREADPOOL_WORKERS, + thread_name_prefix=f"fivetran_{connector_id}", + ) as executor: + for materialization in imap( + executor=executor, + # TODO: Create new asset materialization fn with assets and not asset key prefix + iterable=generate_materializations( + fivetran_output, + asset_key_prefix=[], + ), + func=_map_fn, + ): + # scan through all tables actually created, if it was expected then emit an Output. + # otherwise, emit a runtime AssetMaterialization + if materialization.asset_key in context.selected_asset_keys: + yield Output( + value=None, + output_name=materialization.asset_key.to_python_identifier(), + metadata=materialization.metadata, + ) + materialized_asset_keys.add(materialization.asset_key) + + else: + yield materialization + + unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys + if infer_missing_tables: + for asset_key in unmaterialized_asset_keys: + yield Output(value=None, output_name=asset_key.to_python_identifier()) + else: + if unmaterialized_asset_keys: + asset_key = next(iter(unmaterialized_asset_keys)) + output_name = "_".join(asset_key.path) + raise DagsterStepOutputNotFoundError( + f"Core compute for {context.op_def.name} did not return an output for" + f' non-optional output "{output_name}".', + step_key=context.get_step_execution_context().step.key, + output_name=output_name, + ) def __eq__(self, other): return (