Commit
Merge branch 'main' into fix_9005_2
aranke authored Sep 30, 2024
2 parents 0bf7097 + 2ff3f20 commit 9b8685a
Showing 17 changed files with 467 additions and 308 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240918-162959.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Create 'skip_nodes_if_on_run_start_fails' behavior change flag
time: 2024-09-18T16:29:59.268422+01:00
custom:
Author: aranke
Issue: "7387"
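
The entry above introduces a project-level behavior change flag. Assuming it follows the same convention as dbt's other behavior change flags, a project would opt in under the flags: key of dbt_project.yaml — a minimal sketch:

    # dbt_project.yaml (sketch; the flag defaults to false)
    flags:
      skip_nodes_if_on_run_start_fails: true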
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/nodes.py
@@ -1674,6 +1674,7 @@ class ParsedSingularTestPatch(ParsedPatch):
ResultNode = Union[
ManifestNode,
SourceDefinition,
HookNode,
]

# All nodes that can be in the DAG
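Adding HookNode to the ResultNode union lets hook executions surface as first-class run results, which the skip reporting elsewhere in this commit depends on. A reduced sketch of the shape of this union, using stand-in classes rather than dbt's real node types:

    from dataclasses import dataclass
    from typing import Union

    # Stand-ins for illustration only; the real dbt classes carry many more fields.
    @dataclass
    class ManifestNode:
        unique_id: str

    @dataclass
    class SourceDefinition:
        unique_id: str

    @dataclass
    class HookNode:
        unique_id: str  # e.g. "operation.my_project.my_project-on-run-start-0"

    # Consumers of run results can now be handed hook outcomes without special-casing.
    ResultNode = Union[ManifestNode, SourceDefinition, HookNode]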
2 changes: 2 additions & 0 deletions core/dbt/contracts/project.py
@@ -341,6 +341,7 @@ class ProjectFlags(ExtensibleDbtClassMixin):
require_explicit_package_overrides_for_builtin_materializations: bool = True
require_resource_names_without_spaces: bool = False
source_freshness_run_project_hooks: bool = False
skip_nodes_if_on_run_start_fails: bool = False
state_modified_compare_more_unrendered_values: bool = False

@property
@@ -349,6 +350,7 @@ def project_only_flags(self) -> Dict[str, Any]:
"require_explicit_package_overrides_for_builtin_materializations": self.require_explicit_package_overrides_for_builtin_materializations,
"require_resource_names_without_spaces": self.require_resource_names_without_spaces,
"source_freshness_run_project_hooks": self.source_freshness_run_project_hooks,
"skip_nodes_if_on_run_start_fails": self.skip_nodes_if_on_run_start_fails,
"state_modified_compare_more_unrendered_values": self.state_modified_compare_more_unrendered_values,
}

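Registering the flag in project_only_flags makes it resolvable at runtime. A hedged sketch of how task code might consult it — get_flags() is dbt's standard accessor for resolved flags, but the helper name and call site here are assumptions:

    from dbt.flags import get_flags

    def should_skip_remaining_nodes(on_run_start_failed: bool) -> bool:
        # Hypothetical helper: skip queued nodes only when the behavior
        # flag is enabled and an on-run-start hook has failed.
        flags = get_flags()
        return bool(flags.skip_nodes_if_on_run_start_fails) and on_run_start_failed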
14 changes: 12 additions & 2 deletions core/dbt/events/types.py
@@ -1516,10 +1516,20 @@ def code(self) -> str:
return "Q033"

def message(self) -> str:
msg = f"OK hook: {self.statement}"
if self.status == "success":
info = "OK"
status = green(info)
elif self.status == "skipped":
info = "SKIP"
status = yellow(info)
else:
info = "ERROR"
status = red(info)
msg = f"{info} hook: {self.statement}"

return format_fancy_output_line(
msg=msg,
status=green(self.status),
status=status,
index=self.index,
total=self.total,
execution_time=self.execution_time,
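The reworked message() maps each hook status to a label and color instead of unconditionally printing a green OK. Reduced to a pure function for illustration (green/yellow/red are dbt's ANSI color helpers):

    def hook_status_label(status: str) -> str:
        # Mirrors the branches above: success -> OK, skipped -> SKIP,
        # anything else -> ERROR.
        if status == "success":
            return "OK"
        elif status == "skipped":
            return "SKIP"
        return "ERROR"

    assert hook_status_label("success") == "OK"
    assert hook_status_label("skipped") == "SKIP"
    assert hook_status_label("error") == "ERROR"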
5 changes: 3 additions & 2 deletions core/dbt/task/clone.py
@@ -1,7 +1,7 @@
import threading
from typing import AbstractSet, Any, Collection, Iterable, List, Optional, Set, Type

from dbt.adapters.base import BaseRelation
from dbt.adapters.base import BaseAdapter, BaseRelation
from dbt.artifacts.resources.types import NodeType
from dbt.artifacts.schemas.run import RunResult, RunStatus
from dbt.clients.jinja import MacroGenerator
@@ -125,14 +125,15 @@ def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRe

return result

def before_run(self, adapter, selected_uids: AbstractSet[str]):
def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
with adapter.connection_named("master"):
self.defer_to_manifest()
# only create target schemas, but also cache defer_relation schemas
schemas_to_create = super().get_model_schemas(adapter, selected_uids)
self.create_schemas(adapter, schemas_to_create)
schemas_to_cache = self.get_model_schemas(adapter, selected_uids)
self.populate_adapter_cache(adapter, schemas_to_cache)
return RunStatus.Success

@property
def resource_types(self) -> List[NodeType]:
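before_run now reports a RunStatus, so the orchestrating task can gate node execution on setup having succeeded. A sketch of the calling pattern this enables — execute_nodes and skip_selected_nodes are hypothetical stand-ins, not dbt's actual orchestration methods:

    from dbt.artifacts.schemas.run import RunStatus

    def run_with_gated_setup(task, adapter, selected_uids):
        # If setup (hooks, schema creation, cache population) fails,
        # skip the selected nodes instead of executing them.
        if task.before_run(adapter, selected_uids) == RunStatus.Success:
            return task.execute_nodes()
        return task.skip_selected_nodes()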
30 changes: 25 additions & 5 deletions core/dbt/task/freshness.py
@@ -4,6 +4,7 @@
from typing import AbstractSet, Dict, List, Optional, Type

from dbt import deprecations
from dbt.adapters.base import BaseAdapter
from dbt.adapters.base.impl import FreshnessResponse
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.capability import Capability
@@ -204,10 +205,25 @@ def get_node_selector(self):
resource_types=[NodeType.Source],
)

def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None:
super().before_run(adapter, selected_uids)
if adapter.supports(Capability.TableLastModifiedMetadataBatch):
self.populate_metadata_freshness_cache(adapter, selected_uids)
def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
populate_metadata_freshness_cache_status = RunStatus.Success

before_run_status = super().before_run(adapter, selected_uids)

if before_run_status == RunStatus.Success and adapter.supports(
Capability.TableLastModifiedMetadataBatch
):
populate_metadata_freshness_cache_status = self.populate_metadata_freshness_cache(
adapter, selected_uids
)

if (
before_run_status == RunStatus.Success
and populate_metadata_freshness_cache_status == RunStatus.Success
):
return RunStatus.Success
else:
return RunStatus.Error

def get_runner(self, node) -> BaseRunner:
freshness_runner = super().get_runner(node)
@@ -243,7 +259,9 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]:
deprecations.warn("source-freshness-project-hooks")
return []

def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None:
def populate_metadata_freshness_cache(
self, adapter, selected_uids: AbstractSet[str]
) -> RunStatus:
if self.manifest is None:
raise DbtInternalError("Manifest must be set to populate metadata freshness cache")

@@ -266,6 +284,7 @@ def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[
batch_metadata_sources
)
self._metadata_freshness_cache.update(metadata_freshness_results)
return RunStatus.Success
except Exception as e:
# This error handling is intentionally very coarse.
# If anything goes wrong during batch metadata calculation, we can safely
@@ -276,6 +295,7 @@ def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[
Note(msg=f"Metadata freshness could not be computed in batch: {e}"),
EventLevel.WARN,
)
return RunStatus.Error

def get_freshness_metadata_cache(self) -> Dict[BaseRelation, FreshnessResult]:
return self._metadata_freshness_cache
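FreshnessTask.before_run now folds two outcomes — the parent class's setup and the batch metadata cache population, which itself returns Success or Error — into a single status. The combination reduces to a pure function:

    from dbt.artifacts.schemas.run import RunStatus

    def combined_status(before: RunStatus, cache_populate: RunStatus) -> RunStatus:
        # An Error from either step makes the whole setup report Error.
        if before == RunStatus.Success and cache_populate == RunStatus.Success:
            return RunStatus.Success
        return RunStatus.Error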
3 changes: 2 additions & 1 deletion core/dbt/task/printer.py
@@ -33,7 +33,8 @@ def get_counts(flat_nodes) -> str:

counts[t] = counts.get(t, 0) + 1

stat_line = ", ".join([pluralize(v, k).replace("_", " ") for k, v in counts.items()])
sorted_items = sorted(counts.items(), key=lambda x: x[0])
stat_line = ", ".join([pluralize(v, k).replace("_", " ") for k, v in sorted_items])

return stat_line

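Sorting the counts by node type makes the summary line deterministic instead of depending on dict insertion order. A self-contained illustration, with a stand-in for dbt's pluralize helper:

    def pluralize(count: int, word: str) -> str:
        # Stand-in for dbt's pluralize helper.
        return f"{count} {word}" if count == 1 else f"{count} {word}s"

    counts = {"view model": 2, "hook": 3, "seed": 1}
    stat_line = ", ".join(
        pluralize(v, k).replace("_", " ")
        for k, v in sorted(counts.items(), key=lambda x: x[0])
    )
    print(stat_line)  # 3 hooks, 1 seed, 2 view models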
20 changes: 13 additions & 7 deletions core/dbt/task/retry.py
@@ -121,20 +121,26 @@ def __init__(self, args: Flags, config: RuntimeConfig) -> None:
self.task_class = TASK_DICT.get(self.previous_command_name) # type: ignore

def run(self):
unique_ids = set(
[
result.unique_id
for result in self.previous_results.results
if result.status in RETRYABLE_STATUSES
]
)
unique_ids = {
result.unique_id
for result in self.previous_results.results
if result.status in RETRYABLE_STATUSES
and not (
self.previous_command_name != "run-operation"
and result.unique_id.startswith("operation.")
)
}

batch_map = {
result.unique_id: result.batch_results.failed
for result in self.previous_results.results
if result.status == NodeStatus.PartialSuccess
and result.batch_results is not None
and len(result.batch_results.failed) > 0
and not (
self.previous_command_name != "run-operation"
and result.unique_id.startswith("operation.")
)
}

class TaskWrapper(self.task_class):
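The added condition keeps dbt retry from re-running hook results (unique IDs prefixed with "operation.") unless the previous command was itself run-operation. Illustrated with plain data in place of dbt's result objects:

    RETRYABLE_STATUSES = {"error"}
    previous_command_name = "run"  # anything other than "run-operation"

    previous_results = [
        ("model.my_project.my_model", "error"),
        ("operation.my_project.my_project-on-run-start-0", "error"),
    ]

    unique_ids = {
        uid
        for uid, status in previous_results
        if status in RETRYABLE_STATUSES
        and not (previous_command_name != "run-operation" and uid.startswith("operation."))
    }
    print(unique_ids)  # {'model.my_project.my_model'}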