From b3e270351bd4848a4cad1e1f5dc3009074779f8a Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 27 Nov 2024 15:26:24 -0500 Subject: [PATCH] move microbatch compilation to .compile method --- core/dbt/task/run.py | 47 ++++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 55b73be3d80..fb4ee145429 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -342,10 +342,30 @@ def __init__(self, config, adapter, node, node_index: int, num_nodes: int): self.relation_exists: bool = False def compile(self, manifest: Manifest): - # The default compile function is _always_ called. However, we do our - # compilation _later_ in `_execute_microbatch_materialization`. This - # meant the node was being compiled _twice_ for each batch. To get around - # this, we've overriden the default compile method to do nothing + if self.batch_idx is not None: + batch = self.batches[self.batch_idx] + + # LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+) + # TODO: REMOVE before 1.10 GA + self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0] + self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1] + # Create batch context on model node prior to re-compiling + self.node.batch = BatchContext( + id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size), + event_time_start=batch[0], + event_time_end=batch[1], + ) + # Recompile node to re-resolve refs with event time filters rendered, update context + self.compiler.compile_node( + self.node, + manifest, + {}, + split_suffix=MicrobatchBuilder.format_batch_start( + batch[0], self.node.config.batch_size + ), + ) + + # Skips compilation for non-batch runs return self.node def set_batch_idx(self, batch_idx: int) -> None: @@ -537,25 +557,6 @@ def _execute_microbatch_materialization( # call materialization_macro to get a batch-level run result start_time = time.perf_counter() try: - # LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+) - # TODO: REMOVE before 1.10 GA - model.config["__dbt_internal_microbatch_event_time_start"] = batch[0] - model.config["__dbt_internal_microbatch_event_time_end"] = batch[1] - # Create batch context on model node prior to re-compiling - model.batch = BatchContext( - id=MicrobatchBuilder.batch_id(batch[0], model.config.batch_size), - event_time_start=batch[0], - event_time_end=batch[1], - ) - # Recompile node to re-resolve refs with event time filters rendered, update context - self.compiler.compile_node( - model, - manifest, - {}, - split_suffix=MicrobatchBuilder.format_batch_start( - batch[0], model.config.batch_size - ), - ) # Update jinja context with batch context members jinja_context = microbatch_builder.build_jinja_context_for_batch( incremental_batch=self.relation_exists