Skip to content

Commit

Permalink
[CORE-388] Add group metadata info to LogModelResult and `LogTestRe…
Browse files Browse the repository at this point in the history
…sult` (#10775)
  • Loading branch information
aranke authored Sep 26, 2024
1 parent 1fe9c1b commit 1076352
Show file tree
Hide file tree
Showing 13 changed files with 433 additions and 351 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240926-143448.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add group metadata info to LogModelResult and LogTestResult
time: 2024-09-26T14:34:48.334703+01:00
custom:
Author: aranke
Issue: "10775"
2 changes: 1 addition & 1 deletion core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1473,7 +1473,7 @@ def to_logging_dict(self) -> Dict[str, Union[str, Dict[str, str]]]:
return {
"name": self.name,
"package_name": self.package_name,
"owner": self.owner.to_dict(),
"owner": self.owner.to_dict(omit_none=True),
}


Expand Down
17 changes: 10 additions & 7 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,12 @@ message SQLRunnerExceptionMsg {
SQLRunnerException data = 2;
}

message Group {
string name = 1;
string package_name = 3;
map<string, string> owner = 7;
}

// Q007
message LogTestResult {
NodeInfo node_info = 1;
Expand All @@ -1280,6 +1286,8 @@ message LogTestResult {
int32 num_models = 5;
float execution_time = 6;
int32 num_failures = 7;
Group group = 8;
string attached_node = 9;
}

message LogTestResultMsg {
Expand Down Expand Up @@ -1312,6 +1320,7 @@ message LogModelResult {
int32 index = 4;
int32 total = 5;
float execution_time = 6;
Group group = 7;
}

message LogModelResultMsg {
Expand Down Expand Up @@ -1373,7 +1382,7 @@ message LogFreshnessResultMsg {
LogFreshnessResult data = 2;
}

// Q018
// Q019
message LogNodeNoOpResult {
NodeInfo node_info = 1;
string description = 2;
Expand Down Expand Up @@ -1820,12 +1829,6 @@ message ServingDocsExitInfoMsg {
ServingDocsExitInfo data = 2;
}

message Group {
string name = 1;
string package_name = 3;
map<string, string> owner = 7;
}

// Z021
message RunResultWarning {
string resource_type = 1;
Expand Down
574 changes: 287 additions & 287 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions core/dbt/task/group_lookup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from typing import AbstractSet, Dict, Optional, Union

from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import Group

_node_id_to_group_name_map: Dict[str, str] = {}
_group_name_to_group_map: Dict[str, Group] = {}


def init(manifest: Optional[Manifest], selected_ids: AbstractSet[str]) -> None:
if not manifest:
return

_every_group_name_to_group_map = {v.name: v for v in manifest.groups.values()}

for group_name, node_ids in manifest.group_map.items():
for node_id in node_ids:
# only add node to lookup if it's selected
if node_id in selected_ids:
_node_id_to_group_name_map[node_id] = group_name

# only add group to lookup if it's not already there and if node is selected
if group_name not in _group_name_to_group_map:
_group_name_to_group_map[group_name] = _every_group_name_to_group_map[
group_name
]


def get(node_id: str) -> Optional[Dict[str, Union[str, Dict[str, str]]]]:
group_name = _node_id_to_group_name_map.get(node_id)

if group_name is None:
return None

group = _group_name_to_group_map.get(group_name)

if group is None:
return None

return group.to_logging_dict()
31 changes: 13 additions & 18 deletions core/dbt/task/printer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Dict, Optional
from typing import Dict, Optional, Union

from dbt.artifacts.schemas.results import NodeStatus
from dbt.contracts.graph.nodes import Group
from dbt.events.types import (
CheckNodeTestFailure,
EndOfRunSummary,
Expand All @@ -14,6 +13,7 @@
StatsLine,
)
from dbt.node_types import NodeType
from dbt.task import group_lookup
from dbt_common.events.base_types import EventLevel
from dbt_common.events.format import pluralize
from dbt_common.events.functions import fire_event
Expand Down Expand Up @@ -70,7 +70,10 @@ def print_run_status_line(results) -> None:


def print_run_result_error(
result, newline: bool = True, is_warning: bool = False, group: Optional[Group] = None
result,
newline: bool = True,
is_warning: bool = False,
group: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
) -> None:
# set node_info for logging events
node_info = None
Expand All @@ -80,36 +83,31 @@ def print_run_result_error(
if newline:
fire_event(Formatting(""))
if is_warning:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultWarning(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
group=group,
)
)
else:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultFailure(
resource_type=result.node.resource_type,
node_name=result.node.name,
path=result.node.original_file_path,
node_info=node_info,
group=group_dict,
group=group,
)
)

if result.message:
if is_warning:
fire_event(RunResultWarningMessage(msg=result.message, node_info=node_info))
else:
group_dict = group.to_logging_dict() if group else None
fire_event(
RunResultError(msg=result.message, node_info=node_info, group=group_dict)
)
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group))
else:
fire_event(RunResultErrorNoMessage(status=result.status, node_info=node_info))

Expand All @@ -129,13 +127,10 @@ def print_run_result_error(
elif result.message is not None:
if newline:
fire_event(Formatting(""))
group_dict = group.to_logging_dict() if group else None
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group_dict))
fire_event(RunResultError(msg=result.message, node_info=node_info, group=group))


def print_run_end_messages(
results, keyboard_interrupt: bool = False, groups: Optional[Dict[str, Group]] = None
) -> None:
def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
errors, warnings, partial_successes = [], [], []
for r in results:
if r.status in (NodeStatus.RuntimeErr, NodeStatus.Error, NodeStatus.Fail):
Expand All @@ -160,11 +155,11 @@ def print_run_end_messages(
)

for error in errors:
group = groups.get(error.node.unique_id) if groups and hasattr(error, "node") else None
group = group_lookup.get(error.node.unique_id) if hasattr(error, "node") else None
print_run_result_error(error, is_warning=False, group=group)

for warning in warnings:
group = groups.get(warning.node.unique_id) if groups and hasattr(warning, "node") else None
group = group_lookup.get(warning.node.unique_id) if hasattr(warning, "node") else None
print_run_result_error(warning, is_warning=True, group=group)

print_run_status_line(results)
19 changes: 7 additions & 12 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from dbt_common.events.types import Formatting
from dbt_common.exceptions import DbtValidationError

from . import group_lookup
from .compile import CompileRunner, CompileTask
from .printer import get_counts, print_run_end_messages

Expand Down Expand Up @@ -215,6 +216,7 @@ def print_start_line(self):

def print_result_line(self, result):
description = self.describe_node()
group = group_lookup.get(self.node.unique_id)
if result.status == NodeStatus.Error:
status = result.status
level = EventLevel.ERROR
Expand All @@ -229,6 +231,7 @@ def print_result_line(self, result):
total=self.num_nodes,
execution_time=result.execution_time,
node_info=self.node.node_info,
group=group,
),
level=level,
)
Expand All @@ -242,6 +245,7 @@ def print_batch_result_line(
exception: Optional[Exception],
):
description = self.describe_batch(batch_start)
group = group_lookup.get(self.node.unique_id)
if result.status == NodeStatus.Error:
status = result.status
level = EventLevel.ERROR
Expand All @@ -256,6 +260,7 @@ def print_batch_result_line(
total=batch_total,
execution_time=result.execution_time,
node_info=self.node.node_info,
group=group,
),
level=level,
)
Expand Down Expand Up @@ -716,6 +721,7 @@ def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None:
self.populate_adapter_cache(adapter, required_schemas)
self.populate_microbatch_batches(selected_uids)
self.safe_run_hooks(adapter, RunHookType.Start, {})
group_lookup.init(self.manifest, selected_uids)

