From d18f50bbb8a274c73442d8ae674d1036e736bbf0 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 15 Oct 2024 16:55:19 -0500 Subject: [PATCH] Ensure consistent `current_time` across microbatch models in an invocation (#10830) * Add test that checks microbatch models are all operating with the same `current_time` * Set an `invocated_at` on the `RuntimeConfig` and plumb to `MicrobatchBuilder` * Add changie doc * Rename `invocated_at` to `invoked_at` * Simply conditional logic for setting MicrobatchBuilder.batch_current_time * Rename `batch_current_time` to `default_end_time` for MicrobatchBuilder --- .../unreleased/Features-20241007-115853.yaml | 6 ++++ core/dbt/config/runtime.py | 6 +++- .../incremental/microbatch.py | 4 ++- core/dbt/task/run.py | 1 + .../functional/microbatch/test_microbatch.py | 32 +++++++++++++++++++ 5 files changed, 47 insertions(+), 2 deletions(-) create mode 100644 .changes/unreleased/Features-20241007-115853.yaml diff --git a/.changes/unreleased/Features-20241007-115853.yaml b/.changes/unreleased/Features-20241007-115853.yaml new file mode 100644 index 00000000000..ac2e61c5b59 --- /dev/null +++ b/.changes/unreleased/Features-20241007-115853.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Ensure microbatch models use same `current_time` value +time: 2024-10-07T11:58:53.460941-05:00 +custom: + Author: QMalcolm + Issue: "10819" diff --git a/core/dbt/config/runtime.py b/core/dbt/config/runtime.py index e1c24cf5f0c..d2c0183e1cb 100644 --- a/core/dbt/config/runtime.py +++ b/core/dbt/config/runtime.py @@ -1,7 +1,8 @@ import itertools import os from copy import deepcopy -from dataclasses import dataclass +from dataclasses import dataclass, field +from datetime import datetime from pathlib import Path from typing import ( Any, @@ -15,6 +16,8 @@ Type, ) +import pytz + from dbt import tracking from dbt.adapters.contracts.connection import ( AdapterRequiredConfig, @@ -98,6 +101,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig): profile_name: str cli_vars: Dict[str, Any] dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None + invoked_at: datetime = field(default_factory=lambda: datetime.now(pytz.UTC)) def __post_init__(self): self.validate() diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index 268132ffa0c..da8930acb89 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -18,6 +18,7 @@ def __init__( is_incremental: bool, event_time_start: Optional[datetime], event_time_end: Optional[datetime], + default_end_time: Optional[datetime] = None, ): if model.config.incremental_strategy != "microbatch": raise DbtInternalError( @@ -35,10 +36,11 @@ def __init__( event_time_start.replace(tzinfo=pytz.UTC) if event_time_start else None ) self.event_time_end = event_time_end.replace(tzinfo=pytz.UTC) if event_time_end else None + self.default_end_time = default_end_time or datetime.now(pytz.UTC) def build_end_time(self): """Defaults the end_time to the current time in UTC unless a non `None` event_time_end was provided""" - return self.event_time_end or datetime.now(tz=pytz.utc) + return self.event_time_end or self.default_end_time def build_start_time(self, checkpoint: Optional[datetime]): """Create a start time based off the passed in checkpoint. diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index b1f706d72ec..99913a551c5 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -498,6 +498,7 @@ def _execute_microbatch_materialization( is_incremental=self._is_incremental(model), event_time_start=getattr(self.config.args, "EVENT_TIME_START", None), event_time_end=getattr(self.config.args, "EVENT_TIME_END", None), + default_end_time=self.config.invoked_at, ) end = microbatch_builder.build_end_time() start = microbatch_builder.build_start_time(end) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index f6b49e1405a..71c8588b17f 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -1,7 +1,9 @@ import os +from datetime import datetime from unittest import mock import pytest +import pytz from dbt.events.types import LogModelResult from dbt.tests.util import ( @@ -40,6 +42,11 @@ select * from {{ ref('input_model') }} """ +microbatch_model_downstream_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +select * from {{ ref('microbatch_model') }} +""" + microbatch_model_ref_render_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} @@ -671,3 +678,28 @@ def test_run_with_event_time(self, project): with patch_microbatch_end_time("2020-01-03 13:57:00"): run_dbt(["run", "--full-refresh"]) self.assert_row_count(project, "microbatch_model", 3) + + +class TestMicrbobatchModelsRunWithSameCurrentTime(BaseMicrobatchTest): + + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_sql, + "second_microbatch_model.sql": microbatch_model_downstream_sql, + } + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_microbatch(self, project) -> None: + current_time = datetime.now(pytz.UTC) + run_dbt(["run", "--event-time-start", current_time.strftime("%Y-%m-%d")]) + + run_results = get_artifact(project.project_root, "target", "run_results.json") + microbatch_model_last_batch = run_results["results"][1]["batch_results"]["successful"][-1] + second_microbatch_model_last_batch = run_results["results"][2]["batch_results"][ + "successful" + ][-1] + + # they should have the same last batch because they are using the _same_ "current_time" + assert microbatch_model_last_batch == second_microbatch_model_last_batch