Skip to content

Commit

Permalink
Create a no-op exposure runner (#11082)
Browse files Browse the repository at this point in the history
  • Loading branch information
aranke authored Dec 12, 2024
1 parent 6621015 commit 7df04b0
Show file tree
Hide file tree
Showing 19 changed files with 153 additions and 73 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20241202-164715.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Create a no-op exposure runner
time: 2024-12-02T16:47:15.766574Z
custom:
Author: aranke
Issue: ' '
2 changes: 2 additions & 0 deletions core/dbt/artifacts/schemas/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ class NodeStatus(StrEnum):
PartialSuccess = "partial success"
Pass = "pass"
RuntimeErr = "runtime error"
NoOp = "no-op"


class RunStatus(StrEnum):
Success = NodeStatus.Success
Error = NodeStatus.Error
Skipped = NodeStatus.Skipped
PartialSuccess = NodeStatus.PartialSuccess
NoOp = NodeStatus.NoOp


class TestStatus(StrEnum):
Expand Down
14 changes: 13 additions & 1 deletion core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,7 +1378,7 @@ def group(self):


@dataclass
class Exposure(GraphNode, ExposureResource):
class Exposure(NodeInfoMixin, GraphNode, ExposureResource):
@property
def depends_on_nodes(self):
return self.depends_on.nodes
Expand Down Expand Up @@ -1441,6 +1441,12 @@ def same_contents(self, old: Optional["Exposure"]) -> bool:
def group(self):
return None

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
if "_event_status" in dct:
del dct["_event_status"]
return dct


# ====================================
# Metric node
Expand Down Expand Up @@ -1659,6 +1665,12 @@ def same_contents(self, old: Optional["SavedQuery"]) -> bool:
and True
)

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
if "_event_status" in dct:
del dct["_event_status"]
return dct


# ====================================
# Patches
Expand Down
4 changes: 3 additions & 1 deletion core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1937,7 +1937,9 @@ def code(self) -> str:
return "Z023"

def message(self) -> str:
stats_line = "Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} TOTAL={total}"
stats_line = (
"Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} NO-OP={noop} TOTAL={total}"
)
return stats_line.format(**self.stats)


Expand Down
10 changes: 6 additions & 4 deletions core/dbt/graph/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,12 @@ def _is_graph_member(self, unique_id: UniqueId) -> bool:
elif unique_id in self.manifest.saved_queries:
saved_query = self.manifest.saved_queries[unique_id]
return saved_query.config.enabled

node = self.manifest.nodes[unique_id]

return node.config.enabled
elif unique_id in self.manifest.exposures:
exposure = self.manifest.exposures[unique_id]
return exposure.config.enabled
else:
node = self.manifest.nodes[unique_id]
return node.config.enabled

def _is_empty_node(self, unique_id: UniqueId) -> bool:
if unique_id in self.manifest.nodes:
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/runners/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .exposure_runner import ExposureRunner
from .saved_query_runner import SavedQueryRunner
7 changes: 7 additions & 0 deletions core/dbt/runners/exposure_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dbt.runners.no_op_runner import NoOpRunner


class ExposureRunner(NoOpRunner):
@property
def description(self) -> str:
return f"exposure {self.node.name}"
45 changes: 45 additions & 0 deletions core/dbt/runners/no_op_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import threading

from dbt.artifacts.schemas.results import RunStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import LogNodeNoOpResult
from dbt.task.base import BaseRunner
from dbt_common.events.functions import fire_event


class NoOpRunner(BaseRunner):
@property
def description(self) -> str:
raise NotImplementedError("description not implemented")

def before_execute(self) -> None:
pass

def compile(self, manifest: Manifest):
return self.node

def after_execute(self, result) -> None:
fire_event(
LogNodeNoOpResult(
description=self.description,
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info,
)
)

def execute(self, compiled_node, manifest):
# no-op
return RunResult(
node=compiled_node,
status=RunStatus.NoOp,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
message="NO-OP",
adapter_response={},
failures=0,
batch_results=None,
agate_table=None,
)
7 changes: 7 additions & 0 deletions core/dbt/runners/saved_query_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dbt.runners.no_op_runner import NoOpRunner


class SavedQueryRunner(NoOpRunner):
@property
def description(self) -> str:
return f"saved query {self.node.name}"
50 changes: 6 additions & 44 deletions core/dbt/task/build.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import threading
from typing import Dict, List, Optional, Set, Type

from dbt.artifacts.schemas.results import NodeStatus, RunStatus
from dbt.artifacts.schemas.results import NodeStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.cli.flags import Flags
from dbt.config.runtime import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import LogNodeNoOpResult
from dbt.exceptions import DbtInternalError
from dbt.graph import Graph, GraphQueue, ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.runners import ExposureRunner as exposure_runner
from dbt.runners import SavedQueryRunner as saved_query_runner
from dbt.task.base import BaseRunner, resource_types_from_args
from dbt.task.run import MicrobatchModelRunner
from dbt_common.events.functions import fire_event

from .run import ModelRunner as run_model_runner
from .run import RunTask
Expand All @@ -21,48 +20,10 @@
from .test import TestRunner as test_runner


class SavedQueryRunner(BaseRunner):
# Stub. No-op Runner for Saved Queries, which require MetricFlow for execution.
@property
def description(self) -> str:
return f"saved query {self.node.name}"

def before_execute(self) -> None:
pass

def compile(self, manifest: Manifest):
return self.node

def after_execute(self, result) -> None:
fire_event(
LogNodeNoOpResult(
description=self.description,
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info,
)
)

def execute(self, compiled_node, manifest):
# no-op
return RunResult(
node=compiled_node,
status=RunStatus.Success,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
message="NO-OP",
adapter_response={},
failures=0,
batch_results=None,
agate_table=None,
)


