Skip to content

Commit

Permalink
claire/relation-identifier-to-table-name
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Oct 21, 2024
1 parent 0ec736b commit fe4ac9f
Show file tree
Hide file tree
Showing 24 changed files with 74 additions and 81 deletions.
2 changes: 1 addition & 1 deletion docs/content/concepts/metadata-tags/asset-metadata.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ The `dagster` prefix indicates that the Dagster package takes responsibility for
</tr>
<tr>
<td>
<code>dagster/relation_identifier</code>
<code>dagster/table_name</code>
</td>
<td>
<ul
Expand Down
2 changes: 1 addition & 1 deletion docs/docs-beta/docs/guides/metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ The following metadata keys are given special treatment in the Dagster UI.
| `dagster/column_lineage` | **Type:** [`TableColumnLineage`](/todo) <br/><br/> For an asset that's a table, the lineage of column inputs to column outputs for the table. Refer to the [Table and column metadata](#table-schema) section for details. |
| `dagster/row_count` | **Type:** `int` <br/><br/> For an asset that's a table, the number of rows in the table. Refer to the Table metadata documentation for details. |
| `dagster/partition_row_count` | **Type:** `int` <br/><br/> For a partition of an asset that's a table, the number of rows in the partition. |
| `dagster/relation_identifier` | **Type:** `str` <br/><br/> A unique identifier for the table/view, typically fully qualified. For example, my_database.my_schema.my_table |
| `dagster/table_name` | **Type:** `str` <br/><br/> A unique identifier for the table/view, typically fully qualified. For example, `my_database.my_schema.my_table`. |
| `dagster/code_references` | **Type:** [`CodeReferencesMetadataValue`](/todo) <br/><br/> A list of code references for the asset, such as file locations or references to GitHub URLs. Refer to the [Linking assets with their source code](#source-code) section for details. Should only be provided in definition-level metadata, not materialization metadata. |

## Table and column metadata \{#table-column}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ export const isCanonicalCodeSourceEntry = (

export const isCanonicalRelationIdentifierEntry = (
m: MetadataEntryLabelOnly,
): m is TextMetadataEntry => m && m.label === 'dagster/relation_identifier';
): m is TextMetadataEntry =>
m && (m.label === 'dagster/relation_identifier' || m.label === 'dagster/table_name');

export const isCanonicalUriEntry = (
m: MetadataEntryLabelOnly,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,15 @@ class TableMetadataSet(NamespacedMetadataSet):
outputs for the table.
row_count (Optional[int]): The number of rows in the table.
partition_row_count (Optional[int]): The number of rows in the materialized or observed partition.
relation_identifier (Optional[str]): A unique identifier for the table/view, typically fully qualified.
table_name (Optional[str]): A unique identifier for the table/view, typically fully qualified.
For example, `my_database.my_schema.my_table`.
"""

column_schema: Optional[TableSchema] = None
column_lineage: Optional[TableColumnLineage] = None
row_count: Optional[int] = None
partition_row_count: Optional[int] = None
relation_identifier: Optional[str] = None
table_name: Optional[str] = None

@classmethod
def namespace(cls) -> str:
Expand Down
10 changes: 3 additions & 7 deletions python_modules/dagster/dagster/_core/storage/db_io_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ def delete_table_slice(
def get_select_statement(table_slice: TableSlice) -> str: ...

@staticmethod
def get_relation_identifier(table_slice: TableSlice) -> Optional[str]:
"""Returns a string which is set as the dagster/relation_identifier metadata value for an
def get_table_name(table_slice: TableSlice) -> Optional[str]:
"""Returns a string which is set as the dagster/table_name metadata value for an
emitted asset. This value should be the fully qualified name of the table, including the
schema and database, if applicable.
"""
Expand Down Expand Up @@ -171,11 +171,7 @@ def handle_output(self, context: OutputContext, obj: object) -> None:
# don't fail if it errors because the user has already attached it.
try:
context.add_output_metadata(
dict(
TableMetadataSet(
relation_identifier=self._db_client.get_relation_identifier(table_slice)
)
)
dict(TableMetadataSet(table_name=self._db_client.get_table_name(table_slice)))
)
except DagsterInvalidMetadata:
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ def test_table_metadata_set() -> None:
assert TableMetadataSet.extract(dict(TableMetadataSet())) == TableMetadataSet()


def test_relation_identifier() -> None:
table_metadata = TableMetadataSet(relation_identifier="my_database.my_schema.my_table")
def test_table_name() -> None:
table_metadata = TableMetadataSet(table_name="my_database.my_schema.my_table")

dict_table_metadata = dict(table_metadata)
assert dict_table_metadata == {"dagster/relation_identifier": "my_database.my_schema.my_table"}
assert dict_table_metadata == {"dagster/table_name": "my_database.my_schema.my_table"}
AssetMaterialization(asset_key="a", metadata=dict_table_metadata)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
}


