Skip to content

Commit

Permalink
Improve the Performance Characteristics of add_test_edges() (#11092)
Browse files Browse the repository at this point in the history
* New function to add graph edges.

* Clean up, leave out flag temporarily for testing.

* Put new test edge behavior behind flag.

* Final draft of documentation.
  • Loading branch information
peterallenwebb authored Dec 5, 2024
1 parent e32b8a9 commit afe25a9
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 4 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241204-100429.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Improve the performance characteristics of add_test_edges()
time: 2024-12-04T10:04:29.096231-05:00
custom:
Author: peterallenwebb
Issue: "10950"
1 change: 1 addition & 0 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def global_flags(func):
@p.warn_error
@p.warn_error_options
@p.write_json
@p.use_fast_test_edges
@functools.wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/cli/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,3 +735,10 @@ def _version_callback(ctx, _param, value):
envvar="DBT_SHOW_RESOURCE_REPORT",
hidden=True,
)

# Hidden boolean CLI switch, off by default, that can also be set through the
# DBT_USE_FAST_TEST_EDGES environment variable.
# NOTE(review): presumably surfaced as get_flags().USE_FAST_TEST_EDGES, which
# Linker.add_test_edges() checks to pick its implementation -- confirm the
# option-to-flag name mapping.
use_fast_test_edges = click.option(
    "--use-fast-test-edges/--no-use-fast-test-edges",
    envvar="DBT_USE_FAST_TEST_EDGES",
    default=False,
    hidden=True,
)
195 changes: 191 additions & 4 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import dataclasses
import json
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple
from collections import defaultdict, deque
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