def after_run(self, adapter, results) -> None:
# in on-run-end hooks, provide the value 'database_schemas', which is a
Expand Down Expand Up @@ -753,17 +759,6 @@ def get_node_selector(self) -> ResourceTypeSelector:
def get_runner_type(self, _) -> Optional[Type[BaseRunner]]:
return ModelRunner

def get_groups_for_nodes(self, nodes):
node_to_group_name_map = {i: k for k, v in self.manifest.group_map.items() for i in v}
group_name_to_group_map = {v.name: v for v in self.manifest.groups.values()}

return {
node.unique_id: group_name_to_group_map.get(node_to_group_name_map.get(node.unique_id))
for node in nodes
}

def task_end_messages(self, results) -> None:
groups = self.get_groups_for_nodes([r.node for r in results if hasattr(r, "node")])

if results:
print_run_end_messages(results, groups=groups)
print_run_end_messages(results)
12 changes: 9 additions & 3 deletions core/dbt/task/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from dbt_common.exceptions import DbtBaseException, DbtRuntimeError
from dbt_common.ui import green, red

from . import group_lookup
from .compile import CompileRunner
from .run import RunTask

Expand Down Expand Up @@ -93,7 +94,6 @@ class UnitTestResultData(dbtClassMixin):

class TestRunner(CompileRunner):
_ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
_LOG_TEST_RESULT_EVENTS = LogTestResult

