From 0d7611a3a0c813d9f640ef77d390cd804cb7a243 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 30 Sep 2024 15:06:18 -0500 Subject: [PATCH 1/5] When retrying microbatch models, propagate prior successful state --- core/dbt/contracts/graph/nodes.py | 4 ++-- core/dbt/task/retry.py | 12 +++++++++--- core/dbt/task/run.py | 20 ++++++++++++++------ 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index 2cdd25506a6..d5ef3d51174 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -60,7 +60,7 @@ from dbt.artifacts.resources import SqlOperation as SqlOperationResource from dbt.artifacts.resources import TimeSpine from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource -from dbt.artifacts.schemas.batch_results import BatchType +from dbt.artifacts.schemas.batch_results import BatchResults from dbt.contracts.graph.model_config import UnitTestNodeConfig from dbt.contracts.graph.node_args import ModelNodeArgs from dbt.contracts.graph.unparsed import ( @@ -454,7 +454,7 @@ def resource_class(cls) -> Type[HookNodeResource]: @dataclass class ModelNode(ModelResource, CompiledNode): - batches: Optional[List[BatchType]] = None + batch_info: Optional[BatchResults] = None @classmethod def resource_class(cls) -> Type[ModelResource]: diff --git a/core/dbt/task/retry.py b/core/dbt/task/retry.py index 4f08804d191..9b3c3874718 100644 --- a/core/dbt/task/retry.py +++ b/core/dbt/task/retry.py @@ -131,11 +131,17 @@ def run(self): ) } + # We need this so that re-running of a microbatch model will only rerun + # batches that previously failed. Note _explicitly_ do no pass the + # batch info if there were _no_ successful batches previously. This is + # because passing the batch info _forces_ the microbatch process into + # _incremental_ model, and it may be that we need to be in full refresh + # mode which is only handled if batch_info _isn't_ passed for a node batch_map = { - result.unique_id: result.batch_results.failed + result.unique_id: result.batch_results for result in self.previous_results.results - if result.status == NodeStatus.PartialSuccess - and result.batch_results is not None + if result.batch_results is not None + and len(result.batch_results.successful) != 0 and len(result.batch_results.failed) > 0 and not ( self.previous_command_name != "run-operation" diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 673a400ec02..9cbbb03a5bc 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,6 +1,7 @@ import functools import os import threading +from copy import deepcopy from datetime import datetime from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type @@ -327,6 +328,13 @@ def _build_run_microbatch_model_result( status = RunStatus.PartialSuccess msg = f"PARTIAL SUCCESS ({num_successes}/{num_successes + num_failures})" + if model.batch_info is not None: + new_batch_results = deepcopy(model.batch_info) + new_batch_results.failed = [] + new_batch_results = new_batch_results + batch_results + else: + new_batch_results = batch_results + return RunResult( node=model, status=status, @@ -337,7 +345,7 @@ def _build_run_microbatch_model_result( message=msg, adapter_response={}, failures=num_failures, - batch_results=batch_results, + batch_results=new_batch_results, ) def _build_succesful_run_batch_result( @@ -470,7 +478,7 @@ def _execute_microbatch_materialization( ) -> List[RunResult]: batch_results: List[RunResult] = [] - if model.batches is None: + if model.batch_info is None: microbatch_builder = MicrobatchBuilder( model=model, is_incremental=self._is_incremental(model), @@ -481,8 +489,8 @@ def _execute_microbatch_materialization( start = microbatch_builder.build_start_time(end) batches = microbatch_builder.build_batches(start, end) else: - batches = model.batches - # if there are batches, then don't run as full_refresh and do force is_incremental + batches = model.batch_info.failed + # if there is batch info, then don't run as full_refresh and do force is_incremental # not doing this risks blowing away the work that has already been done if self._has_relation(model=model): context["is_incremental"] = lambda: True @@ -567,7 +575,7 @@ def __init__( args: Flags, config: RuntimeConfig, manifest: Manifest, - batch_map: Optional[Dict[str, List[BatchType]]] = None, + batch_map: Optional[Dict[str, BatchResults]] = None, ) -> None: super().__init__(args, config, manifest) self.batch_map = batch_map @@ -709,7 +717,7 @@ def populate_microbatch_batches(self, selected_uids: AbstractSet[str]): if uid in self.batch_map: node = self.manifest.ref_lookup.perform_lookup(uid, self.manifest) if isinstance(node, ModelNode): - node.batches = self.batch_map[uid] + node.batch_info = self.batch_map[uid] def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: with adapter.connection_named("master"): From b4b7dbd41d84c8c463c64443a6438e0a2de1199a Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 30 Sep 2024 15:32:25 -0500 Subject: [PATCH 2/5] Changie doc for microbatch dbt retry fixes --- .changes/unreleased/Fixes-20240930-153158.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Fixes-20240930-153158.yaml diff --git a/.changes/unreleased/Fixes-20240930-153158.yaml b/.changes/unreleased/Fixes-20240930-153158.yaml new file mode 100644 index 00000000000..33970504367 --- /dev/null +++ b/.changes/unreleased/Fixes-20240930-153158.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Ensure dbt retry of microbatch models doesn't lose prior successful state +time: 2024-09-30T15:31:58.541656-05:00 +custom: + Author: QMalcolm + Issue: "10800" From 0100a58df437a9d10942c41202d76f97e6adfae9 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 30 Sep 2024 15:44:08 -0500 Subject: [PATCH 3/5] Fix test_manifest unit tests for batch_info key --- tests/unit/contracts/graph/test_manifest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/contracts/graph/test_manifest.py b/tests/unit/contracts/graph/test_manifest.py index eaff7f6c57e..68676e2298e 100644 --- a/tests/unit/contracts/graph/test_manifest.py +++ b/tests/unit/contracts/graph/test_manifest.py @@ -96,7 +96,7 @@ "deprecation_date", "defer_relation", "time_spine", - "batches", + "batch_info", "vars", } ) From 21e7919ff8a6c3abd4f5939896dac5b50e50e9e6 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 30 Sep 2024 15:51:51 -0500 Subject: [PATCH 4/5] Add functional test for when a microbatch model has multiple retries --- .../functional/microbatch/test_microbatch.py | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index c233657f180..c867acfac66 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -485,6 +485,44 @@ def test_run_with_event_time(self, project): self.assert_row_count(project, "microbatch_model", 3) +class TestMicrobatchMultipleRetries(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_failing_incremental_partition_sql, + } + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # run all partitions from start - 2 expected rows in output, one failed + with patch_microbatch_end_time("2020-01-03 13:57:00"): + _, console_output = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"]) + + assert "PARTIAL SUCCESS (2/3)" in console_output + assert "Completed with 1 partial success" in console_output + + self.assert_row_count(project, "microbatch_model", 2) + + with patch_microbatch_end_time("2020-01-03 13:57:00"): + _, console_output = run_dbt_and_capture(["retry"], expect_pass=False) + + assert "PARTIAL SUCCESS" not in console_output + assert "ERROR" in console_output + assert "Completed with 1 error, 0 partial successs, and 0 warnings" in console_output + + self.assert_row_count(project, "microbatch_model", 2) + + with patch_microbatch_end_time("2020-01-03 13:57:00"): + _, console_output = run_dbt_and_capture(["retry"], expect_pass=False) + + assert "PARTIAL SUCCESS" not in console_output + assert "ERROR" in console_output + assert "Completed with 1 error, 0 partial successs, and 0 warnings" in console_output + + self.assert_row_count(project, "microbatch_model", 2) + + microbatch_model_first_partition_failing_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} {% if '2020-01-01' in (model.config.__dbt_internal_microbatch_event_time_start | string) %} From 4d21ecc4730c5dcc8131104ba55296cf93e7cb23 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 30 Sep 2024 23:54:01 -0500 Subject: [PATCH 5/5] Add comment about when batch_info will be something other than None --- core/dbt/task/run.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 9cbbb03a5bc..93a8e261b50 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -478,6 +478,9 @@ def _execute_microbatch_materialization( ) -> List[RunResult]: batch_results: List[RunResult] = [] + # Note currently (9/30/2024) model.batch_info is only ever _not_ `None` + # IFF `dbt retry` is being run and the microbatch model had batches which + # failed on the run of the model (which is being retried) if model.batch_info is None: microbatch_builder = MicrobatchBuilder( model=model,