diff --git a/.changes/unreleased/Fixes-20241121-112638.yaml b/.changes/unreleased/Fixes-20241121-112638.yaml new file mode 100644 index 00000000000..15a23ae8995 --- /dev/null +++ b/.changes/unreleased/Fixes-20241121-112638.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Make microbatch models skippable +time: 2024-11-21T11:26:38.192345-05:00 +custom: + Author: michelleark + Issue: "11021" diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 1b4be463e78..0c2888bb325 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -2,7 +2,7 @@ import threading import time from copy import deepcopy -from dataclasses import asdict, field +from dataclasses import asdict from datetime import datetime from multiprocessing.pool import ThreadPool from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type @@ -334,9 +334,12 @@ def execute(self, model, manifest): class MicrobatchModelRunner(ModelRunner): - batch_idx: Optional[int] = None - batches: Dict[int, BatchType] = field(default_factory=dict) - relation_exists: bool = False + def __init__(self, config, adapter, node, node_index: int, num_nodes: int): + super().__init__(config, adapter, node, node_index, num_nodes) + + self.batch_idx: Optional[int] = None + self.batches: Dict[int, BatchType] = {} + self.relation_exists: bool = False def set_batch_idx(self, batch_idx: int) -> None: self.batch_idx = batch_idx @@ -704,8 +707,11 @@ def handle_microbatch_model( runner: MicrobatchModelRunner, pool: ThreadPool, ) -> RunResult: - # Initial run computes batch metadata + # Initial run computes batch metadata, unless model is skipped result = self.call_runner(runner) + if result.status == RunStatus.Skipped: + return result + batch_results: List[RunResult] = [] # Execute batches serially until a relation exists, at which point future batches are run in parallel diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index e216bcbe772..f0f4097b9c9 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -33,6 +33,12 @@ select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time """ +input_model_invalid_sql = """ +{{ config(materialized='table', event_time='event_time') }} + +select invalid as event_time +""" + input_model_without_event_time_sql = """ {{ config(materialized='table') }} @@ -835,6 +841,24 @@ def test_microbatch( assert len(catch_aw.caught_events) == 1 +class TestMicrobatchModelSkipped(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_invalid_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def test_microbatch_model_skipped(self, project) -> None: + run_dbt(["run"], expect_pass=False) + + run_results = get_artifact(project.project_root, "target", "run_results.json") + + microbatch_result = run_results["results"][1] + assert microbatch_result["status"] == "skipped" + assert microbatch_result["batch_results"] is None + + class TestMicrobatchCanRunParallelOrSequential(BaseMicrobatchTest): @pytest.fixture def batch_exc_catcher(self) -> EventCatcher: