diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 56537ad48cf..b80bf5f9294 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -7,6 +7,7 @@ ArtifactWritten, EndOfRunSummary, GenericExceptionOnRun, + JinjaLogDebug, LogModelResult, MicrobatchExecutionDebug, MicrobatchMacroOutsideOfBatchesDeprecation, @@ -54,6 +55,21 @@ select * from {{ ref('input_model') }} """ +microbatch_model_with_pre_and_post_sql = """ +{{ config( + materialized='incremental', + incremental_strategy='microbatch', + unique_key='id', + event_time='event_time', + batch_size='day', + begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), + pre_hook='{{log("execute: " ~ execute ~ ", pre-hook run by batch " ~ model.batch.id)}}', + post_hook='{{log("execute: " ~ execute ~ ", post-hook run by batch " ~ model.batch.id)}}', + ) +}} +select * from {{ ref('input_model') }} +""" + microbatch_yearly_model_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} select * from {{ ref('input_model') }} @@ -905,3 +921,100 @@ def test_microbatch( some_batches_run_concurrently = True break assert not some_batches_run_concurrently, "Found a batch being run concurrently!" + + +class TestFirstAndLastBatchAlwaysSequential(BaseMicrobatchTest): + @pytest.fixture + def batch_exc_catcher(self) -> EventCatcher: + return EventCatcher(MicrobatchExecutionDebug) # type: ignore + + def test_microbatch( + self, mocker: MockerFixture, project, batch_exc_catcher: EventCatcher + ) -> None: + mocked_srip = mocker.patch("dbt.task.run.MicrobatchModelRunner.should_run_in_parallel") + + # Should be run in parallel + mocked_srip.return_value = True + with patch_microbatch_end_time("2020-01-03 13:57:00"): + _ = run_dbt(["run"], callbacks=[batch_exc_catcher.catch]) + + assert len(batch_exc_catcher.caught_events) > 1 + + first_batch_event = batch_exc_catcher.caught_events[0] + last_batch_event = batch_exc_catcher.caught_events[-1] + + for event in [first_batch_event, last_batch_event]: + assert "is being run sequentially" in event.data.msg # type: ignore + + for event in batch_exc_catcher.caught_events[1:-1]: + assert "is being run concurrently" in event.data.msg # type: ignore + + +class TestFirstBatchRunsPreHookLastBatchRunsPostHook(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_with_pre_and_post_sql, + } + + @pytest.fixture + def batch_log_catcher(self) -> EventCatcher: + def pre_or_post_hook(event) -> bool: + return "execute: True" in event.data.msg and ( + "pre-hook" in event.data.msg or "post-hook" in event.data.msg + ) + + return EventCatcher(event_to_catch=JinjaLogDebug, predicate=pre_or_post_hook) # type: ignore + + def test_microbatch( + self, mocker: MockerFixture, project, batch_log_catcher: EventCatcher + ) -> None: + with patch_microbatch_end_time("2020-01-04 13:57:00"): + _ = run_dbt(["run"], callbacks=[batch_log_catcher.catch]) + + # There should be two logs as the pre-hook and post-hook should + # both only be run once + assert len(batch_log_catcher.caught_events) == 2 + + for event in batch_log_catcher.caught_events: + # batch id that should be firing pre-hook + if "20200101" in event.data.msg: # type: ignore + assert "pre-hook" in event.data.msg # type: ignore + + # batch id that should be firing the post-hook + if "20200104" in event.data.msg: # type: ignore + assert "post-hook" in event.data.msg # type: ignore + + +class TestWhenOnlyOneBatchRunBothPostAndPreHooks(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_with_pre_and_post_sql, + } + + @pytest.fixture + def batch_log_catcher(self) -> EventCatcher: + def pre_or_post_hook(event) -> bool: + return "execute: True" in event.data.msg and ( + "pre-hook" in event.data.msg or "post-hook" in event.data.msg + ) + + return EventCatcher(event_to_catch=JinjaLogDebug, predicate=pre_or_post_hook) # type: ignore + + def test_microbatch( + self, mocker: MockerFixture, project, batch_log_catcher: EventCatcher + ) -> None: + with patch_microbatch_end_time("2020-01-01 13:57:00"): + _ = run_dbt(["run"], callbacks=[batch_log_catcher.catch]) + + # There should be two logs as the pre-hook and post-hook should + # both only be run once + assert len(batch_log_catcher.caught_events) == 2 + + assert "20200101" in batch_log_catcher.caught_events[0].data.msg # type: ignore + assert "pre-hook" in batch_log_catcher.caught_events[0].data.msg # type: ignore + assert "20200101" in batch_log_catcher.caught_events[1].data.msg # type: ignore + assert "post-hook" in batch_log_catcher.caught_events[1].data.msg # type: ignore