From 593880ea127d0651fc5384d8d4bee67d621e9fa7 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 26 Nov 2024 15:36:17 -0600 Subject: [PATCH 1/3] Rename `batch_info` to `previous_batch_results` --- core/dbt/contracts/graph/nodes.py | 2 +- core/dbt/task/retry.py | 2 +- core/dbt/task/run.py | 12 ++++++------ tests/unit/contracts/graph/test_manifest.py | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index d341b167ab9..94aa117df77 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -444,7 +444,7 @@ def resource_class(cls) -> Type[HookNodeResource]: @dataclass class ModelNode(ModelResource, CompiledNode): - batch_info: Optional[BatchResults] = None + previous_batch_results: Optional[BatchResults] = None _has_this: Optional[bool] = None def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): diff --git a/core/dbt/task/retry.py b/core/dbt/task/retry.py index 9b3c3874718..c808872b8f8 100644 --- a/core/dbt/task/retry.py +++ b/core/dbt/task/retry.py @@ -136,7 +136,7 @@ def run(self): # batch info if there were _no_ successful batches previously. This is # because passing the batch info _forces_ the microbatch process into # _incremental_ model, and it may be that we need to be in full refresh - # mode which is only handled if batch_info _isn't_ passed for a node + # mode which is only handled if previous_batch_results _isn't_ passed for a node batch_map = { result.unique_id: result.batch_results for result in self.previous_results.results diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 0c2888bb325..e5e85c50226 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -445,8 +445,8 @@ def merge_batch_results(self, result: RunResult, batch_results: List[RunResult]) result.batch_results.failed = sorted(result.batch_results.failed) # # If retrying, propagate previously successful batches into final result, even thoguh they were not run in this invocation - if self.node.batch_info is not None: - result.batch_results.successful += self.node.batch_info.successful + if self.node.previous_batch_results is not None: + result.batch_results.successful += self.node.previous_batch_results.successful def _build_succesful_run_batch_result( self, @@ -508,15 +508,15 @@ def _execute_microbatch_materialization( ) if self.batch_idx is None: - # Note currently (9/30/2024) model.batch_info is only ever _not_ `None` + # Note currently (9/30/2024) model.previous_batch_results is only ever _not_ `None` # IFF `dbt retry` is being run and the microbatch model had batches which # failed on the run of the model (which is being retried) - if model.batch_info is None: + if model.previous_batch_results is None: end = microbatch_builder.build_end_time() start = microbatch_builder.build_start_time(end) batches = microbatch_builder.build_batches(start, end) else: - batches = model.batch_info.failed + batches = model.previous_batch_results.failed # If there is batch info, then don't run as full_refresh and do force is_incremental # not doing this risks blowing away the work that has already been done if self._has_relation(model=model): @@ -885,7 +885,7 @@ def populate_microbatch_batches(self, selected_uids: AbstractSet[str]): if uid in self.batch_map: node = self.manifest.ref_lookup.perform_lookup(uid, self.manifest) if isinstance(node, ModelNode): - node.batch_info = self.batch_map[uid] + node.previous_batch_results = self.batch_map[uid] def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: with adapter.connection_named("master"): diff --git a/tests/unit/contracts/graph/test_manifest.py b/tests/unit/contracts/graph/test_manifest.py index 146a51d5856..ed548ace836 100644 --- a/tests/unit/contracts/graph/test_manifest.py +++ b/tests/unit/contracts/graph/test_manifest.py @@ -96,7 +96,7 @@ "deprecation_date", "defer_relation", "time_spine", - "batch_info", + "previous_batch_results", } ) From 4050e377ec01c2f14dd9600fe704ddb34adb66fa Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 26 Nov 2024 15:37:26 -0600 Subject: [PATCH 2/3] Exclude `previous_batch_results` from serialization of model node to avoid jinja context bloat --- core/dbt/contracts/graph/nodes.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index 94aa117df77..0eaf758ae5a 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -451,6 +451,8 @@ def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): dct = super().__post_serialize__(dct, context) if "_has_this" in dct: del dct["_has_this"] + if "previous_batch_results" in dct: + del dct["previous_batch_results"] return dct @classmethod From dd760a202376bb02bc4cd3aae9bda6a5538d56ec Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Wed, 27 Nov 2024 10:22:19 -0600 Subject: [PATCH 3/3] Drop `previous_batch_results` key from `test_manifest.py` unit tests In 4050e377ec01c2f14dd9600fe704ddb34adb66fa we began excluding `previous_batch_results` from the serialized representation of the ModelNode. As such, we no longer need to check for it in `test_manifest.py`. --- tests/unit/contracts/graph/test_manifest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/contracts/graph/test_manifest.py b/tests/unit/contracts/graph/test_manifest.py index ed548ace836..0f3a80e5039 100644 --- a/tests/unit/contracts/graph/test_manifest.py +++ b/tests/unit/contracts/graph/test_manifest.py @@ -96,7 +96,6 @@ "deprecation_date", "defer_relation", "time_spine", - "previous_batch_results", } )