From 67a034aee94204891eac2882d72c668f191e1d75 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 18 Dec 2024 23:13:14 -0500 Subject: [PATCH] [dagster-fivetran] Move DagsterFivetranTranslator metadata to Fivetran specs loader --- .../dagster_fivetran/asset_decorator.py | 5 +--- .../dagster_fivetran/resources.py | 24 ++++++++++++------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py index 2f8fba289d00b..81b824a7df666 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py @@ -5,7 +5,6 @@ from dagster_fivetran.resources import FivetranWorkspace from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet -from dagster_fivetran.utils import DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY @experimental @@ -109,9 +108,7 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet group_name=group_name, can_subset=True, specs=[ - spec.merge_attributes( - metadata={DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY: dagster_fivetran_translator} - ) + spec for spec in workspace.load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index f56b0da16ce8f..00347094fe793 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -46,6 +46,7 @@ ) from dagster_fivetran.types import FivetranOutput from dagster_fivetran.utils import ( + DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY, get_fivetran_connector_table_name, get_fivetran_connector_url, get_fivetran_logs_url, @@ -1100,16 +1101,23 @@ def load_fivetran_asset_specs( fivetran_specs = load_fivetran_asset_specs(fivetran_workspace) defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace} """ + dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator() + with workspace.process_config_and_initialize_cm() as initialized_workspace: - return check.is_list( - FivetranWorkspaceDefsLoader( - workspace=initialized_workspace, - translator=dagster_fivetran_translator or DagsterFivetranTranslator(), + return [ + spec.merge_attributes( + metadata={DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY: dagster_fivetran_translator} ) - .build_defs() - .assets, - AssetSpec, - ) + for spec in check.is_list( + FivetranWorkspaceDefsLoader( + workspace=initialized_workspace, + translator=dagster_fivetran_translator, + ) + .build_defs() + .assets, + AssetSpec, + ) + ] @record