Skip to content

Commit

Permalink
Move translator as metadata to specs loader
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Dec 19, 2024
1 parent 162de89 commit 826d8e6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from dagster_airbyte.resources import AirbyteCloudWorkspace
from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator
from dagster_airbyte.utils import DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY


@experimental
Expand Down Expand Up @@ -105,9 +104,7 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt
group_name=group_name,
can_subset=True,
specs=[
spec.merge_attributes(
metadata={DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY: dagster_airbyte_translator}
)
spec
for spec in workspace.load_asset_specs(
dagster_airbyte_translator=dagster_airbyte_translator
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
)
from dagster_airbyte.types import AirbyteOutput
from dagster_airbyte.utils import (
DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY,
get_airbyte_connection_table_name,
get_translator_from_airbyte_assets,
)
Expand Down Expand Up @@ -1339,16 +1340,23 @@ def load_airbyte_cloud_asset_specs(
airbyte_cloud_specs = load_airbyte_cloud_asset_specs(airbyte_cloud_workspace)
defs = dg.Definitions(assets=airbyte_cloud_specs)
"""
dagster_airbyte_translator = dagster_airbyte_translator or DagsterAirbyteTranslator()

with workspace.process_config_and_initialize_cm() as initialized_workspace:
return check.is_list(
AirbyteCloudWorkspaceDefsLoader(
workspace=initialized_workspace,
translator=dagster_airbyte_translator or DagsterAirbyteTranslator(),
return [
spec.merge_attributes(
metadata={DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY: dagster_airbyte_translator}
)
.build_defs()
.assets,
AssetSpec,
)
for spec in check.is_list(
AirbyteCloudWorkspaceDefsLoader(
workspace=initialized_workspace,
translator=dagster_airbyte_translator,
)
.build_defs()
.assets,
AssetSpec,
)
]


@record
Expand Down

0 comments on commit 826d8e6

Please sign in to comment.