diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index b80bf5f9294..227d93a1d51 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -12,6 +12,7 @@ MicrobatchExecutionDebug, MicrobatchMacroOutsideOfBatchesDeprecation, MicrobatchModelNoEventTimeInputs, + SkippingDetails, ) from dbt.tests.util import ( get_artifact, @@ -682,6 +683,14 @@ def test_run_with_event_time(self, project): select * from {{ ref('input_model') }} """ +microbatch_model_second_batch_failing_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +{% if '20200102' == model.batch.id %} + invalid_sql +{% endif %} +select * from {{ ref('input_model') }} +""" + class TestMicrobatchInitialBatchFailure(BaseMicrobatchTest): @pytest.fixture(scope="class") @@ -691,6 +700,43 @@ def models(self): "microbatch_model.sql": microbatch_model_first_partition_failing_sql, } + def test_run_with_event_time(self, project): + # When the first batch of a microbatch model fails, the rest of the batches should + # be skipped and the model marked as failed (not _partial success_) + + general_exc_catcher = EventCatcher( + GenericExceptionOnRun, predicate=lambda event: event.data.node_info is not None + ) + skipping_catcher = EventCatcher(event_to_catch=SkippingDetails) + + # run all partitions from start - 2 expected rows in output, one failed + with patch_microbatch_end_time("2020-01-03 13:57:00"): + run_dbt( + ["run"], + expect_pass=False, + callbacks=[general_exc_catcher.catch, skipping_catcher.catch], + ) + + assert len(general_exc_catcher.caught_events) == 1 + assert len(skipping_catcher.caught_events) == 2 + + # Because the first batch failed, and the rest of the batches were skipped, the table shouldn't + # exist in the data warehosue + relation_info = relation_from_name(project.adapter, "microbatch_model") + relation = project.adapter.get_relation( + relation_info.database, relation_info.schema, relation_info.name + ) + assert relation is None + + +class TestMicrobatchSecondBatchFailure(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_second_batch_failing_sql, + } + def test_run_with_event_time(self, project): event_catcher = EventCatcher( GenericExceptionOnRun, predicate=lambda event: event.data.node_info is not None