Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maximally parallelize dbt clone #10129

Merged
merged 6 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20240522-000309.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: Maximally parallelize the dbt clone command
time: 2024-05-22T00:03:09.765977-04:00
custom:
Author: michelleark
Issue: "7914"
10 changes: 8 additions & 2 deletions core/dbt/graph/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@ class GraphQueue:
the same time, as there is an unlocked race!
"""

def __init__(self, graph: nx.DiGraph, manifest: Manifest, selected: Set[UniqueId]) -> None:
self.graph = graph
def __init__(
self,
graph: nx.DiGraph,
manifest: Manifest,
selected: Set[UniqueId],
preserve_edges: bool = True,
) -> None:
self.graph = graph if preserve_edges else nx.classes.function.create_empty_copy(graph)
MichelleArk marked this conversation as resolved.
Show resolved Hide resolved
self.manifest = manifest
self._selected = selected
# store the queue as a priority queue.
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/graph/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def get_selected(self, spec: SelectionSpec) -> Set[UniqueId]:

return filtered_nodes

def get_graph_queue(self, spec: SelectionSpec) -> GraphQueue:
def get_graph_queue(self, spec: SelectionSpec, preserve_edges: bool = True) -> GraphQueue:
"""Returns a queue over nodes in the graph that tracks progress of
dependencies.
"""
Expand All @@ -330,7 +330,7 @@ def get_graph_queue(self, spec: SelectionSpec) -> GraphQueue:
# Construct a new graph using the selected_nodes
new_graph = self.full_graph.get_subset_graph(selected_nodes)
# should we give a way here for consumers to mutate the graph?
return GraphQueue(new_graph.graph, self.manifest, selected_nodes)
return GraphQueue(new_graph.graph, self.manifest, selected_nodes, preserve_edges)


class ResourceTypeSelector(NodeSelector):
Expand Down
5 changes: 4 additions & 1 deletion core/dbt/task/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dbt.node_types import REFABLE_NODE_TYPES
from dbt.task.base import BaseRunner, resource_types_from_args
from dbt.task.run import _validate_materialization_relations_dict
from dbt.task.runnable import GraphRunnableTask
from dbt.task.runnable import GraphRunnableMode, GraphRunnableTask
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.exceptions import CompilationError, DbtInternalError

Expand Down Expand Up @@ -94,6 +94,9 @@ class CloneTask(GraphRunnableTask):
def raise_on_first_error(self):
    # Clone always answers False here.
    # NOTE(review): presumably this lets the remaining nodes keep running after
    # one node fails — confirm against how the base task consumes this flag.
    return False

def get_run_mode(self) -> GraphRunnableMode:
    """Run clone in Independent mode.

    GraphRunnableTask.get_graph_queue requests preserve_edges=False for this
    mode, so the graph queue is built from a copy of the graph without edges.
    """
    return GraphRunnableMode.Independent

def _get_deferred_manifest(self) -> Optional[Manifest]:
# Unlike other commands, 'clone' always requires a state manifest
# Load previous state, regardless of whether --defer flag has been set
Expand Down
16 changes: 15 additions & 1 deletion core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from dbt.parser.manifest import write_manifest
from dbt.task.base import BaseRunner, ConfiguredTask
from dbt_common.context import _INVOCATION_CONTEXT_VAR, get_invocation_context
from dbt_common.dataclass_schema import StrEnum
from dbt_common.events.contextvars import log_contextvars, task_contextvars
from dbt_common.events.functions import fire_event, warn_or_error
from dbt_common.events.types import Formatting
Expand All @@ -68,6 +69,11 @@
RUNNING_STATE = DbtProcessState("running")


class GraphRunnableMode(StrEnum):
    """Scheduling mode reported by GraphRunnableTask.get_run_mode()."""

    # Default mode: get_graph_queue() keeps preserve_edges=True.
    Topological = "topological"
    # get_graph_queue() passes preserve_edges=False for this mode.
    Independent = "independent"


class GraphRunnableTask(ConfiguredTask):
MARK_DEPENDENT_ERRORS_STATUSES = [NodeStatus.Error]

Expand Down Expand Up @@ -145,7 +151,15 @@
selector = self.get_node_selector()
# Following uses self.selection_arg and self.exclusion_arg
spec = self.get_selection_spec()
return selector.get_graph_queue(spec)

preserve_edges = True
if self.get_run_mode() == GraphRunnableMode.Independent:
preserve_edges = False

Check warning on line 157 in core/dbt/task/runnable.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/runnable.py#L157

Added line #L157 was not covered by tests

return selector.get_graph_queue(spec, preserve_edges)

def get_run_mode(self) -> GraphRunnableMode:
    """Return the scheduling mode for this task.

    The base implementation is Topological; get_graph_queue() only drops
    graph edges when a subclass returns GraphRunnableMode.Independent.
    """
    return GraphRunnableMode.Topological

def _runtime_initialize(self):
self.compile_manifest()
Expand Down
15 changes: 1 addition & 14 deletions tests/unit/contracts/graph/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
MockNode,
MockSource,
inject_plugin,
make_manifest,
)

REQUIRED_PARSED_NODE_KEYS = frozenset(
Expand Down Expand Up @@ -1090,20 +1091,6 @@ def setUp(self):
)


def make_manifest(nodes=(), sources=(), macros=(), docs=()) -> Manifest:
    """Build a minimal Manifest for tests from iterables of mock resources.

    Each argument is an iterable of objects exposing ``unique_id``; every
    collection is keyed by that id. All other Manifest collections are empty.

    The defaults are immutable tuples rather than lists so that no mutable
    default object is shared between calls.
    """
    return Manifest(
        nodes={n.unique_id: n for n in nodes},
        macros={m.unique_id: m for m in macros},
        sources={s.unique_id: s for s in sources},
        docs={d.unique_id: d for d in docs},
        disabled={},
        files={},
        exposures={},
        metrics={},
        selectors={},
    )


FindMacroSpec = namedtuple("FindMacroSpec", "macros,expected")

macro_parameter_sets = [
Expand Down
47 changes: 47 additions & 0 deletions tests/unit/graph/test_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import networkx as nx
import pytest

from dbt.contracts.graph.manifest import Manifest
from dbt.graph.queue import GraphQueue
from tests.unit.utils import MockNode, make_manifest


class TestGraphQueue:
    """Unit tests for the state GraphQueue sets up at construction time."""

    @pytest.fixture(scope="class")
    def manifest(self) -> Manifest:
        """Manifest holding the two model nodes named in the graph fixture."""
        return make_manifest(
            nodes=[
                MockNode(package="test_package", name="upstream_model"),
                MockNode(package="test_package", name="downstream_model"),
            ]
        )

    @pytest.fixture(scope="class")
    def graph(self) -> nx.DiGraph:
        """Two-node graph with a single upstream -> downstream edge."""
        graph = nx.DiGraph()
        graph.add_edge("model.test_package.upstream_model", "model.test_package.downstream_model")
        return graph

    def test_init_graph_queue(self, manifest, graph):
        # `selected` is declared as Set[UniqueId]; `{}` would be an empty dict,
        # so pass an actual empty set.
        graph_queue = GraphQueue(graph=graph, manifest=manifest, selected=set())

        assert graph_queue.manifest == manifest
        assert graph_queue.graph == graph
        # with edges preserved, only the node without an upstream dependency is queued
        assert graph_queue.inner.queue == [(0, "model.test_package.upstream_model")]
        assert graph_queue.in_progress == set()
        assert graph_queue.queued == {"model.test_package.upstream_model"}
        assert graph_queue.lock

    def test_init_graph_queue_preserve_edges_false(self, manifest, graph):
        graph_queue = GraphQueue(
            graph=graph, manifest=manifest, selected=set(), preserve_edges=False
        )

        # when preserve_edges is set to false, dependencies between nodes are no longer tracked in the priority queue
        assert list(graph_queue.graph.edges) == []
        assert graph_queue.inner.queue == [
            (0, "model.test_package.downstream_model"),
            (0, "model.test_package.upstream_model"),
        ]
        assert graph_queue.queued == {
            "model.test_package.upstream_model",
            "model.test_package.downstream_model",
        }
76 changes: 72 additions & 4 deletions tests/unit/task/test_runnable.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from dataclasses import dataclass
from typing import AbstractSet, Any, Dict, Optional
from typing import AbstractSet, Any, Dict, List, Optional, Tuple

import networkx as nx
import pytest

from dbt.task.runnable import GraphRunnableTask
from dbt.artifacts.resources.types import NodeType
from dbt.graph import Graph, ResourceTypeSelector
from dbt.task.runnable import GraphRunnableMode, GraphRunnableTask
from dbt.tests.util import safe_set_invocation_context
from tests.unit.utils import MockNode, make_manifest


@dataclass
Expand All @@ -14,6 +18,9 @@ class MockArgs:
state: Optional[Dict[str, Any]] = None
defer_state: Optional[Dict[str, Any]] = None
write_json: bool = False
selector: Optional[str] = None
select: Tuple[str] = ()
exclude: Tuple[str] = ()


@dataclass
Expand All @@ -23,12 +30,28 @@ class MockConfig:
threads: int = 1
target_name: str = "mock_config_target_name"

def get_default_selector_name(self):
    """Return None: this mock config names no default selector."""
    return None


class MockRunnableTask(GraphRunnableTask):
def __init__(self, exception_class: Exception = Exception):
def __init__(
self,
exception_class: Exception = Exception,
nodes: Optional[List[MockNode]] = None,
edges: Optional[List[Tuple[str, str]]] = None,
):
nodes = nodes or []
edges = edges or []

self.forced_exception_class = exception_class
self.did_cancel: bool = False
super().__init__(args=MockArgs(), config=MockConfig(), manifest=None)
self.manifest = make_manifest(nodes=nodes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like we are reimplementing the task logic in the test again. I view unit tests as documenting behavior in this case, so the only thing we need to test at the runnable-task level is: are we passing the correct arguments when calling get_graph_queue?

digraph = nx.DiGraph()
for edge in edges:
digraph.add_edge(edge[0], edge[1])
self.graph = Graph(digraph)

def run_queue(self, pool):
"""Override `run_queue` to raise a system exit"""
Expand All @@ -40,13 +63,25 @@ def _cancel_connections(self, pool):

def get_node_selector(self):
"""This is an `abstract_method` on `GraphRunnableTask`, thus we must implement it"""
return None
selector = ResourceTypeSelector(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

graph=self.graph,
manifest=self.manifest,
previous_state=self.previous_state,
resource_types=[NodeType.Model],
include_empty_nodes=True,
)
return selector

def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):
    """This is an `abstract_method` on `GraphRunnableTask`, thus we must implement it.

    The mock ignores both arguments and returns None.
    """
    return None


class MockRunnableTaskIndependent(MockRunnableTask):
    """MockRunnableTask variant whose run mode is overridden to Independent."""

    def get_run_mode(self) -> GraphRunnableMode:
        # Override the base-class mode so tests can exercise the Independent path.
        return GraphRunnableMode.Independent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are using inheritance where we should be using patch



def test_graph_runnable_task_cancels_connection_on_system_exit():

safe_set_invocation_context()
Expand Down Expand Up @@ -81,3 +116,36 @@ def test_graph_runnable_task_doesnt_cancel_connection_on_generic_exception():

# If `did_cancel` is True, that means `_cancel_connections` was called
assert task.did_cancel is False


def test_graph_runnable_preserves_edges_by_default():
    """In the default (Topological) mode only the upstream node is queued initially."""
    upstream_id = "model.test.upstream_node"
    downstream_id = "model.test.downstream_node"
    mock_nodes = [
        MockNode("test", "upstream_node", fqn=upstream_id),
        MockNode("test", "downstream_node", fqn=downstream_id),
    ]

    task = MockRunnableTask(nodes=mock_nodes, edges=[(upstream_id, downstream_id)])
    assert task.get_run_mode() == GraphRunnableMode.Topological

    queue = task.get_graph_queue()
    assert queue.queued == {upstream_id}
    assert queue.inner.queue == [(0, upstream_id)]


def test_graph_runnable_preserves_edges_false():
    """In Independent mode both nodes are queued immediately, ignoring the edge."""
    upstream_id = "model.test.upstream_node"
    downstream_id = "model.test.downstream_node"
    mock_nodes = [
        MockNode("test", "upstream_node", fqn=upstream_id),
        MockNode("test", "downstream_node", fqn=downstream_id),
    ]

    task = MockRunnableTaskIndependent(nodes=mock_nodes, edges=[(upstream_id, downstream_id)])
    assert task.get_run_mode() == GraphRunnableMode.Independent

    queue = task.get_graph_queue()
    assert queue.queued == {downstream_id, upstream_id}
    # both entries carry priority 0, so they appear in node-id order
    expected_entries = [(0, downstream_id), (0, upstream_id)]
    assert queue.inner.queue == expected_entries
15 changes: 15 additions & 0 deletions tests/unit/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pytest

from dbt.config.project import PartialProject
from dbt.contracts.graph.manifest import Manifest
from dbt_common.dataclass_schema import ValidationError


Expand Down Expand Up @@ -387,3 +388,17 @@ def replace_config(n, **kwargs):
config=n.config.replace(**kwargs),
unrendered_config=dict_replace(n.unrendered_config, **kwargs),
)


def make_manifest(nodes=[], sources=[], macros=[], docs=[]) -> Manifest:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is duplicating

here, thoughts on how can we make that one easier to use/more visible?

return Manifest(
nodes={n.unique_id: n for n in nodes},
macros={m.unique_id: m for m in macros},
sources={s.unique_id: s for s in sources},
docs={d.unique_id: d for d in docs},
disabled={},
files={},
exposures={},
metrics={},
selectors={},
)
Loading