diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 02c6976c848..9fb1292ff69 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -1,9 +1,7 @@ import os -from datetime import datetime from unittest import mock import pytest -import pytz from dbt.events.types import LogModelResult from dbt.tests.util import ( @@ -42,8 +40,13 @@ select * from {{ ref('input_model') }} """ -microbatch_model_downstream_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +microbatch_yearly_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +select * from {{ ref('input_model') }} +""" + +microbatch_yearly_model_downstream_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} select * from {{ ref('microbatch_model') }} """ @@ -208,15 +211,6 @@ def test_run_with_event_time(self, project): # creation events assert batch_creation_events == 3 - # build model >= 2020-01-02 - with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run", "--event-time-start", "2020-01-02", "--full-refresh"]) - self.assert_row_count(project, "microbatch_model", 2) - - # build model < 2020-01-03 - run_dbt(["run", "--event-time-end", "2020-01-03", "--full-refresh"]) - self.assert_row_count(project, "microbatch_model", 2) - # build model between 2020-01-02 >= event_time < 2020-01-03 run_dbt( [ @@ -416,7 +410,7 @@ def models(self): @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time_logs(self, project): with patch_microbatch_end_time("2020-01-03 13:57:00"): - _, logs = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"]) + _, logs = run_dbt_and_capture(["run"]) assert "start: 2020-01-01 00:00:00+00:00" in logs assert "end: 2020-01-02 00:00:00+00:00" in logs @@ -450,7 +444,7 @@ def models(self): def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run", "--event-time-start", "2020-01-01"], expect_pass=False) + run_dbt(["run"], expect_pass=False) self.assert_row_count(project, "microbatch_model", 2) run_results = get_artifact(project.project_root, "target", "run_results.json") @@ -475,7 +469,7 @@ def models(self): def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): - _, console_output = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"]) + _, console_output = run_dbt_and_capture(["run"]) assert "PARTIAL SUCCESS (2/3)" in console_output assert "Completed with 1 partial success" in console_output @@ -515,7 +509,7 @@ def models(self): def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): - _, console_output = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"]) + _, console_output = run_dbt_and_capture(["run"]) assert "PARTIAL SUCCESS (2/3)" in console_output assert "Completed with 1 partial success" in console_output @@ -562,7 +556,7 @@ def models(self): def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run", "--event-time-start", "2020-01-01"]) + run_dbt(["run"]) self.assert_row_count(project, "microbatch_model", 2) @@ -571,7 +565,7 @@ class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest): def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run", "--event-time-start", "2020-01-01"]) + run_dbt(["run"]) # Compiled paths - compiled model without filter only assert read_file( @@ -654,7 +648,15 @@ def models(self): def test_run_with_event_time(self, project): # run all partitions from 2020-01-02 to spoofed "now" - 2 expected rows in output with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run", "--event-time-start", "2020-01-02"]) + run_dbt( + [ + "run", + "--event-time-start", + "2020-01-02", + "--event-time-end", + "2020-01-03 13:57:00", + ] + ) self.assert_row_count(project, "microbatch_model", 2) # re-running shouldn't change what it's in the data set because there is nothing new @@ -683,14 +685,13 @@ class TestMicrbobatchModelsRunWithSameCurrentTime(BaseMicrobatchTest): def models(self): return { "input_model.sql": input_model_sql, - "microbatch_model.sql": microbatch_model_sql, - "second_microbatch_model.sql": microbatch_model_downstream_sql, + "microbatch_model.sql": microbatch_yearly_model_sql, + "second_microbatch_model.sql": microbatch_yearly_model_downstream_sql, } @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_microbatch(self, project) -> None: - current_time = datetime.now(pytz.UTC) - run_dbt(["run", "--event-time-start", current_time.strftime("%Y-%m-%d")]) + run_dbt(["run"]) run_results = get_artifact(project.project_root, "target", "run_results.json") microbatch_model_last_batch = run_results["results"][1]["batch_results"]["successful"][-1]