From ec3c6e7b5d00a9f359bdb8e09ccfa9dd43018879 Mon Sep 17 00:00:00 2001
From: Quigley Malcolm
Date: Fri, 20 Sep 2024 14:20:25 -0500
Subject: [PATCH] Initial pass at microbatch config validation

Add check_valid_microbatch_config() to core/dbt/parser/manifest.py and
call it from load() right after check_valid_snapshot_config().

The check only runs when the DBT_EXPERIMENTAL_MICROBATCH environment
variable is set. For every node whose config has materialized
"incremental" and incremental_strategy "microbatch", it raises
DbtValidationError unless:

- event_time is a str
- lookback is an int or None
- batch_size is one of the BatchSize enum values

It also raises when a node listed in that model's depends_on.nodes has
a truthy event_time that is not a str.
---
Review notes (ignored by git am, not part of the commit message):
- isinstance(lookback, int) is also true for bool, so a bool lookback
  passes the check.
- The upstream check is `input_event_time and not isinstance(...)`, so a
  falsy non-str value (0, False, []) is accepted even though the error
  text says "str or None".
- valid_batch_sizes is rebuilt for every matching node; it does not
  depend on the node and could be computed once before the loop.

 core/dbt/parser/manifest.py | 40 +++++++++++++++++++++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py
index d54aa898713..7f151c52a09 100644
--- a/core/dbt/parser/manifest.py
+++ b/core/dbt/parser/manifest.py
@@ -24,6 +24,7 @@
     register_adapter,
 )
 from dbt.artifacts.resources import FileHash, NodeRelation, NodeVersion
+from dbt.artifacts.resources.types import BatchSize
 from dbt.artifacts.schemas.base import Writable
 from dbt.clients.jinja import MacroStack, get_rendered
 from dbt.clients.jinja_static import statically_extract_macro_calls
@@ -468,6 +469,7 @@ def load(self) -> Manifest:
         self.check_valid_group_config()
         self.check_valid_access_property()
         self.check_valid_snapshot_config()
+        self.check_valid_microbatch_config()
 
         semantic_manifest = SemanticManifest(self.manifest)
         if not semantic_manifest.validate():
@@ -1355,6 +1357,44 @@ def check_valid_snapshot_config(self):
                 continue
             node.config.final_validate()
 
+    def check_valid_microbatch_config(self):
+        if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"):
+            for node in self.manifest.nodes.values():
+                if (
+                    node.config.materialized == "incremental"
+                    and node.config.incremental_strategy == "microbatch"
+                ):
+                    event_time = node.config.event_time
+                    if not isinstance(event_time, str):
+                        # TODO be more specific/verbose
+                        raise DbtValidationError(
+                            f"When used with microbatch, `event_time` must be of type str, but got {type(event_time)}"
+                        )
+
+                    lookback = node.config.lookback
+                    if not isinstance(lookback, int) and lookback is not None:
+                        # TODO be more specific/verbose
+                        raise DbtValidationError(
+                            f"When used with microbatch, `lookback` must be of type int or None, but got {type(lookback)}"
+                        )
+
+                    batch_size = node.config.batch_size
+                    valid_batch_sizes = [size.value for size in BatchSize]
+                    if batch_size not in valid_batch_sizes:
+                        # TODO be more specific/verbose
+                        raise DbtValidationError(
+                            f"When used with microbatch, `batch_size` must be one of {valid_batch_sizes}, but got {batch_size}"
+                        )
+
+                    for input_unique_id in node.depends_on.nodes:
+                        input_node = self.manifest.expect(unique_id=input_unique_id)
+                        input_event_time = input_node.config.event_time
+                        if input_event_time and not isinstance(input_event_time, str):
+                            # TODO be more specific/verbose
+                            raise DbtValidationError(
+                                f"When used as an input to a microbatch model, `event_time` must be a str or None, but got {type(input_event_time)}"
+                            )
+
     def write_perf_info(self, target_path: str):
         path = os.path.join(target_path, PERF_INFO_FILE_NAME)
         write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))