From afe25a99fecd056f18aff3eae03eea10d348eaff Mon Sep 17 00:00:00 2001 From: Peter Webb Date: Thu, 5 Dec 2024 16:33:16 -0500 Subject: [PATCH] Improve the Performance Characteristics of add_test_edges() (#11092) * New function to add graph edges. * Clean up, leave out flag temporarily for testing. * Put new test edge behavior behind flag. * Final draft of documentation. --- .../unreleased/Fixes-20241204-100429.yaml | 6 + core/dbt/cli/main.py | 1 + core/dbt/cli/params.py | 7 + core/dbt/compilation.py | 195 +++++++++++++++++- 4 files changed, 205 insertions(+), 4 deletions(-) create mode 100644 .changes/unreleased/Fixes-20241204-100429.yaml diff --git a/.changes/unreleased/Fixes-20241204-100429.yaml b/.changes/unreleased/Fixes-20241204-100429.yaml new file mode 100644 index 00000000000..378444e769d --- /dev/null +++ b/.changes/unreleased/Fixes-20241204-100429.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Improve the performance characteristics of add_test_edges() +time: 2024-12-04T10:04:29.096231-05:00 +custom: + Author: peterallenwebb + Issue: "10950" diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index a9de9441365..11cc81ef70e 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -140,6 +140,7 @@ def global_flags(func): @p.warn_error @p.warn_error_options @p.write_json + @p.use_fast_test_edges @functools.wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) diff --git a/core/dbt/cli/params.py b/core/dbt/cli/params.py index 96e9e7acd7a..612728de222 100644 --- a/core/dbt/cli/params.py +++ b/core/dbt/cli/params.py @@ -735,3 +735,10 @@ def _version_callback(ctx, _param, value): envvar="DBT_SHOW_RESOURCE_REPORT", hidden=True, ) + +use_fast_test_edges = click.option( + "--use-fast-test-edges/--no-use-fast-test-edges", + envvar="DBT_USE_FAST_TEST_EDGES", + default=False, + hidden=True, +) diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index 0ffa73df715..81ab849c8d1 100644 --- a/core/dbt/compilation.py +++ 
b/core/dbt/compilation.py @@ -1,8 +1,9 @@ +import dataclasses import json import os import pickle -from collections import defaultdict -from typing import Any, Dict, List, Optional, Tuple +from collections import defaultdict, deque +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple import networkx as nx # type: ignore import sqlparse @@ -117,6 +118,16 @@ def _get_tests_for_node(manifest: Manifest, unique_id: UniqueID) -> List[UniqueI return tests +@dataclasses.dataclass +class SeenDetails: + node_id: UniqueID + visits: int = 0 + ancestors: Set[UniqueID] = dataclasses.field(default_factory=set) + awaits_tests: Set[Tuple[UniqueID, Tuple[UniqueID, ...]]] = dataclasses.field( + default_factory=set + ) + + class Linker: def __init__(self, data=None) -> None: if data is None: @@ -195,19 +206,62 @@ def link_graph(self, manifest: Manifest): raise RuntimeError("Found a cycle: {}".format(cycle)) def add_test_edges(self, manifest: Manifest) -> None: + if not get_flags().USE_FAST_TEST_EDGES: + self.add_test_edges_1(manifest) + else: + self.add_test_edges_2(manifest) + + def add_test_edges_1(self, manifest: Manifest) -> None: """This method adds additional edges to the DAG. For a given non-test executable node, add an edge from an upstream test to the given node if the set of nodes the test depends on is a subset of the upstream nodes for the given node.""" - # Given a graph: + # HISTORICAL NOTE: To understand the motivation behind this function, + # consider a node A with tests and a node B which depends (either directly + # or indirectly) on A. It would be nice if B were not executed until + # all of the tests on A are finished. After all, we don't want to + # propagate bad data. We can enforce that behavior by adding new + # dependencies (edges) from tests to nodes that should wait on them. + # + # This function implements a rough approximation of the behavior just + # described. In fact, for tests that only depend on a single node, it + # always works. 
+ # + # Things get trickier for tests that depend on multiple nodes. In that + # case, if we are not careful, we will introduce cycles. That seems to + # be the reason this function adds dependencies from a downstream node to + # an upstream test if and only if the downstream node is already a + # descendant of all the nodes the upstream test depends on. By following + # that rule, it never makes the node dependent on new upstream nodes other + # than the tests themselves, and no cycles will be created. + # + # One drawback (Drawback 1) of the approach taken in this function is + # that it could still allow a downstream node to proceed before all + # testing is done on its ancestors, if it happens to have ancestors that + # are not also ancestors of a test with multiple dependencies. + # + # Another drawback (Drawback 2) is that the approach below adds far more + # edges than are strictly needed. After all, if we have A -> B -> C, + # there is no need to add a new edge A -> C. But this function often does. + # + # Drawback 2 is resolved in the new add_test_edges_2() implementation + # below, which is also typically much faster. Drawback 1 has been left in + # place in order to conservatively retain existing behavior, and so that + # the new implementation can be verified against this existing + # implementation by ensuring both resulting graphs have the same transitive + # reduction. + + # MOTIVATING IDEA: Given a graph... + # # model1 --> model2 --> model3 # | | # | \/ # \/ test 2 # test1 # - # Produce the following graph: + # ...produce the following... 
+ # model1 --> model2 --> model3 # | /\ | /\ /\ # | | \/ | | @@ -247,6 +301,139 @@ def add_test_edges(self, manifest: Manifest) -> None: if test_depends_on.issubset(upstream_nodes): self.graph.add_edge(upstream_test, node_id, edge_type="parent_test") + def add_test_edges_2(self, manifest: Manifest): + graph = self.graph + new_edges = self._get_test_edges_2(graph, manifest) + for e in new_edges: + graph.add_edge(e[0], e[1], edge_type="parent_test") + + @staticmethod + def _get_test_edges_2( + graph: nx.DiGraph, manifest: Manifest + ) -> Iterable[Tuple[UniqueID, UniqueID]]: + # This function enforces the same execution behavior as add_test_edges_1(), + # but executes far more quickly and adds far fewer edges. See the + # HISTORICAL NOTE above. + # + # The idea is to first scan for "single-tested" nodes (which have tests + # that depend only on that node) and "multi-tested" nodes (which + # have tests that depend on multiple nodes). Single-tested nodes are + # handled quickly and easily. + # + # The less common but more complex case of multi-tested nodes is handled + # by a specialized function. + + new_edges: List[Tuple[UniqueID, UniqueID]] = [] + + source_nodes: List[UniqueID] = [] + executable_nodes: Set[UniqueID] = set() + multi_tested_nodes = set() + # Dictionary mapping nodes with single-dep tests to a list of those tests. 
+ single_tested_nodes: dict[UniqueID, List[UniqueID]] = defaultdict(list) + for node_id in graph.nodes: + manifest_node = manifest.nodes.get(node_id, None) + if manifest_node is None: + continue + + if next(graph.predecessors(node_id), None) is None: + source_nodes.append(node_id) + + if manifest_node.resource_type != NodeType.Test: + executable_nodes.add(node_id) + else: + test_deps = manifest_node.depends_on_nodes + if len(test_deps) == 1: + single_tested_nodes[test_deps[0]].append(node_id) + elif len(test_deps) > 1: + multi_tested_nodes.update(manifest_node.depends_on_nodes) + + # Now that we have all the necessary information conveniently organized, + # add new edges for single-tested nodes. + for node_id, test_ids in single_tested_nodes.items(): + succs = [s for s in graph.successors(node_id) if s in executable_nodes] + for succ_id in succs: + for test_id in test_ids: + new_edges.append((test_id, succ_id)) + + # Get the edges for multi-tested nodes separately, if needed. + if len(multi_tested_nodes) > 0: + multi_test_edges = Linker._get_multi_test_edges( + graph, manifest, source_nodes, executable_nodes, multi_tested_nodes + ) + new_edges += multi_test_edges + + return new_edges + + @staticmethod + def _get_multi_test_edges( + graph: nx.DiGraph, + manifest: Manifest, + source_nodes: Iterable[UniqueID], + executable_nodes: Set[UniqueID], + multi_tested_nodes, + ) -> List[Tuple[UniqueID, UniqueID]]: + # Works through the graph in a breadth-first style, processing nodes from + # a ready queue which initially consists of nodes with no ancestors, + # and adding more nodes to the ready queue after all their ancestors + # have been processed. All the while, the relevant details of all nodes + # "seen" by the search so far are maintained in a SeenDetails record, + # including the ancestor set and which tests it is "awaiting" (i.e. tests of + # its ancestors). 
The processing step adds test edges when every dependency + # of an awaited test is an ancestor of a node that is being processed. + # Downstream nodes are then exempted from awaiting the test. + # + # Memory consumption is potentially O(n^2) with n the number of nodes in + # the graph, since the average number of ancestors and tests being awaited + # for each of the n nodes could itself be O(n) but we only track ancestors + # that are multi-tested, which should keep things closer to O(n) in real- + # world scenarios. + + new_edges: List[Tuple[UniqueID, UniqueID]] = [] + ready: deque = deque(source_nodes) + details = {node_id: SeenDetails(node_id) for node_id in source_nodes} + + while len(ready) > 0: + curr_details: SeenDetails = details[ready.pop()] + test_ids = _get_tests_for_node(manifest, curr_details.node_id) + new_awaits_for_succs = curr_details.awaits_tests.copy() + for test_id in test_ids: + deps: List[UniqueID] = sorted(manifest.nodes[test_id].depends_on_nodes) + if len(deps) > 1: + # Tests with only one dep were already handled. + new_awaits_for_succs.add((test_id, tuple(deps))) + + for succ_id in [ + s for s in graph.successors(curr_details.node_id) if s in executable_nodes + ]: + suc_details = details.get(succ_id, None) + if suc_details is None: + suc_details = SeenDetails(succ_id) + details[succ_id] = suc_details + suc_details.visits += 1 + suc_details.awaits_tests.update(new_awaits_for_succs) + suc_details.ancestors.update(curr_details.ancestors) + if curr_details.node_id in multi_tested_nodes: + # Only track ancestry information for the set of nodes + # we will actually check against later. 
+ suc_details.ancestors.add(curr_details.node_id) + + if suc_details.visits == graph.in_degree(succ_id): + if len(suc_details.awaits_tests) > 0: + removes = set() + for awt in suc_details.awaits_tests: + if not any(True for a in awt[1] if a not in suc_details.ancestors): + removes.add(awt) + new_edges.append((awt[0], succ_id)) + + suc_details.awaits_tests.difference_update(removes) + ready.appendleft(succ_id) + + # We are now done with the current node and all of its ancestors. + # Discard its details to save memory. + del details[curr_details.node_id] + + return new_edges + def get_graph(self, manifest: Manifest) -> Graph: self.link_graph(manifest) return Graph(self.graph)