Skip to content

Commit

Permalink
Interface for Execution queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenyuLInx committed Aug 10, 2024
1 parent 63262e9 commit db0535f
Showing 1 changed file with 81 additions and 0 deletions.
81 changes: 81 additions & 0 deletions core/dbt/graph/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@

import networkx as nx # type: ignore

from dbt.artifacts.schemas.run import RunResult
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import (
Exposure,
GraphMemberNode,
Metric,
ModelNode,
SourceDefinition,
)
from dbt.contracts.state import PreviousState
from dbt.graph.selector_spec import SelectionSpec
from dbt.node_types import NodeType

from .graph import UniqueId
Expand Down Expand Up @@ -212,3 +216,80 @@ def wait_until_something_was_done(self) -> int:
with self.lock:
self.some_task_done.wait()
return self.inner.unfinished_tasks


class ExecutionQueue:
"""
ExecutionQueue manage what nodes to execute in what order, based on the supplied inputs.
It is responsible for managing the queue of nodes to execute, and for marking nodes as
done when they have been executed.
"""

def __init__(
self,
manifest: Manifest,
previous_state: PreviousState,
resource_types: List[NodeType],
include_empty_nodes: Optional[bool] = False,
selection_spec: Optional[SelectionSpec] = None,
fail_fast: Optional[bool] = False,
) -> None:
"""Create a new ExecutionQueue.
Nodes to execute are selected based on the manifest, previous state, selection spec, inlcude_empty_nodes, and resource_types.
See Args for more details.
Example usage:
pool = ThreadPool(4)
queue = ExecutionQueue(manifest, previous_state, [NodeType.Model, NodeType.Test])
def callback(result: RunResult):
queue.handle_node_result(result)
def run(node: GraphMemberNode):
result = node.run()
return result
while queue.count() > 0:
node = queue.get()
pool.apply_async(run, args=(node), callback=callback)
results = queue.join()
Args:
manifest (Manifest): the manifest of the project
previous_state (PreviousState): the previous state of the project, used in state selection.
resource_types (List[NodeType]): the types of resources to include in the selection.
include_empty_nodes (Optional[bool]): whether to include nodes that do not have values in the selection. Defaults to False.
selection_spec (Optional[SelectionSpec]): the selection spec to use. Defaults to None
fail_fast (Optional[bool]): when set to True, the will will stop execution after the first error. Defaults to False.
"""
pass

Check warning on line 263 in core/dbt/graph/queue.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/graph/queue.py#L263

Added line #L263 was not covered by tests

def count(self) -> int:
"""
Returns:
int: the number of nodes in the queue (excluding in-progress nodes)
"""
return 0

Check warning on line 270 in core/dbt/graph/queue.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/graph/queue.py#L270

Added line #L270 was not covered by tests

def handle_node_result(self, result: RunResult) -> None:
"""Given a RunResult, mark the node as done and update the queue to make more nodes avaliable.
Args:
result (RunResult): _description_
"""
pass

Check warning on line 278 in core/dbt/graph/queue.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/graph/queue.py#L278

Added line #L278 was not covered by tests

def get(self, block: bool = True) -> GraphMemberNode:
"""
Get the next node to execute.
Args:
block (bool, optional): whether to block until a node is available. Defaults to True.
"""
return ModelNode() # type: ignore

Check warning on line 287 in core/dbt/graph/queue.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/graph/queue.py#L287

Added line #L287 was not covered by tests

def join(self) -> list[RunResult]:
"""Wait for all nodes to finish executing, and return the results of all nodes.
Returns:
list[RunResult]: the results of all nodes.
"""
return []

Check warning on line 295 in core/dbt/graph/queue.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/graph/queue.py#L295

Added line #L295 was not covered by tests

0 comments on commit db0535f

Please sign in to comment.