Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When retrying microbatch models, propagate prior successful state #10802

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240930-153158.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Ensure dbt retry of microbatch models doesn't lose prior successful state
time: 2024-09-30T15:31:58.541656-05:00
custom:
Author: QMalcolm
Issue: "10800"
4 changes: 2 additions & 2 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.artifacts.schemas.batch_results import BatchType
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.contracts.graph.model_config import UnitTestNodeConfig
from dbt.contracts.graph.node_args import ModelNodeArgs
from dbt.contracts.graph.unparsed import (
Expand Down Expand Up @@ -454,7 +454,7 @@ def resource_class(cls) -> Type[HookNodeResource]:

@dataclass
class ModelNode(ModelResource, CompiledNode):
batches: Optional[List[BatchType]] = None
batch_info: Optional[BatchResults] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this cause a loading error for a previous manifest?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has no effect on the written manifest, which is intentional. That is the case because these do not exist on the artifact class for the object


@classmethod
def resource_class(cls) -> Type[ModelResource]:
Expand Down
12 changes: 9 additions & 3 deletions core/dbt/task/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,17 @@ def run(self):
)
}

# We need this so that re-running of a microbatch model will only rerun
# batches that previously failed. Note _explicitly_ do no pass the
# batch info if there were _no_ successful batches previously. This is
# because passing the batch info _forces_ the microbatch process into
# _incremental_ model, and it may be that we need to be in full refresh
# mode which is only handled if batch_info _isn't_ passed for a node
batch_map = {
result.unique_id: result.batch_results.failed
result.unique_id: result.batch_results
for result in self.previous_results.results
if result.status == NodeStatus.PartialSuccess
and result.batch_results is not None
if result.batch_results is not None
and len(result.batch_results.successful) != 0
and len(result.batch_results.failed) > 0
and not (
self.previous_command_name != "run-operation"
Expand Down
20 changes: 14 additions & 6 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import os
import threading
from copy import deepcopy
from datetime import datetime
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type

Expand Down Expand Up @@ -327,6 +328,13 @@
status = RunStatus.PartialSuccess
msg = f"PARTIAL SUCCESS ({num_successes}/{num_successes + num_failures})"

if model.batch_info is not None:
new_batch_results = deepcopy(model.batch_info)
new_batch_results.failed = []
new_batch_results = new_batch_results + batch_results

Check warning on line 334 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L332-L334

Added lines #L332 - L334 were not covered by tests
else:
new_batch_results = batch_results

return RunResult(
node=model,
status=status,
Expand All @@ -337,7 +345,7 @@
message=msg,
adapter_response={},
failures=num_failures,
batch_results=batch_results,
batch_results=new_batch_results,
)

def _build_succesful_run_batch_result(
Expand Down Expand Up @@ -470,7 +478,7 @@
) -> List[RunResult]:
batch_results: List[RunResult] = []

if model.batches is None:
if model.batch_info is None:

Check warning on line 481 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L481

Added line #L481 was not covered by tests
microbatch_builder = MicrobatchBuilder(
model=model,
is_incremental=self._is_incremental(model),
Expand All @@ -481,8 +489,8 @@
start = microbatch_builder.build_start_time(end)
batches = microbatch_builder.build_batches(start, end)
else:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment here mentioning this only happens during retry?

batches = model.batches
# if there are batches, then don't run as full_refresh and do force is_incremental
batches = model.batch_info.failed

Check warning on line 492 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L492

Added line #L492 was not covered by tests
# if there is batch info, then don't run as full_refresh and do force is_incremental
# not doing this risks blowing away the work that has already been done
if self._has_relation(model=model):
context["is_incremental"] = lambda: True
Expand Down Expand Up @@ -567,7 +575,7 @@
args: Flags,
config: RuntimeConfig,
manifest: Manifest,
batch_map: Optional[Dict[str, List[BatchType]]] = None,
batch_map: Optional[Dict[str, BatchResults]] = None,
) -> None:
super().__init__(args, config, manifest)
self.batch_map = batch_map
Expand Down Expand Up @@ -709,7 +717,7 @@
if uid in self.batch_map:
node = self.manifest.ref_lookup.perform_lookup(uid, self.manifest)
if isinstance(node, ModelNode):
node.batches = self.batch_map[uid]
node.batch_info = self.batch_map[uid]

Check warning on line 720 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L720

Added line #L720 was not covered by tests

def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
with adapter.connection_named("master"):
Expand Down
38 changes: 38 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,44 @@ def test_run_with_event_time(self, project):
self.assert_row_count(project, "microbatch_model", 3)


class TestMicrobatchMultipleRetries(BaseMicrobatchTest):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": input_model_sql,
"microbatch_model.sql": microbatch_model_failing_incremental_partition_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, console_output = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"])

assert "PARTIAL SUCCESS (2/3)" in console_output
assert "Completed with 1 partial success" in console_output

self.assert_row_count(project, "microbatch_model", 2)

with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, console_output = run_dbt_and_capture(["retry"], expect_pass=False)

assert "PARTIAL SUCCESS" not in console_output
assert "ERROR" in console_output
assert "Completed with 1 error, 0 partial successs, and 0 warnings" in console_output

self.assert_row_count(project, "microbatch_model", 2)

with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, console_output = run_dbt_and_capture(["retry"], expect_pass=False)

assert "PARTIAL SUCCESS" not in console_output
assert "ERROR" in console_output
assert "Completed with 1 error, 0 partial successs, and 0 warnings" in console_output

self.assert_row_count(project, "microbatch_model", 2)


microbatch_model_first_partition_failing_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{% if '2020-01-01' in (model.config.__dbt_internal_microbatch_event_time_start | string) %}
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/contracts/graph/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
"deprecation_date",
"defer_relation",
"time_spine",
"batches",
"batch_info",
"vars",
}
)
Expand Down
Loading