From 7df04b0fe461dcad3c2abcb67004178efaf445ce Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Thu, 12 Dec 2024 15:28:34 +0000 Subject: [PATCH] Create a no-op exposure runner (#11082) --- .../Under the Hood-20241202-164715.yaml | 6 +++ core/dbt/artifacts/schemas/results.py | 2 + core/dbt/contracts/graph/nodes.py | 14 +++++- core/dbt/events/types.py | 4 +- core/dbt/graph/selector.py | 10 ++-- core/dbt/runners/__init__.py | 2 + core/dbt/runners/exposure_runner.py | 7 +++ core/dbt/runners/no_op_runner.py | 45 +++++++++++++++++ core/dbt/runners/saved_query_runner.py | 7 +++ core/dbt/task/build.py | 50 +++---------------- core/dbt/task/printer.py | 3 ++ core/dbt/task/runnable.py | 2 + .../adapter/concurrency/test_concurrency.py | 4 +- .../adapter/hooks/test_on_run_hooks.py | 12 +---- .../defer_state/test_run_results_state.py | 6 ++- tests/functional/exposures/test_exposures.py | 36 +++++++++++-- .../saved_queries/test_saved_query_build.py | 10 +++- tests/unit/task/test_build.py | 2 +- tests/unit/test_events.py | 4 +- 19 files changed, 153 insertions(+), 73 deletions(-) create mode 100644 .changes/unreleased/Under the Hood-20241202-164715.yaml create mode 100644 core/dbt/runners/__init__.py create mode 100644 core/dbt/runners/exposure_runner.py create mode 100644 core/dbt/runners/no_op_runner.py create mode 100644 core/dbt/runners/saved_query_runner.py diff --git a/.changes/unreleased/Under the Hood-20241202-164715.yaml b/.changes/unreleased/Under the Hood-20241202-164715.yaml new file mode 100644 index 00000000000..1eb0646f32d --- /dev/null +++ b/.changes/unreleased/Under the Hood-20241202-164715.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Create a no-op exposure runner +time: 2024-12-02T16:47:15.766574Z +custom: + Author: aranke + Issue: ' ' diff --git a/core/dbt/artifacts/schemas/results.py b/core/dbt/artifacts/schemas/results.py index dd455f309b8..9211d713310 100644 --- a/core/dbt/artifacts/schemas/results.py +++ b/core/dbt/artifacts/schemas/results.py @@ 
-64,6 +64,7 @@ class NodeStatus(StrEnum): PartialSuccess = "partial success" Pass = "pass" RuntimeErr = "runtime error" + NoOp = "no-op" class RunStatus(StrEnum): @@ -71,6 +72,7 @@ class RunStatus(StrEnum): Error = NodeStatus.Error Skipped = NodeStatus.Skipped PartialSuccess = NodeStatus.PartialSuccess + NoOp = NodeStatus.NoOp class TestStatus(StrEnum): diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index 8fc39c7621b..4bb70db5d9c 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -1378,7 +1378,7 @@ def group(self): @dataclass -class Exposure(GraphNode, ExposureResource): +class Exposure(NodeInfoMixin, GraphNode, ExposureResource): @property def depends_on_nodes(self): return self.depends_on.nodes @@ -1441,6 +1441,12 @@ def same_contents(self, old: Optional["Exposure"]) -> bool: def group(self): return None + def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): + dct = super().__post_serialize__(dct, context) + if "_event_status" in dct: + del dct["_event_status"] + return dct + # ==================================== # Metric node @@ -1659,6 +1665,12 @@ def same_contents(self, old: Optional["SavedQuery"]) -> bool: and True ) + def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): + dct = super().__post_serialize__(dct, context) + if "_event_status" in dct: + del dct["_event_status"] + return dct + # ==================================== # Patches diff --git a/core/dbt/events/types.py b/core/dbt/events/types.py index 5770a6518ad..daad51e7451 100644 --- a/core/dbt/events/types.py +++ b/core/dbt/events/types.py @@ -1937,7 +1937,9 @@ def code(self) -> str: return "Z023" def message(self) -> str: - stats_line = "Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} TOTAL={total}" + stats_line = ( + "Done. 
PASS={pass} WARN={warn} ERROR={error} SKIP={skip} NO-OP={noop} TOTAL={total}" + ) return stats_line.format(**self.stats) diff --git a/core/dbt/graph/selector.py b/core/dbt/graph/selector.py index bf586851fb8..b9786b4ddeb 100644 --- a/core/dbt/graph/selector.py +++ b/core/dbt/graph/selector.py @@ -178,10 +178,12 @@ def _is_graph_member(self, unique_id: UniqueId) -> bool: elif unique_id in self.manifest.saved_queries: saved_query = self.manifest.saved_queries[unique_id] return saved_query.config.enabled - - node = self.manifest.nodes[unique_id] - - return node.config.enabled + elif unique_id in self.manifest.exposures: + exposure = self.manifest.exposures[unique_id] + return exposure.config.enabled + else: + node = self.manifest.nodes[unique_id] + return node.config.enabled def _is_empty_node(self, unique_id: UniqueId) -> bool: if unique_id in self.manifest.nodes: diff --git a/core/dbt/runners/__init__.py b/core/dbt/runners/__init__.py new file mode 100644 index 00000000000..78b38ac55ac --- /dev/null +++ b/core/dbt/runners/__init__.py @@ -0,0 +1,2 @@ +from .exposure_runner import ExposureRunner +from .saved_query_runner import SavedQueryRunner diff --git a/core/dbt/runners/exposure_runner.py b/core/dbt/runners/exposure_runner.py new file mode 100644 index 00000000000..8392106b920 --- /dev/null +++ b/core/dbt/runners/exposure_runner.py @@ -0,0 +1,7 @@ +from dbt.runners.no_op_runner import NoOpRunner + + +class ExposureRunner(NoOpRunner): + @property + def description(self) -> str: + return f"exposure {self.node.name}" diff --git a/core/dbt/runners/no_op_runner.py b/core/dbt/runners/no_op_runner.py new file mode 100644 index 00000000000..2789c1fa9a6 --- /dev/null +++ b/core/dbt/runners/no_op_runner.py @@ -0,0 +1,45 @@ +import threading + +from dbt.artifacts.schemas.results import RunStatus +from dbt.artifacts.schemas.run import RunResult +from dbt.contracts.graph.manifest import Manifest +from dbt.events.types import LogNodeNoOpResult +from dbt.task.base import 
BaseRunner +from dbt_common.events.functions import fire_event + + +class NoOpRunner(BaseRunner): + @property + def description(self) -> str: + raise NotImplementedError("description not implemented") + + def before_execute(self) -> None: + pass + + def compile(self, manifest: Manifest): + return self.node + + def after_execute(self, result) -> None: + fire_event( + LogNodeNoOpResult( + description=self.description, + index=self.node_index, + total=self.num_nodes, + node_info=self.node.node_info, + ) + ) + + def execute(self, compiled_node, manifest): + # no-op + return RunResult( + node=compiled_node, + status=RunStatus.NoOp, + timing=[], + thread_id=threading.current_thread().name, + execution_time=0, + message="NO-OP", + adapter_response={}, + failures=0, + batch_results=None, + agate_table=None, + ) diff --git a/core/dbt/runners/saved_query_runner.py b/core/dbt/runners/saved_query_runner.py new file mode 100644 index 00000000000..234aeeef970 --- /dev/null +++ b/core/dbt/runners/saved_query_runner.py @@ -0,0 +1,7 @@ +from dbt.runners.no_op_runner import NoOpRunner + + +class SavedQueryRunner(NoOpRunner): + @property + def description(self) -> str: + return f"saved query {self.node.name}" diff --git a/core/dbt/task/build.py b/core/dbt/task/build.py index ff68d976744..1fc0e390a59 100644 --- a/core/dbt/task/build.py +++ b/core/dbt/task/build.py @@ -1,18 +1,17 @@ -import threading from typing import Dict, List, Optional, Set, Type -from dbt.artifacts.schemas.results import NodeStatus, RunStatus +from dbt.artifacts.schemas.results import NodeStatus from dbt.artifacts.schemas.run import RunResult from dbt.cli.flags import Flags from dbt.config.runtime import RuntimeConfig from dbt.contracts.graph.manifest import Manifest -from dbt.events.types import LogNodeNoOpResult from dbt.exceptions import DbtInternalError from dbt.graph import Graph, GraphQueue, ResourceTypeSelector from dbt.node_types import NodeType +from dbt.runners import ExposureRunner as exposure_runner 
+from dbt.runners import SavedQueryRunner as saved_query_runner from dbt.task.base import BaseRunner, resource_types_from_args from dbt.task.run import MicrobatchModelRunner -from dbt_common.events.functions import fire_event from .run import ModelRunner as run_model_runner from .run import RunTask @@ -21,48 +20,10 @@ from .test import TestRunner as test_runner -class SavedQueryRunner(BaseRunner): - # Stub. No-op Runner for Saved Queries, which require MetricFlow for execution. - @property - def description(self) -> str: - return f"saved query {self.node.name}" - - def before_execute(self) -> None: - pass - - def compile(self, manifest: Manifest): - return self.node - - def after_execute(self, result) -> None: - fire_event( - LogNodeNoOpResult( - description=self.description, - index=self.node_index, - total=self.num_nodes, - node_info=self.node.node_info, - ) - ) - - def execute(self, compiled_node, manifest): - # no-op - return RunResult( - node=compiled_node, - status=RunStatus.Success, - timing=[], - thread_id=threading.current_thread().name, - execution_time=0, - message="NO-OP", - adapter_response={}, - failures=0, - batch_results=None, - agate_table=None, - ) - - class BuildTask(RunTask): """The Build task processes all assets of a given process and attempts to 'build' them in an opinionated fashion. Every resource type outlined in - RUNNER_MAP will be processed by the mapped runner class. + RUNNER_MAP will be processed by the mapped runner class. I.E. 
a resource of type Model is handled by the ModelRunner which is imported as run_model_runner.""" @@ -80,7 +41,8 @@ class BuildTask(RunTask): NodeType.Seed: seed_runner, NodeType.Test: test_runner, NodeType.Unit: test_runner, - NodeType.SavedQuery: SavedQueryRunner, + NodeType.SavedQuery: saved_query_runner, + NodeType.Exposure: exposure_runner, } ALL_RESOURCE_VALUES = frozenset({x for x in RUNNER_MAP.keys()}) diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index 58a39552450..01ccd75a586 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -48,6 +48,8 @@ def interpret_run_result(result) -> str: return "warn" elif result.status in (NodeStatus.Pass, NodeStatus.Success): return "pass" + elif result.status == NodeStatus.NoOp: + return "noop" else: raise RuntimeError(f"unhandled result {result}") @@ -58,6 +60,7 @@ def print_run_status_line(results) -> None: "skip": 0, "pass": 0, "warn": 0, + "noop": 0, "total": 0, } diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 55342cafbbc..20dd2d7879d 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -184,6 +184,8 @@ def _runtime_initialize(self): self._flattened_nodes.append(self.manifest.saved_queries[uid]) elif uid in self.manifest.unit_tests: self._flattened_nodes.append(self.manifest.unit_tests[uid]) + elif uid in self.manifest.exposures: + self._flattened_nodes.append(self.manifest.exposures[uid]) else: raise DbtInternalError( f"Node selection returned {uid}, expected a node, a source, or a unit test" diff --git a/tests/functional/adapter/concurrency/test_concurrency.py b/tests/functional/adapter/concurrency/test_concurrency.py index 65932f95ea7..41433bfb23e 100644 --- a/tests/functional/adapter/concurrency/test_concurrency.py +++ b/tests/functional/adapter/concurrency/test_concurrency.py @@ -303,7 +303,7 @@ def models(self): } -class TestConcurenncy(BaseConcurrency): +class TestConcurrency(BaseConcurrency): def test_concurrency(self, project): 
run_dbt(["seed", "--select", "seed"]) results = run_dbt(["run"], expect_pass=False) @@ -326,5 +326,3 @@ def test_concurrency(self, project): check_relations_equal(project.adapter, ["seed", "table_b"]) check_table_does_not_exist(project.adapter, "invalid") check_table_does_not_exist(project.adapter, "skip") - - assert "PASS=5 WARN=0 ERROR=1 SKIP=1 TOTAL=7" in output diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py index 5547f570967..2bad3e0f45d 100644 --- a/tests/functional/adapter/hooks/test_on_run_hooks.py +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -31,15 +31,11 @@ def models(self): "select * from {{ target.schema }}.my_end_table" } - @pytest.fixture(scope="class") - def log_counts(self): - return "PASS=2 WARN=0 ERROR=2 SKIP=1 TOTAL=5" - @pytest.fixture(scope="class") def my_model_run_status(self): return RunStatus.Error - def test_results(self, project, log_counts, my_model_run_status): + def test_results(self, project, my_model_run_status): results, log_output = run_dbt_and_capture(["run"], expect_pass=False) expected_results = [ @@ -64,7 +60,6 @@ def test_results(self, project, log_counts, my_model_run_status): timing_keys = [timing.name for timing in result.timing] assert timing_keys == ["compile", "execute"] - assert log_counts in log_output assert "4 project hooks, 1 view model" in log_output run_results = get_artifact(project.project_root, "target", "run_results.json") @@ -88,10 +83,6 @@ class Test__StartHookFail__FlagIsTrue__ModelSkipped(Test__StartHookFail__FlagIsN def flags(self): return {"skip_nodes_if_on_run_start_fails": True} - @pytest.fixture(scope="class") - def log_counts(self): - return "PASS=2 WARN=0 ERROR=1 SKIP=2 TOTAL=5" - @pytest.fixture(scope="class") def my_model_run_status(self): return RunStatus.Skipped @@ -125,7 +116,6 @@ def test_results(self, project): ] assert [(result.node.unique_id, result.status) for result in results] == expected_results - 
assert "PASS=3 WARN=0 ERROR=1 SKIP=1 TOTAL=5" in log_output assert "4 project hooks, 1 view model" in log_output run_results = get_artifact(project.project_root, "target", "run_results.json") diff --git a/tests/functional/defer_state/test_run_results_state.py b/tests/functional/defer_state/test_run_results_state.py index e4b467d8e37..6d7e5ae3d61 100644 --- a/tests/functional/defer_state/test_run_results_state.py +++ b/tests/functional/defer_state/test_run_results_state.py @@ -179,13 +179,14 @@ def test_build_run_results_state(self, project): results = run_dbt( ["build", "--select", "result:error+", "--state", "./state"], expect_pass=False ) - assert len(results) == 4 + assert len(results) == 5 nodes = set([elem.node.name for elem in results]) assert nodes == { "table_model", "view_model", "not_null_view_model_id", "unique_view_model_id", + "my_exposure", } results = run_dbt(["ls", "--select", "result:error+", "--state", "./state"]) @@ -443,7 +444,7 @@ def test_concurrent_selectors_build_run_results_state(self, project): ["build", "--select", "state:modified+", "result:error+", "--state", "./state"], expect_pass=False, ) - assert len(results) == 5 + assert len(results) == 6 nodes = set([elem.node.name for elem in results]) assert nodes == { "table_model_modified_example", @@ -451,6 +452,7 @@ def test_concurrent_selectors_build_run_results_state(self, project): "table_model", "not_null_view_model_id", "unique_view_model_id", + "my_exposure", } self.update_view_model_failing_tests() diff --git a/tests/functional/exposures/test_exposures.py b/tests/functional/exposures/test_exposures.py index be42ffd26c0..efe3e492a83 100644 --- a/tests/functional/exposures/test_exposures.py +++ b/tests/functional/exposures/test_exposures.py @@ -1,5 +1,7 @@ import pytest +from dbt.artifacts.schemas.results import RunStatus +from dbt.contracts.graph.nodes import Exposure from dbt.tests.util import get_manifest, run_dbt from tests.functional.exposures.fixtures import ( 
metricflow_time_spine_sql, @@ -25,8 +27,8 @@ def models(self): "metrics.yml": metrics_schema_yml, } - def test_names_with_spaces(self, project): - run_dbt(["run"]) + def test_compilation_names_with_spaces(self, project): + run_dbt(["compile"]) manifest = get_manifest(project.project_root) exposure_ids = list(manifest.exposures.keys()) expected_exposure_ids = [ @@ -36,8 +38,8 @@ def test_names_with_spaces(self, project): assert exposure_ids == expected_exposure_ids assert manifest.exposures["exposure.test.simple_exposure"].label == "simple exposure label" - def test_depends_on(self, project): - run_dbt(["run"]) + def test_compilation_depends_on(self, project): + run_dbt(["compile"]) manifest = get_manifest(project.project_root) exposure_depends_on = manifest.exposures["exposure.test.simple_exposure"].depends_on.nodes expected_exposure_depends_on = [ @@ -46,3 +48,29 @@ def test_depends_on(self, project): "metric.test.metric", ] assert sorted(exposure_depends_on) == sorted(expected_exposure_depends_on) + + def test_execution_default(self, project): + results = run_dbt(["build"]) + exposure_results = ( + result for result in results.results if isinstance(result.node, Exposure) + ) + assert {result.node.name for result in exposure_results} == { + "simple_exposure", + "notebook_exposure", + } + assert all(result.status == RunStatus.NoOp for result in exposure_results) + assert all("NO-OP" in result.message for result in exposure_results) + + def test_execution_exclude(self, project): + results = run_dbt(["build", "--exclude", "simple_exposure"]) + exposure_results = ( + result for result in results.results if isinstance(result.node, Exposure) + ) + assert {result.node.name for result in exposure_results} == {"notebook_exposure"} + + def test_execution_select(self, project): + results = run_dbt(["build", "--select", "simple_exposure"]) + exposure_results = ( + result for result in results.results if isinstance(result.node, Exposure) + ) + assert {result.node.name for 
result in exposure_results} == {"simple_exposure"} diff --git a/tests/functional/saved_queries/test_saved_query_build.py b/tests/functional/saved_queries/test_saved_query_build.py index e9c2bbda3f8..1e976eb9e76 100644 --- a/tests/functional/saved_queries/test_saved_query_build.py +++ b/tests/functional/saved_queries/test_saved_query_build.py @@ -1,5 +1,7 @@ import pytest +from dbt.artifacts.schemas.results import RunStatus +from dbt.contracts.graph.nodes import SavedQuery from dbt.tests.util import run_dbt from tests.functional.saved_queries.fixtures import ( saved_queries_yml, @@ -36,4 +38,10 @@ def test_build_saved_queries_no_op(self, project) -> None: run_dbt(["deps"]) result = run_dbt(["build"]) assert len(result.results) == 3 - assert "NO-OP" in [r.message for r in result.results] + + saved_query_results = ( + result for result in result.results if isinstance(result.node, SavedQuery) + ) + assert {result.node.name for result in saved_query_results} == {"test_saved_query"} + assert all("NO-OP" in result.message for result in saved_query_results) + assert all(result.status == RunStatus.NoOp for result in saved_query_results) diff --git a/tests/unit/task/test_build.py b/tests/unit/task/test_build.py index 87d7b081ae8..c4fb235ae0e 100644 --- a/tests/unit/task/test_build.py +++ b/tests/unit/task/test_build.py @@ -1,5 +1,5 @@ from dbt.contracts.graph.nodes import SavedQuery -from dbt.task.build import SavedQueryRunner +from dbt.runners import SavedQueryRunner def test_saved_query_runner_on_skip(saved_query: SavedQuery): diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py index 3d8dfb76916..de078c45494 100644 --- a/tests/unit/test_events.py +++ b/tests/unit/test_events.py @@ -453,7 +453,9 @@ def test_event_codes(self): core_types.OpenCommand(open_cmd="", profiles_dir=""), core_types.RunResultWarning(resource_type="", node_name="", path=""), core_types.RunResultFailure(resource_type="", node_name="", path=""), - core_types.StatsLine(stats={"error": 0, 
"skip": 0, "pass": 0, "warn": 0, "total": 0}), + core_types.StatsLine( + stats={"error": 0, "skip": 0, "pass": 0, "warn": 0, "noop": 0, "total": 0} + ), core_types.RunResultError(msg=""), core_types.RunResultErrorNoMessage(status=""), core_types.SQLCompiledPath(path=""),