def mock_relation_identifier(*args, **kwargs) -> str:
return "relation_identifier"
def mock_table_name(*args, **kwargs) -> str:
return "table_name"


class IntHandler(DbTypeHandler[int]):
Expand Down Expand Up @@ -81,7 +81,7 @@ def test_asset_out():
spec=DbClient,
get_select_statement=MagicMock(return_value=""),
connect=connect_mock,
get_relation_identifier=mock_relation_identifier,
get_table_name=mock_table_name,
)
manager = build_db_io_manager(type_handlers=[handler], db_client=db_client)
asset_key = AssetKey(["schema1", "table1"])
Expand Down Expand Up @@ -117,7 +117,7 @@ def test_asset_out_columns():
spec=DbClient,
get_select_statement=MagicMock(return_value=""),
connect=connect_mock,
get_relation_identifier=mock_relation_identifier,
get_table_name=mock_table_name,
)
manager = build_db_io_manager(type_handlers=[handler], db_client=db_client)
asset_key = AssetKey(["schema1", "table1"])
Expand Down Expand Up @@ -159,7 +159,7 @@ def test_asset_out_partitioned():
spec=DbClient,
get_select_statement=MagicMock(return_value=""),
connect=connect_mock,
get_relation_identifier=mock_relation_identifier,
get_table_name=mock_table_name,
)
manager = build_db_io_manager(type_handlers=[handler], db_client=db_client)
asset_key = AssetKey(["schema1", "table1"])
Expand Down Expand Up @@ -218,7 +218,7 @@ def test_asset_out_static_partitioned():
spec=DbClient,
get_select_statement=MagicMock(return_value=""),
connect=connect_mock,
get_relation_identifier=mock_relation_identifier,
get_table_name=mock_table_name,
)
manager = build_db_io_manager(type_handlers=[handler], db_client=db_client)
asset_key = AssetKey(["schema1", "table1"])
Expand Down Expand Up @@ -270,7 +270,7 @@ def test_asset_out_multiple_static_partitions():
spec=DbClient,
get_select_statement=MagicMock(return_value=""),
connect=connect_mock,
get_relation_identifier=mock_relation_identifier,
get_table_name=mock_table_name,
)
manager = build_db_io_manager(type_handlers=[handler], db_client=db_client)
asset_key = AssetKey(["schema1", "table1"])
Expand Down Expand Up @@ -323,7 +323,7 @@ def test_different_output_and_input_types():
spec=DbClient,
get_select_statement=MagicMock(return_value=""),
connect=connect_mock,
get_relation_identifier=mock_relation_identifier,
get_table_name=mock_table_name,
)
manager = build_db_io_manager(type_handlers=[int_handler, str_handler], db_client=db_client)
asset_key = AssetKey(["schema1", "table1"])
Expand Down Expand Up @@ -361,7 +361,7 @@ def test_non_asset_out():
spec=DbClient,
get_select_statement=MagicMock(return_value=""),
connect=connect_mock,
get_relation_identifier=mock_relation_identifier,
get_table_name=mock_table_name,
)
manager = build_db_io_manager(type_handlers=[handler], db_client=db_client)
output_context = build_output_context(
Expand Down Expand Up @@ -399,7 +399,7 @@ def test_asset_schema_defaults():
db_client = MagicMock(
spec=DbClient,
get_select_statement=MagicMock(return_value=""),
get_relation_identifier=mock_relation_identifier,
get_table_name=mock_table_name,
)
manager = build_db_io_manager(type_handlers=[handler], db_client=db_client)

Expand Down Expand Up @@ -561,7 +561,7 @@ def test_default_load_type():
db_client = MagicMock(
spec=DbClient,
get_select_statement=MagicMock(return_value=""),
get_relation_identifier=mock_relation_identifier,
get_table_name=mock_table_name,
)
manager = DbIOManager(
type_handlers=[handler],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ def _build_airbyte_asset_defn_metadata(
for table in destination_tables:
internal_deps[table] = set(upstream_assets or [])

relation_identifiers: Dict[str, str] = {}
table_names: Dict[str, str] = {}
for table in destination_tables:
if destination_database and destination_schema and table:
# Use the destination raw table name to create the relation identifier
relation_identifiers[table] = ".".join(
# Use the destination raw table name to build the fully qualified table name
table_names[table] = ".".join(
[
destination_database,
destination_schema,
Expand All @@ -129,7 +129,7 @@ def _build_airbyte_asset_defn_metadata(
)
if normalization_tables and normalization_raw_table_names_by_table:
for normalization_table in normalization_tables.get(table, set()):
relation_identifiers[normalization_table] = ".".join(
table_names[normalization_table] = ".".join(
[
destination_database,
destination_schema,
Expand All @@ -156,7 +156,7 @@ def _build_airbyte_asset_defn_metadata(
table: {
**TableMetadataSet(
column_schema=schema_by_table_name.get(table),
relation_identifier=relation_identifiers.get(table),
table_name=table_names.get(table),
),
}
for table in tables
Expand Down Expand Up @@ -302,15 +302,13 @@ def build_airbyte_assets(
chain([destination_tables], normalization_tables.values() if normalization_tables else [])
)

relation_identifiers: Dict[str, str] = {}
table_names: Dict[str, str] = {}
for table in destination_tables:
if destination_database and destination_schema and table:
relation_identifiers[table] = ".".join(
[destination_database, destination_schema, table]
)
table_names[table] = ".".join([destination_database, destination_schema, table])
if normalization_tables:
for normalization_table in normalization_tables.get(table, set()):
relation_identifiers[normalization_table] = ".".join(
table_names[normalization_table] = ".".join(
[
destination_database,
destination_schema,
Expand All @@ -328,7 +326,7 @@ def build_airbyte_assets(
{
**TableMetadataSet(
column_schema=schema_by_table_name.get(table),
relation_identifier=relation_identifiers.get(table),
table_name=table_names.get(table),
),
}
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def downstream_of_ab():
assert downstream_of_ab.op.ins["some_prefix_bar"].dagster_type.is_nothing


def test_built_airbyte_asset_relation_identifier():
def test_built_airbyte_asset_table_name():
destination_tables = ["foo", "bar"]

ab_assets = build_airbyte_assets(
Expand All @@ -322,7 +322,7 @@ def test_built_airbyte_asset_relation_identifier():
# Check that no table name metadata is set on the asset def when database/schema are not provided
assets_def = ab_assets[0]
for metadata in assets_def.metadata_by_key.values():
assert metadata.get("dagster/relation_identifier") is None
assert metadata.get("dagster/table_name") is None

ab_assets = build_airbyte_assets(
"12345",
Expand All @@ -331,15 +331,15 @@ def test_built_airbyte_asset_relation_identifier():
destination_schema="test_schema",
)

relation_identifiers = {"test_database.test_schema.foo", "test_database.test_schema.bar"}
table_names = {"test_database.test_schema.foo", "test_database.test_schema.bar"}

# Check table name metadata is added correctly to asset def
assets_def = ab_assets[0]
for key, metadata in assets_def.metadata_by_key.items():
# Extract the table name from the asset key
table_name = key.path[-1]
assert metadata["dagster/relation_identifier"] in relation_identifiers
assert table_name in metadata["dagster/relation_identifier"]
assert metadata["dagster/table_name"] in table_names
assert table_name in metadata["dagster/table_name"]

ab_assets = build_airbyte_assets(
"12345",
Expand All @@ -349,12 +349,12 @@ def test_built_airbyte_asset_relation_identifier():
normalization_tables={"foo": {"baz"}},
)

relation_identifiers.add("test_database.test_schema.foo.baz")
table_names.add("test_database.test_schema.foo.baz")

# Check table name metadata is added correctly to asset def, including normalization tables
assets_def = ab_assets[0]
for key, metadata in assets_def.metadata_by_key.items():
# Extract the table name from the asset key
table_name = key.path[-1]
assert metadata["dagster/relation_identifier"] in relation_identifiers
assert table_name in metadata["dagster/relation_identifier"]
assert metadata["dagster/table_name"] in table_names
assert table_name in metadata["dagster/table_name"]
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def downstream_asset(dagster_tags):
for key, metadata in assets_def.metadata_by_key.items()
)

relation_identifiers = {
table_names = {
"test_database.test_schema.releases",
"test_database.test_schema.tags",
"test_database.test_schema.teams",
Expand Down Expand Up @@ -262,10 +262,10 @@ def downstream_asset(dagster_tags):
.replace("_test", "")
.split("_")[-1]
)
assert metadata["dagster/relation_identifier"] in relation_identifiers
assert table_name in metadata["dagster/relation_identifier"]
assert metadata["dagster/table_name"] in table_names
assert table_name in metadata["dagster/table_name"]
else:
assert not metadata.get("dagster/relation_identifier")
assert not metadata.get("dagster/table_name")

assert assets_def.keys == {AssetKey(t) for t in tables}
assert all(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def test_load_from_project(
# Check metadata is added correctly to asset def
assets_def = ab_assets[0]

relation_identifiers = {
table_names = {
"AIRBYTE.BEN_DEMO.releases",
"AIRBYTE.BEN_DEMO.tags",
"AIRBYTE.BEN_DEMO.teams",
Expand All @@ -133,8 +133,8 @@ def test_load_from_project(
.replace("_test", "")
.split("_")[-1]
)
assert metadata["dagster/relation_identifier"] in relation_identifiers
assert table_name in metadata["dagster/relation_identifier"]
assert metadata["dagster/table_name"] in table_names
assert table_name in metadata["dagster/table_name"]

assert assets_def.keys == {AssetKey(t) for t in tables}
assert all(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ def default_metadata_from_dbt_resource_props(
**DbtMetadataSet(materialization_type=materialization_type),
**TableMetadataSet(
column_schema=column_schema,
relation_identifier=relation_name,
table_name=relation_name,
),
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,20 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
}

assert all(
storage_address_meta.relation_identifier
for storage_address_meta in storage_address_metas.values()
storage_address_meta.table_name for storage_address_meta in storage_address_metas.values()
)
jaffle_shop_duckdb_dbfile_name = os.getenv("DAGSTER_DBT_PYTEST_XDIST_DUCKDB_DBFILE_NAME")
# spot check a few storage addresses
assert (
storage_address_metas["customers"].relation_identifier
storage_address_metas["customers"].table_name
== f"{jaffle_shop_duckdb_dbfile_name}.dev.customers"
)
assert (
storage_address_metas["raw_customers"].relation_identifier
storage_address_metas["raw_customers"].table_name
== f"{jaffle_shop_duckdb_dbfile_name}.dev.raw_customers"
)
assert (
storage_address_metas["stg_orders"].relation_identifier
storage_address_metas["stg_orders"].table_name
== f"{jaffle_shop_duckdb_dbfile_name}.dev.stg_orders"
)

Expand All @@ -76,18 +75,17 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
}

assert all(
storage_address_meta.relation_identifier
for storage_address_meta in storage_address_metas.values()
storage_address_meta.table_name for storage_address_meta in storage_address_metas.values()
)

jaffle_shop_duckdb_dbfile_name = os.getenv("DAGSTER_DBT_PYTEST_XDIST_DUCKDB_DBFILE_NAME")
# test that we can have tables with dots in their names, from
# user-defined aliases
assert (
storage_address_metas["customers"].relation_identifier
storage_address_metas["customers"].table_name
== f"{jaffle_shop_duckdb_dbfile_name}.main.dagster.customers"
)
assert (
storage_address_metas["orders"].relation_identifier
storage_address_metas["orders"].table_name
== f"{jaffle_shop_duckdb_dbfile_name}.main.dagster.orders"
)
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def my_pandas_df() -> pd.DataFrame:
assert len(mats) == 1
mat = mats[0]

assert mat.materialization.metadata["dagster/relation_identifier"] == MetadataValue.text(
assert mat.materialization.metadata["dagster/table_name"] == MetadataValue.text(
f"{db_file}.custom_schema.my_pandas_df"
)

Expand Down
Loading

0 comments on commit fe4ac9f

Please sign in to comment.