[Tidy first] move microbatch compilation to .compile method (#11063)
(cherry picked from commit 1b7d9b5)
MichelleArk authored and github-actions[bot] committed Nov 28, 2024
1 parent 6a36444 commit fbc33fa
Showing 1 changed file with 27 additions and 44 deletions.
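
In short: the per-batch recompilation that used to happen inside _execute_microbatch_materialization now happens in MicrobatchModelRunner.compile, so each batch is compiled once rather than twice. A minimal sketch of the intended flow, assuming pre-built config, adapter, node, manifest, and batches objects; the driver loop and the direct batches assignment are illustrative and not part of this diff:

    # Illustrative driver loop (assumed); only set_batch_idx, compile, and execute
    # appear in the diff below.
    runner = MicrobatchModelRunner(config, adapter, node, node_index=1, num_nodes=1)
    runner.batches = batches                   # assumed: list of (event_time_start, event_time_end) tuples
    for idx in range(len(batches)):
        runner.set_batch_idx(idx)              # select the batch to compile and run
        compiled = runner.compile(manifest)    # re-resolves refs with this batch's event-time filter
        runner.execute(compiled, manifest)     # materializes just the selected batch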
71 changes: 27 additions & 44 deletions core/dbt/task/run.py
@@ -283,7 +283,6 @@ def _execute_model(
hook_ctx: Any,
context_config: Any,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
@@ -328,9 +327,7 @@ def execute(self, model, manifest):

hook_ctx = self.adapter.pre_model_hook(context_config)

return self._execute_model(
hook_ctx, context_config, model, manifest, context, materialization_macro
)
return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)


class MicrobatchModelRunner(ModelRunner):
@@ -342,10 +339,30 @@ def __init__(self, config, adapter, node, node_index: int, num_nodes: int):
self.relation_exists: bool = False

def compile(self, manifest: Manifest):
# The default compile function is _always_ called. However, we do our
# compilation _later_ in `_execute_microbatch_materialization`. This
# meant the node was being compiled _twice_ for each batch. To get around
# this, we've overriden the default compile method to do nothing
if self.batch_idx is not None:
batch = self.batches[self.batch_idx]

# LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
# TODO: REMOVE before 1.10 GA
self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
# Create batch context on model node prior to re-compiling
self.node.batch = BatchContext(
id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size),
event_time_start=batch[0],
event_time_end=batch[1],
)
# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(
self.node,
manifest,
{},
split_suffix=MicrobatchBuilder.format_batch_start(
batch[0], self.node.config.batch_size
),
)

# Skips compilation for non-batch runs
return self.node

def set_batch_idx(self, batch_idx: int) -> None:
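
The batch_idx guard at the top of compile() preserves the old no-op behaviour for non-batch invocations: with no batch selected the node is returned unchanged, and only once a batch index is set does the recompile with event-time filters run. A small behavioural sketch, assuming batch_idx defaults to None in __init__ (not shown in this hunk) and that a single (start, end) window is supplied:

    # Sketch only: compile() does nothing until a batch is selected.
    runner = MicrobatchModelRunner(config, adapter, node, node_index=1, num_nodes=1)
    assert runner.compile(manifest) is runner.node   # no batch selected -> node returned as-is

    runner.batches = [(batch_start, batch_end)]      # assumed single (start, end) window
    runner.set_batch_idx(0)                          # select the first batch
    compiled = runner.compile(manifest)              # refs re-resolved with the batch's event-time filter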
@@ -502,7 +519,6 @@ def _build_run_microbatch_model_result(self, model: ModelNode) -> RunResult:
def _execute_microbatch_materialization(
self,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
@@ -537,25 +553,6 @@ def _execute_microbatch_materialization(
# call materialization_macro to get a batch-level run result
start_time = time.perf_counter()
try:
# LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
# TODO: REMOVE before 1.10 GA
model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
# Create batch context on model node prior to re-compiling
model.batch = BatchContext(
id=MicrobatchBuilder.batch_id(batch[0], model.config.batch_size),
event_time_start=batch[0],
event_time_end=batch[1],
)
# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(
model,
manifest,
{},
split_suffix=MicrobatchBuilder.format_batch_start(
batch[0], model.config.batch_size
),
)
# Update jinja context with batch context members
jinja_context = microbatch_builder.build_jinja_context_for_batch(
incremental_batch=self.relation_exists
@@ -643,37 +640,23 @@ def _is_incremental(self, model) -> bool:
else:
return False

def _execute_microbatch_model(
def _execute_model(
self,
hook_ctx: Any,
context_config: Any,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
try:
batch_result = self._execute_microbatch_materialization(
model, manifest, context, materialization_macro
model, context, materialization_macro
)
finally:
self.adapter.post_model_hook(context_config, hook_ctx)

return batch_result

def _execute_model(
self,
hook_ctx: Any,
context_config: Any,
model: ModelNode,
manifest: Manifest,
context: Dict[str, Any],
materialization_macro: MacroProtocol,
) -> RunResult:
return self._execute_microbatch_model(
hook_ctx, context_config, model, manifest, context, materialization_macro
)


class RunTask(CompileTask):
def __init__(
