Skip to content

Commit

Permalink
Merge pull request #194 from dbt-labs/exposures
Browse files Browse the repository at this point in the history
update refs in exposure and semantic model definitions when downstream of a split resource
  • Loading branch information
dave-connors-3 authored Mar 13, 2024
2 parents dcb4d97 + ec95adf commit 55eebc1
Show file tree
Hide file tree
Showing 11 changed files with 442 additions and 144 deletions.
6 changes: 5 additions & 1 deletion dbt_meshify/utilities/grouper.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ def _generate_resource_group(

logger.info(f"Selected {len(nodes)} resources: {nodes}")
# Check if any of the selected nodes are already in a group of a different name. If so, raise an exception.
nodes = set(filter(lambda x: not x.startswith("source"), nodes))
nodes = set(
filter(
lambda x: not x.startswith("source") and not x.startswith("semantic_model"), nodes
)
)
for node in nodes:
existing_group = self.project.manifest.nodes[node].config.group

Expand Down
109 changes: 76 additions & 33 deletions dbt_meshify/utilities/references.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import re
from pathlib import Path
from typing import List, Optional, Union
from typing import Any, Dict, List, Optional, Union

from dbt.contracts.graph.nodes import CompiledNode
from dbt.contracts.graph.nodes import CompiledNode, Exposure, Resource, SemanticModel
from loguru import logger

from dbt_meshify.change import ChangeSet, EntityType, FileChange, Operation
from dbt_meshify.change import (
ChangeSet,
EntityType,
FileChange,
Operation,
ResourceChange,
)
from dbt_meshify.dbt_projects import DbtProject, DbtSubProject, PathedProject


Expand Down Expand Up @@ -135,32 +141,75 @@ def replace_source_with_ref__python(

return new_code

def update_yml_resource_references(
    self,
    project_name: str,
    upstream_resource_name: str,
    resource: Union[Exposure, SemanticModel],
) -> Dict[str, Any]:
    """Build the YAML patch that points a resource at a cross-project ref.

    Args:
        project_name: Name of the project used as the first argument of the
            new two-argument ref.
        upstream_resource_name: Name of the upstream resource being referenced.
        resource: The exposure or semantic model whose reference is rewritten.

    Returns:
        For a semantic model, ``{"model": <new ref>}``.
        Otherwise ``{"depends_on": [...]}`` holding every existing ref of the
        resource rendered back to ``ref(...)`` text, minus the bare
        ``ref('<upstream_resource_name>')`` entry, with the new ref appended
        at the end.
    """
    cross_project_ref = f"ref('{project_name}', '{upstream_resource_name}')"

    # Semantic models carry a single model ref and no depends_on list,
    # so there is nothing else to rebuild for them.
    if isinstance(resource, SemanticModel):
        return {"model": cross_project_ref}

    # Only the bare one-argument form of the old ref is dropped; refs rendered
    # with a package or a version never compare equal to this string.
    stale_ref = f"ref('{upstream_resource_name}')"

    def _render(ref) -> str:
        """Render a ref object back into its ``ref(...)`` source text."""
        package_part = f"'{ref.package}', " if ref.package else ""
        name_part = f"'{ref.name}'"
        version_part = f", v={ref.version}" if ref.version else ""
        return f"ref({package_part}{name_part}{version_part})"

    retained = [
        rendered for rendered in map(_render, resource.refs) if rendered != stale_ref
    ]
    return {"depends_on": [*retained, cross_project_ref]}

def generate_reference_update(
self,
project_name: str,
upstream_node: CompiledNode,
downstream_node: CompiledNode,
downstream_node: Union[Resource, CompiledNode],
downstream_project: PathedProject,
code: str,
) -> FileChange:
) -> Union[FileChange, ResourceChange]:
"""Generate FileChanges that update the references in the downstream_node's code."""

updated_code = self.ref_update_methods[downstream_node.language](
model_name=upstream_node.name,
project_name=project_name,
model_code=code,
)
change: Union[FileChange, ResourceChange]
if isinstance(downstream_node, CompiledNode):
updated_code = self.ref_update_methods[downstream_node.language](
model_name=upstream_node.name,
project_name=project_name,
model_code=code,
)

return FileChange(
operation=Operation.Update,
entity_type=EntityType.Code,
identifier=downstream_node.name,
path=downstream_project.resolve_file_path(downstream_node),
data=updated_code,
)
return FileChange(
operation=Operation.Update,
entity_type=EntityType.Code,
identifier=downstream_node.name,
path=downstream_project.resolve_file_path(downstream_node),
data=updated_code,
)

elif isinstance(downstream_node, Exposure) or isinstance(downstream_node, SemanticModel):
is_exposure = isinstance(downstream_node, Exposure)
data = self.update_yml_resource_references(
project_name=project_name,
upstream_resource_name=upstream_node.name,
resource=downstream_node,
)
return ResourceChange(
operation=Operation.Update,
entity_type=EntityType.Exposure if is_exposure else EntityType.SemanticModel,
identifier=downstream_node.name,
path=downstream_project.resolve_file_path(downstream_node),
data=data,
)
raise Exception("Invalid node type provided to generate_reference_update.")

def update_child_refs(
self, resource: CompiledNode, current_change_set: Optional[ChangeSet] = None
self,
resource: CompiledNode,
current_change_set: Optional[ChangeSet] = None,
) -> ChangeSet:
"""Generate a set of FileChanges to update child references"""

Expand All @@ -175,33 +224,29 @@ def update_child_refs(
else:
compare_project = self.project.name

for model in self.project.child_map[resource.unique_id]:
if model in self.project.resources or model.split(".")[1] != compare_project:
continue
model_node = self.project.get_manifest_node(model)
if not model_node:
raise KeyError(f"Resource {model} not found in manifest")

# Don't process Resources missing a language attribute
if not hasattr(model_node, "language") or not isinstance(model_node, CompiledNode):
for child in self.project.child_map[resource.unique_id]:
if child in self.project.resources or child.split(".")[1] != compare_project:
continue
node = self.project.get_manifest_node(child)
if not node:
raise KeyError(f"Resource {child} not found in manifest")

if current_change_set:
previous_change = get_latest_file_change(
changeset=current_change_set,
identifier=model_node.name,
path=self.project.parent_project.resolve_file_path(model_node),
identifier=node.name,
path=self.project.parent_project.resolve_file_path(node),
)
code = (
previous_change.data
if (previous_change and previous_change.data)
else model_node.raw_code
else getattr(node, "raw_code", "")
)

change = self.generate_reference_update(
project_name=self.project.name,
upstream_node=resource,
downstream_node=model_node,
downstream_node=node,
code=code,
downstream_project=self.project.parent_project,
)
Expand Down Expand Up @@ -258,8 +303,6 @@ def update_parent_refs(self, resource: CompiledNode) -> ChangeSet:
if change.data is None:
raise Exception(f"Change has null data despite being provided code. {change}")

code = change.data

change_set.add(change)

return change_set
Expand Down
Loading

0 comments on commit 55eebc1

Please sign in to comment.