class BuildTask(RunTask):
"""The Build task processes all assets of a given process and attempts to
'build' them in an opinionated fashion. Every resource type outlined in
RUNNER_MAP will be processed by the mapped runner class.
RUNNER_MAP will be processed by the mapped runners class.
I.E. a resource of type Model is handled by the ModelRunner which is
imported as run_model_runner."""
Expand All @@ -80,7 +41,8 @@ class BuildTask(RunTask):
NodeType.Seed: seed_runner,
NodeType.Test: test_runner,
NodeType.Unit: test_runner,
NodeType.SavedQuery: SavedQueryRunner,
NodeType.SavedQuery: saved_query_runner,
NodeType.Exposure: exposure_runner,
}
ALL_RESOURCE_VALUES = frozenset({x for x in RUNNER_MAP.keys()})

Expand Down
3 changes: 3 additions & 0 deletions core/dbt/task/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def interpret_run_result(result) -> str:
return "warn"
elif result.status in (NodeStatus.Pass, NodeStatus.Success):
return "pass"
elif result.status == NodeStatus.NoOp:
return "noop"
else:
raise RuntimeError(f"unhandled result {result}")

Expand All @@ -58,6 +60,7 @@ def print_run_status_line(results) -> None:
"skip": 0,
"pass": 0,
"warn": 0,
"noop": 0,
"total": 0,
}

Expand Down
2 changes: 2 additions & 0 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ def _runtime_initialize(self):
self._flattened_nodes.append(self.manifest.saved_queries[uid])
elif uid in self.manifest.unit_tests:
self._flattened_nodes.append(self.manifest.unit_tests[uid])
elif uid in self.manifest.exposures:
self._flattened_nodes.append(self.manifest.exposures[uid])
else:
raise DbtInternalError(
f"Node selection returned {uid}, expected a node, a source, or a unit test"
Expand Down
4 changes: 1 addition & 3 deletions tests/functional/adapter/concurrency/test_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def models(self):
}


class TestConcurenncy(BaseConcurrency):
class TestConcurrency(BaseConcurrency):
def test_concurrency(self, project):
run_dbt(["seed", "--select", "seed"])
results = run_dbt(["run"], expect_pass=False)
Expand All @@ -326,5 +326,3 @@ def test_concurrency(self, project):
check_relations_equal(project.adapter, ["seed", "table_b"])
check_table_does_not_exist(project.adapter, "invalid")
check_table_does_not_exist(project.adapter, "skip")

assert "PASS=5 WARN=0 ERROR=1 SKIP=1 TOTAL=7" in output
12 changes: 1 addition & 11 deletions tests/functional/adapter/hooks/test_on_run_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,11 @@ def models(self):
"select * from {{ target.schema }}.my_end_table"
}

@pytest.fixture(scope="class")
def log_counts(self):
return "PASS=2 WARN=0 ERROR=2 SKIP=1 TOTAL=5"

@pytest.fixture(scope="class")
def my_model_run_status(self):
return RunStatus.Error

def test_results(self, project, log_counts, my_model_run_status):
def test_results(self, project, my_model_run_status):
results, log_output = run_dbt_and_capture(["run"], expect_pass=False)

expected_results = [
Expand All @@ -64,7 +60,6 @@ def test_results(self, project, log_counts, my_model_run_status):
timing_keys = [timing.name for timing in result.timing]
assert timing_keys == ["compile", "execute"]

assert log_counts in log_output
assert "4 project hooks, 1 view model" in log_output

run_results = get_artifact(project.project_root, "target", "run_results.json")
Expand All @@ -88,10 +83,6 @@ class Test__StartHookFail__FlagIsTrue__ModelSkipped(Test__StartHookFail__FlagIsN
def flags(self):
return {"skip_nodes_if_on_run_start_fails": True}

@pytest.fixture(scope="class")
def log_counts(self):
return "PASS=2 WARN=0 ERROR=1 SKIP=2 TOTAL=5"

@pytest.fixture(scope="class")
def my_model_run_status(self):
return RunStatus.Skipped
Expand Down Expand Up @@ -125,7 +116,6 @@ def test_results(self, project):
]

assert [(result.node.unique_id, result.status) for result in results] == expected_results
assert "PASS=3 WARN=0 ERROR=1 SKIP=1 TOTAL=5" in log_output
assert "4 project hooks, 1 view model" in log_output

run_results = get_artifact(project.project_root, "target", "run_results.json")
Expand Down
6 changes: 4 additions & 2 deletions tests/functional/defer_state/test_run_results_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,14 @@ def test_build_run_results_state(self, project):
results = run_dbt(
["build", "--select", "result:error+", "--state", "./state"], expect_pass=False
)
assert len(results) == 4
assert len(results) == 5
nodes = set([elem.node.name for elem in results])
assert nodes == {
"table_model",
"view_model",
"not_null_view_model_id",
"unique_view_model_id",
"my_exposure",
}

results = run_dbt(["ls", "--select", "result:error+", "--state", "./state"])
Expand Down Expand Up @@ -443,14 +444,15 @@ def test_concurrent_selectors_build_run_results_state(self, project):
["build", "--select", "state:modified+", "result:error+", "--state", "./state"],
expect_pass=False,
)
assert len(results) == 5
assert len(results) == 6
nodes = set([elem.node.name for elem in results])
assert nodes == {
"table_model_modified_example",
"view_model",
"table_model",
"not_null_view_model_id",
"unique_view_model_id",
"my_exposure",
}

self.update_view_model_failing_tests()
Expand Down
Loading

0 comments on commit 7df04b0

Please sign in to comment.