Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-10785] Microbatch models should respect full_refresh model config #10788

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240926-153210.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Ensure microbatch models respect `full_refresh` model config
time: 2024-09-26T15:32:10.202789-05:00
custom:
Author: QMalcolm
Issue: "10785"
11 changes: 8 additions & 3 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,12 +550,17 @@ def _is_incremental(self, model) -> bool:
relation = self.adapter.get_relation(
relation_info.database, relation_info.schema, relation_info.name
)
return (
if (
relation is not None
and relation.type == "table"
and model.config.materialized == "incremental"
and not (getattr(self.config.args, "FULL_REFRESH", False) or model.config.full_refresh)
)
):
if model.config.full_refresh is not None:
return not model.config.full_refresh
else:
return not getattr(self.config.args, "FULL_REFRESH", False)
else:
return False


class RunTask(CompileTask):
Expand Down
41 changes: 41 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@
SELECT * FROM {{ ref('microbatch_model') }}
"""

microbatch_model_full_refresh_false_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), full_refresh=False) }}
select * from {{ ref('input_model') }}
"""


class BaseMicrobatchCustomUserStrategy:
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -578,3 +583,39 @@ def test_run_with_event_time(self, project):
"microbatch_model",
"microbatch_model_2020-01-03.sql",
)


class TestMicrobatchFullRefreshConfigFalse(BaseMicrobatchTest):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": input_model_sql,
"microbatch_model.sql": microbatch_model_full_refresh_false_sql,
"downstream_model.sql": downstream_model_of_microbatch_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run all partitions from 2020-01-02 to spoofed "now" - 2 expected rows in output
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--event-time-start", "2020-01-02"])
self.assert_row_count(project, "microbatch_model", 2)

# re-running shouldn't change what it's in the data set because there is nothing new
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 2)

# running with --full-refresh shouldn't pick up 2020-01-01 BECAUSE the model has
# full_refresh = false
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--full-refresh"])
self.assert_row_count(project, "microbatch_model", 2)

# update the microbatch model to no longer have full_refresh=False config
write_file(microbatch_model_sql, project.project_root, "models", "microbatch_model.sql")

# running with full refresh should now pick up the 2020-01-01 data
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--full-refresh"])
self.assert_row_count(project, "microbatch_model", 3)
65 changes: 65 additions & 0 deletions tests/unit/task/test_run.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import threading
from argparse import Namespace
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
from unittest.mock import MagicMock, patch

import pytest
from pytest_mock import MockerFixture

from dbt.adapters.postgres import PostgresAdapter
from dbt.artifacts.resources.v1.model import ModelConfig
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.artifacts.schemas.results import RunStatus
from dbt.artifacts.schemas.run import RunResult
Expand Down Expand Up @@ -174,3 +178,64 @@ def test__build_run_microbatch_model_result(
assert expect_success.status == RunStatus.Success
assert expect_error.status == RunStatus.Error
assert expect_partial_success.status == RunStatus.PartialSuccess

@pytest.mark.parametrize(
"has_relation,relation_type,materialized,full_refresh_config,full_refresh_flag,expectation",
[
(False, "table", "incremental", None, False, False),
(True, "other", "incremental", None, False, False),
(True, "table", "other", None, False, False),
# model config takes precendence
(True, "table", "incremental", True, False, False),
# model config takes precendence
(True, "table", "incremental", True, True, False),
# model config takes precendence
(True, "table", "incremental", False, False, True),
# model config takes precendence
(True, "table", "incremental", False, True, True),
# model config is none, so opposite flag value
(True, "table", "incremental", None, True, False),
# model config is none, so opposite flag value
(True, "table", "incremental", None, False, True),
],
)
def test__is_incremental(
self,
mocker: MockerFixture,
model_runner: ModelRunner,
has_relation: bool,
relation_type: str,
materialized: str,
full_refresh_config: Optional[bool],
full_refresh_flag: bool,
expectation: bool,
) -> None:

# Setup adapter relation getting
@dataclass
class RelationInfo:
database: str = "database"
schema: str = "schema"
name: str = "name"

@dataclass
class Relation:
type: str

model_runner.adapter = mocker.Mock()
model_runner.adapter.Relation.create_from.return_value = RelationInfo()

if has_relation:
model_runner.adapter.get_relation.return_value = Relation(type=relation_type)
else:
model_runner.adapter.get_relation.return_value = None

# Set ModelRunner configs
model_runner.config.args = Namespace(FULL_REFRESH=full_refresh_flag)

# Create model with configs
model = model_runner.node
model.config = ModelConfig(materialized=materialized, full_refresh=full_refresh_config)

# Assert result of _is_incremental
assert model_runner._is_incremental(model) == expectation
Loading