Skip to content

Commit

Permalink
Add microbatch pre-hook, post-hook, and sequential first/last batch tests
Browse files Browse the repository at this point in the history
  • Loading branch information
QMalcolm committed Dec 6, 2024
1 parent 8424209 commit 50abb45
Showing 1 changed file with 113 additions and 0 deletions.
113 changes: 113 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ArtifactWritten,
EndOfRunSummary,
GenericExceptionOnRun,
JinjaLogDebug,
LogModelResult,
MicrobatchExecutionDebug,
MicrobatchMacroOutsideOfBatchesDeprecation,
Expand Down Expand Up @@ -54,6 +55,21 @@
select * from {{ ref('input_model') }}
"""

# Microbatch model template used by the hook tests: an incremental model using
# the 'microbatch' strategy with a daily batch size, beginning 2020-01-01.
# Its pre_hook and post_hook each log the Jinja `execute` flag together with
# `model.batch.id`, so a test can read the logs to see which batch fired
# which hook.
microbatch_model_with_pre_and_post_sql = """
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
unique_key='id',
event_time='event_time',
batch_size='day',
begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0),
pre_hook='{{log("execute: " ~ execute ~ ", pre-hook run by batch " ~ model.batch.id)}}',
post_hook='{{log("execute: " ~ execute ~ ", post-hook run by batch " ~ model.batch.id)}}',
)
}}
select * from {{ ref('input_model') }}
"""

microbatch_yearly_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
Expand Down Expand Up @@ -905,3 +921,100 @@ def test_microbatch(
some_batches_run_concurrently = True
break
assert not some_batches_run_concurrently, "Found a batch being run concurrently!"


class TestFirstAndLastBatchAlwaysSequential(BaseMicrobatchTest):
    """Check the execution mode reported for each batch of a microbatch run.

    With ``should_run_in_parallel`` patched to return True, the first and last
    caught ``MicrobatchExecutionDebug`` events must still report sequential
    execution, while every event in between must report concurrent execution.
    """

    @pytest.fixture
    def batch_exc_catcher(self) -> EventCatcher:
        """Return a catcher that records MicrobatchExecutionDebug events."""
        return EventCatcher(MicrobatchExecutionDebug)  # type: ignore

    def test_microbatch(
        self, mocker: MockerFixture, project, batch_exc_catcher: EventCatcher
    ) -> None:
        # Make the runner always answer "yes" when asked whether a batch
        # should be run in parallel.
        parallel_check = mocker.patch(
            "dbt.task.run.MicrobatchModelRunner.should_run_in_parallel"
        )
        parallel_check.return_value = True

        with patch_microbatch_end_time("2020-01-03 13:57:00"):
            _ = run_dbt(["run"], callbacks=[batch_exc_catcher.catch])

        events = batch_exc_catcher.caught_events
        assert len(events) > 1

        # The boundary batches are expected to report sequential execution.
        boundary_events = (events[0], events[-1])
        for boundary_event in boundary_events:
            assert "is being run sequentially" in boundary_event.data.msg  # type: ignore

        # Everything between the boundaries is expected to report concurrency.
        for middle_event in events[1:-1]:
            assert "is being run concurrently" in middle_event.data.msg  # type: ignore


class TestFirstBatchRunsPreHookLastBatchRunsPostHook(BaseMicrobatchTest):
    """Check which batches fire the model's pre-hook and post-hook.

    The model under test logs the batch id from both hooks. With the end time
    patched to 2020-01-04, the test expects exactly one pre-hook log, emitted
    by batch ``20200101``, and exactly one post-hook log, emitted by batch
    ``20200104``.
    """

    @pytest.fixture(scope="class")
    def models(self):
        """Provide the input model plus the microbatch model with logging hooks."""
        return {
            "input_model.sql": input_model_sql,
            "microbatch_model.sql": microbatch_model_with_pre_and_post_sql,
        }

    @pytest.fixture
    def batch_log_catcher(self) -> EventCatcher:
        """Return a catcher for hook log lines emitted with ``execute: True``."""

        def pre_or_post_hook(event) -> bool:
            # Only keep hook logs rendered while `execute` was True.
            return "execute: True" in event.data.msg and (
                "pre-hook" in event.data.msg or "post-hook" in event.data.msg
            )

        return EventCatcher(event_to_catch=JinjaLogDebug, predicate=pre_or_post_hook)  # type: ignore

    def test_microbatch(
        self, mocker: MockerFixture, project, batch_log_catcher: EventCatcher
    ) -> None:
        with patch_microbatch_end_time("2020-01-04 13:57:00"):
            _ = run_dbt(["run"], callbacks=[batch_log_catcher.catch])

        # There should be two logs as the pre-hook and post-hook should
        # both only be run once
        assert len(batch_log_catcher.caught_events) == 2

        messages = [event.data.msg for event in batch_log_catcher.caught_events]  # type: ignore
        pre_hook_msgs = [msg for msg in messages if "pre-hook" in msg]
        post_hook_msgs = [msg for msg in messages if "post-hook" in msg]

        # Assert unconditionally: guarding these checks behind
        # `if "<batch id>" in msg` would let the test pass vacuously when the
        # expected batch id never shows up in the logs.
        assert len(pre_hook_msgs) == 1, f"Expected one pre-hook log, got: {messages}"
        assert len(post_hook_msgs) == 1, f"Expected one post-hook log, got: {messages}"

        # batch id that should be firing pre-hook
        assert "20200101" in pre_hook_msgs[0]

        # batch id that should be firing the post-hook
        assert "20200104" in post_hook_msgs[0]


class TestWhenOnlyOneBatchRunBothPostAndPreHooks(BaseMicrobatchTest):
    """Check hook logging when the run window yields batch ``20200101`` only.

    The test expects two caught hook logs, both naming batch ``20200101``:
    the pre-hook log first, then the post-hook log.
    """

    @pytest.fixture(scope="class")
    def models(self):
        """Provide the input model plus the microbatch model with logging hooks."""
        project_models = {
            "input_model.sql": input_model_sql,
            "microbatch_model.sql": microbatch_model_with_pre_and_post_sql,
        }
        return project_models

    @pytest.fixture
    def batch_log_catcher(self) -> EventCatcher:
        """Return a catcher for hook log lines emitted with ``execute: True``."""

        def pre_or_post_hook(event) -> bool:
            msg = event.data.msg
            if "execute: True" not in msg:
                return False
            return "pre-hook" in msg or "post-hook" in msg

        return EventCatcher(event_to_catch=JinjaLogDebug, predicate=pre_or_post_hook)  # type: ignore

    def test_microbatch(
        self, mocker: MockerFixture, project, batch_log_catcher: EventCatcher
    ) -> None:
        with patch_microbatch_end_time("2020-01-01 13:57:00"):
            _ = run_dbt(["run"], callbacks=[batch_log_catcher.catch])

        caught = batch_log_catcher.caught_events

        # Each hook is expected to fire exactly once, giving two logs in total.
        assert len(caught) == 2

        # Position 0 must be the pre-hook log and position 1 the post-hook
        # log, each naming batch 20200101.
        for position, hook_name in enumerate(("pre-hook", "post-hook")):
            msg = caught[position].data.msg  # type: ignore
            assert "20200101" in msg
            assert hook_name in msg

0 comments on commit 50abb45

Please sign in to comment.