def describe_node_name(self) -> str:
if self.node.resource_type == NodeType.Unit:
Expand All @@ -107,16 +107,22 @@ def describe_node(self) -> str:

def print_result_line(self, result):
model = result.node
group = group_lookup.get(model.unique_id)
attached_node = (
result.node.attached_node if isinstance(result.node, GenericTestNode) else None
)

fire_event(
self._LOG_TEST_RESULT_EVENTS(
LogTestResult(
name=self.describe_node_name(),
status=str(result.status),
index=self.node_index,
num_models=self.num_nodes,
execution_time=result.execution_time,
node_info=model.node_info,
num_failures=result.failures,
group=group,
attached_node=attached_node,
),
level=LogTestResult.status_to_level(str(result.status)),
)
Expand Down Expand Up @@ -298,7 +304,7 @@ def build_test_run_result(self, test: TestNode, result: TestResultData) -> RunRe
failures = result.failures
elif result.should_warn:
if get_flags().WARN_ERROR or get_flags().WARN_ERROR_OPTIONS.includes(
self._LOG_TEST_RESULT_EVENTS.__name__
LogTestResult.__name__
):
status = TestStatus.Fail
message = f"Got {num_errors}, configured to fail if {test.config.warn_if}"
Expand Down
7 changes: 7 additions & 0 deletions tests/functional/adapter/basic/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,15 @@

generic_test_view_yml = """
version: 2
groups:
- name: my_group
owner:
name: group_owner
models:
- name: view_model
group: my_group
columns:
- name: id
data_tests:
Expand Down
12 changes: 10 additions & 2 deletions tests/functional/adapter/basic/test_generic_tests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from dbt.tests.util import run_dbt
from dbt.tests.util import run_dbt, run_dbt_and_capture
from tests.functional.adapter.basic.files import (
base_table_sql,
base_view_sql,
Expand Down Expand Up @@ -58,9 +58,17 @@ def test_generic_tests(self, project):
assert len(results) == 2

# test command, all tests
results = run_dbt(["test"])
results, log_output = run_dbt_and_capture(["test", "--log-format", "json"])
assert len(results) == 3

result_log_lines = [
line for line in log_output.split("\n") if "LogTestResult" in line and "group" in line
]
assert len(result_log_lines) == 1
assert "my_group" in result_log_lines[0]
assert "group_owner" in result_log_lines[0]
assert "model.generic_tests.view_model" in result_log_lines[0]


class TestGenericTests(BaseGenericTests):
pass
Loading

0 comments on commit 1076352

Please sign in to comment.