Skip to content

Commit

Permalink
Update callsites of AssetSpec._replace to use replace_attributes (#25942
Browse files Browse the repository at this point in the history
)

## Summary

Follow-on to #25941 which updates some non-type-checked `_replace`
callsites to use these new methods.
  • Loading branch information
benpankow authored Nov 26, 2024
1 parent e25f7c5 commit 94d2490
Show file tree
Hide file tree
Showing 12 changed files with 30 additions and 32 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def _attach_code_references_to_definitions(blueprint: Blueprint, defs: Definitio

new_assets_defs.append(
assets_def.map_asset_specs(
lambda spec: spec._replace(metadata=new_metadata_by_key[spec.key])
lambda spec: spec.replace_attributes(metadata=new_metadata_by_key[spec.key])
)
)
return copy(defs, assets=new_assets_defs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def filter_specs_by_tag(specs: Sequence[AssetSpec], tag: str) -> Dict[AssetKey,


def add_dep_to_spec(spec: AssetSpec, dep: AssetKey) -> AssetSpec:
return spec._replace(deps=[*spec.deps, AssetDep(dep)])
return spec.replace_attributes(deps=[*spec.deps, AssetDep(dep)])


def key_for_uid(specs: Sequence[AssetSpec], uid: str) -> AssetKey:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def jaffle_shop_assets(context: AssetExecutionContext, dbt: DbtCliResource):


jaffle_shop_external_assets = [
spec._replace(code_version=None, skippable=False) for spec in jaffle_shop_assets.specs
spec.replace_attributes(code_version=None, skippable=False) for spec in jaffle_shop_assets.specs
]

jaffle_shop_with_upstream = eager_asset(with_deps(DBT_SOURCE_TO_DAG, jaffle_shop_assets))
Expand Down
10 changes: 7 additions & 3 deletions examples/starlift-demo/dbt_example/dagster_defs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ def apply_eager_automation(defs: Definitions) -> Definitions:
continue
assets.append(
asset.map_asset_specs(
lambda spec: spec._replace(automation_condition=AutomationCondition.eager())
lambda spec: spec.replace_attributes(
automation_condition=AutomationCondition.eager()
)
if spec.automation_condition is None
else spec
)
Expand All @@ -40,7 +42,7 @@ def apply_eager_automation(defs: Definitions) -> Definitions:


def with_group(assets_def: AssetsDefinition, group_name: str) -> AssetsDefinition:
return assets_def.map_asset_specs(lambda spec: spec._replace(group_name=group_name))
return assets_def.map_asset_specs(lambda spec: spec.replace_attributes(group_name=group_name))


def with_deps(
Expand Down Expand Up @@ -93,4 +95,6 @@ def with_deps(


def eager(specs: Sequence[AssetSpec]) -> Sequence[AssetSpec]:
return [spec._replace(automation_condition=AutomationCondition.eager()) for spec in specs]
return [
spec.replace_attributes(automation_condition=AutomationCondition.eager()) for spec in specs
]
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ def merge_attributes(
owners=[*self.owners, *(owners if owners is not ... else [])],
tags={**current_tags_without_kinds, **(tags if tags is not ... else {})},
kinds={*self.kinds, *(kinds if kinds is not ... else {})},
auto_materialize_policy=self.auto_materialize_policy,
partitions_def=self.partitions_def,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ def specs(self) -> Sequence[AssetSpec]:
" AssetSpecs/AssetOuts supplied to this multi_asset have a group_name defined.",
)

return [spec._replace(group_name=self.group_name) for spec in specs]
return [spec.replace_attributes(group_name=self.group_name) for spec in specs]

def _synthesize_specs(self) -> Sequence[AssetSpec]:
resolved_specs = []
Expand Down
25 changes: 11 additions & 14 deletions python_modules/dagster/dagster/_core/definitions/external_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,21 +183,18 @@ def create_unexecutable_external_asset_from_assets_def(
for key in assets_def.keys:
orig_spec = assets_def.get_asset_spec(key)
specs.append(
orig_spec._replace(
metadata={
**(orig_spec.metadata or {}),
**(
{
SYSTEM_METADATA_KEY_IO_MANAGER_KEY: assets_def.get_io_manager_key_for_asset_key(
key
)
}
if assets_def.has_output_for_asset_key(key)
else {}
),
},
orig_spec.merge_attributes(
metadata=(
{
SYSTEM_METADATA_KEY_IO_MANAGER_KEY: assets_def.get_io_manager_key_for_asset_key(
key
)
}
if assets_def.has_output_for_asset_key(key)
else {}
),
).replace_attributes(
automation_condition=None,
freshness_policy=None,
)
)
return AssetsDefinition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def _with_code_source_single_definition(
}

return assets_def.map_asset_specs(
lambda spec: spec._replace(metadata=metadata_by_key[spec.key])
lambda spec: spec.replace_attributes(metadata=metadata_by_key[spec.key])
)


Expand Down Expand Up @@ -279,7 +279,7 @@ def _convert_local_path_to_git_path_single_definition(
}

return assets_def.map_asset_specs(
lambda spec: spec._replace(metadata=metadata_by_key[spec.key])
lambda spec: spec.replace_attributes(metadata=metadata_by_key[spec.key])
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ def enrich_spec_with_airflow_task_metadata(
tasks: AbstractSet[TaskHandle],
serialized_data: SerializedAirflowDefinitionsData,
) -> AssetSpec:
tags = {**spec.tags, **airlift_mapped_kind_dict()}
return spec._replace(
metadata={**spec.metadata, **metadata_for_mapped_tasks(tasks, serialized_data)},
tags=tags,
return spec.merge_attributes(
metadata=metadata_for_mapped_tasks(tasks, serialized_data),
tags=airlift_mapped_kind_dict(),
)


Expand All @@ -65,10 +64,9 @@ def enrich_spec_with_airflow_dag_metadata(
dags: AbstractSet[DagHandle],
serialized_data: SerializedAirflowDefinitionsData,
) -> AssetSpec:
tags = {**spec.tags, **airlift_mapped_kind_dict()}
return spec._replace(
metadata={**spec.metadata, **metadata_for_mapped_dags(dags, serialized_data)},
tags=tags,
return spec.merge_attributes(
metadata=metadata_for_mapped_dags(dags, serialized_data),
tags=airlift_mapped_kind_dict(),
)


Expand Down

1 comment on commit 94d2490

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-mwgd9su9l-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit 94d2490.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.