import networkx as nx # type: ignore
import sqlparse
Expand Down Expand Up @@ -117,6 +118,16 @@ def _get_tests_for_node(manifest: Manifest, unique_id: UniqueID) -> List[UniqueI
return tests


@dataclasses.dataclass
class SeenDetails:
    """Per-node bookkeeping record used by Linker._get_multi_test_edges()."""

    # Unique id of the graph node this record describes.
    node_id: UniqueID
    # How many already-processed predecessors have reached this node so far;
    # compared against the node's in-degree to decide when it is ready.
    visits: int = 0
    # Multi-tested ancestors of this node that have been discovered so far.
    ancestors: Set[UniqueID] = dataclasses.field(default_factory=set)
    # Pending (test id, sorted dependency ids) pairs inherited from upstream
    # nodes for which no test edge has been attached yet along this path.
    awaits_tests: Set[Tuple[UniqueID, Tuple[UniqueID, ...]]] = dataclasses.field(
        default_factory=set
    )


class Linker:
def __init__(self, data=None) -> None:
if data is None:
Expand Down Expand Up @@ -195,19 +206,62 @@ def link_graph(self, manifest: Manifest):
raise RuntimeError("Found a cycle: {}".format(cycle))

def add_test_edges(self, manifest: Manifest) -> None:
    """Add test-ordering edges to the graph for the given manifest.

    Delegates to add_test_edges_2() when the USE_FAST_TEST_EDGES flag is
    truthy, and to add_test_edges_1() otherwise.
    """
    use_fast = get_flags().USE_FAST_TEST_EDGES
    add_edges = self.add_test_edges_2 if use_fast else self.add_test_edges_1
    add_edges(manifest)

def add_test_edges_1(self, manifest: Manifest) -> None:
"""This method adds additional edges to the DAG. For a given non-test
executable node, add an edge from an upstream test to the given node if
the set of nodes the test depends on is a subset of the upstream nodes
for the given node."""

# Given a graph:
# HISTORICAL NOTE: To understand the motivation behind this function,
# consider a node A with tests and a node B which depends (either directly
# or indirectly) on A. It would be nice if B were not executed until
# all of the tests on A are finished. After all, we don't want to
# propagate bad data. We can enforce that behavior by adding new
# dependencies (edges) from tests to nodes that should wait on them.
#
# This function implements a rough approximation of the behavior just
# described. In fact, for tests that only depend on a single node, it
# always works.
#
# Things get trickier for tests that depend on multiple nodes. In that
# case, if we are not careful, we will introduce cycles. That seems to
# be the reason this function adds dependencies from a downstream node to
# an upstream test if and only if the downstream node is already a
# descendant of all the nodes the upstream test depends on. By following
# that rule, it never makes the node dependent on new upstream nodes other
# than the tests themselves, and no cycles will be created.
#
# One drawback (Drawback 1) of the approach taken in this function is
# that it could still allow a downstream node to proceed before all
# testing is done on its ancestors, if it happens to have ancestors that
# are not also ancestors of a test with multiple dependencies.
#
# Another drawback (Drawback 2) is that the approach below adds far more
# edges than are strictly needed. After all, if we have A -> B -> C,
# there is no need to add a new edge A -> C. But this function often does.
#
# Drawback 2 is resolved in the new add_test_edges_2() implementation
# below, which is also typically much faster. Drawback 1 has been left in
# place in order to conservatively retain existing behavior, and so that
# the new implementation can be verified against this existing
# implementation by ensuring both resulting graphs have the same transitive
# reduction.

# MOTIVATING IDEA: Given a graph...
#
# model1 --> model2 --> model3
# | |
# | \/
# \/ test 2
# test1
#
# Produce the following graph:
# ...produce the following...
#
# model1 --> model2 --> model3
# | /\ | /\ /\
# | | \/ | |
Expand Down Expand Up @@ -247,6 +301,139 @@ def add_test_edges(self, manifest: Manifest) -> None:
if test_depends_on.issubset(upstream_nodes):
self.graph.add_edge(upstream_test, node_id, edge_type="parent_test")

def add_test_edges_2(self, manifest: Manifest) -> None:
    """Attach the edges computed by _get_test_edges_2() to self.graph.

    Every computed pair is (test id, dependent node id). Each one is added
    as a graph edge tagged with edge_type="parent_test".
    """
    graph = self.graph
    for test_id, node_id in self._get_test_edges_2(graph, manifest):
        graph.add_edge(test_id, node_id, edge_type="parent_test")

@staticmethod
def _get_test_edges_2(
    graph: nx.DiGraph, manifest: Manifest
) -> Iterable[Tuple[UniqueID, UniqueID]]:
    """Compute the (test id, node id) edges that make nodes wait on tests.

    Intended to enforce the same execution ordering as add_test_edges_1()
    while running faster and adding fewer edges (see the HISTORICAL NOTE in
    add_test_edges_1()).

    The graph is scanned once to find "single-tested" nodes (nodes that have
    tests depending only on that node) and "multi-tested" nodes (nodes that
    have tests depending on several nodes). Single-tested nodes are handled
    directly here; the less common multi-tested case is delegated to
    _get_multi_test_edges().

    Args:
        graph: The dependency graph; it is only read, never modified.
        manifest: Manifest whose ``nodes`` mapping describes the graph nodes.

    Returns:
        The new edges as (test id, dependent node id) pairs.
    """
    new_edges: List[Tuple[UniqueID, UniqueID]] = []

    source_nodes: List[UniqueID] = []
    executable_nodes: Set[UniqueID] = set()
    multi_tested_nodes: Set[UniqueID] = set()
    # Maps each node with single-dependency tests to the list of those tests.
    single_tested_nodes: Dict[UniqueID, List[UniqueID]] = defaultdict(list)

    for node_id in graph.nodes:
        manifest_node = manifest.nodes.get(node_id)
        if manifest_node is None:
            # NOTE(review): graph nodes missing from manifest.nodes are
            # ignored entirely, so they can never seed the traversal in
            # _get_multi_test_edges(). Confirm that such nodes are never
            # predecessors of executable nodes, otherwise those nodes would
            # never reach their in-degree and would not be processed.
            continue

        # A node without predecessors is a starting point for the traversal.
        if next(graph.predecessors(node_id), None) is None:
            source_nodes.append(node_id)

        if manifest_node.resource_type != NodeType.Test:
            executable_nodes.add(node_id)
            continue

        test_deps = manifest_node.depends_on_nodes
        if len(test_deps) == 1:
            single_tested_nodes[test_deps[0]].append(node_id)
        elif len(test_deps) > 1:
            multi_tested_nodes.update(test_deps)

    # With everything organized, add the edges for single-tested nodes: each
    # executable direct successor of the tested node waits on all its tests.
    for node_id, test_ids in single_tested_nodes.items():
        for succ_id in graph.successors(node_id):
            if succ_id in executable_nodes:
                new_edges.extend((test_id, succ_id) for test_id in test_ids)

    # Edges for multi-tested nodes are computed separately, and only if needed.
    if multi_tested_nodes:
        new_edges += Linker._get_multi_test_edges(
            graph, manifest, source_nodes, executable_nodes, multi_tested_nodes
        )

    return new_edges

@staticmethod
def _get_multi_test_edges(
    graph: nx.DiGraph,
    manifest: Manifest,
    source_nodes: Iterable[UniqueID],
    executable_nodes: Set[UniqueID],
    multi_tested_nodes: Set[UniqueID],
) -> List[Tuple[UniqueID, UniqueID]]:
    """Compute test edges for tests that depend on more than one node.

    Works through the graph breadth-first style. Nodes are taken from a
    ready queue that initially holds ``source_nodes``; a successor joins the
    queue once it has been reached from as many processed nodes as its
    in-degree. For every node seen so far a SeenDetails record tracks its
    multi-tested ancestors and the tests it is still "awaiting" (multi-
    dependency tests of its ancestors). When a node becomes ready, an edge
    is added from each awaited test whose dependencies are all ancestors of
    that node, and the test is dropped from the awaited set so that nodes
    further downstream do not get a redundant edge.

    Memory consumption is potentially O(n^2) in the number of graph nodes,
    because each node may track O(n) ancestors and awaited tests. Only
    multi-tested ancestors are tracked, which the original author expects to
    keep usage closer to O(n) in real-world projects.

    Args:
        graph: The dependency graph; it is only read, never modified.
        manifest: Manifest used to look up tests and their dependencies.
        source_nodes: Ids of the nodes the traversal starts from.
        executable_nodes: Ids of nodes that may receive test edges; other
            successors are ignored.
        multi_tested_nodes: Ids of nodes some multi-dependency test depends on.

    Returns:
        The new edges as (test id, dependent node id) pairs.
    """
    # Materialize once: the ids seed both the queue and the details map, so a
    # one-shot iterator would otherwise leave the second of them empty.
    start_ids = list(source_nodes)

    new_edges: List[Tuple[UniqueID, UniqueID]] = []
    ready: deque = deque(start_ids)
    details: Dict[UniqueID, SeenDetails] = {
        node_id: SeenDetails(node_id) for node_id in start_ids
    }

    while ready:
        curr = details[ready.pop()]

        # Successors await everything the current node still awaits, plus the
        # current node's own multi-dependency tests.
        awaits_for_succs = curr.awaits_tests.copy()
        for test_id in _get_tests_for_node(manifest, curr.node_id):
            deps: List[UniqueID] = sorted(manifest.nodes[test_id].depends_on_nodes)
            if len(deps) > 1:
                # Tests with only one dependency were already handled.
                awaits_for_succs.add((test_id, tuple(deps)))

        curr_is_multi_tested = curr.node_id in multi_tested_nodes

        for succ_id in graph.successors(curr.node_id):
            if succ_id not in executable_nodes:
                continue

            succ = details.get(succ_id)
            if succ is None:
                succ = details[succ_id] = SeenDetails(succ_id)

            succ.visits += 1
            succ.awaits_tests.update(awaits_for_succs)
            succ.ancestors.update(curr.ancestors)
            if curr_is_multi_tested:
                # Only track ancestry information for the set of nodes we
                # will actually check against later.
                succ.ancestors.add(curr.node_id)

            if succ.visits != graph.in_degree(succ_id):
                # Some predecessor has not reached this successor yet.
                continue

            if succ.awaits_tests:
                # A test is satisfied once every one of its dependencies is
                # an ancestor of this successor.
                satisfied = [
                    awaited
                    for awaited in succ.awaits_tests
                    if all(dep in succ.ancestors for dep in awaited[1])
                ]
                new_edges.extend((awaited[0], succ_id) for awaited in satisfied)
                succ.awaits_tests.difference_update(satisfied)

            ready.appendleft(succ_id)

        # The current node has handed everything to its successors; discard
        # its details to save memory.
        del details[curr.node_id]

    return new_edges

def get_graph(self, manifest: Manifest) -> Graph:
    """Link the manifest into self.graph, then return it wrapped in a Graph."""
    self.link_graph(manifest)
    return Graph(self.graph)
Expand Down

0 comments on commit afe25a9

Please sign in to comment.