Create a no-op exposure runner #11082

Merged: 12 commits, Dec 12, 2024
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20241202-164715.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Create a no-op exposure runner
time: 2024-12-02T16:47:15.766574Z
custom:
Author: aranke
Issue: ' '
2 changes: 2 additions & 0 deletions core/dbt/artifacts/schemas/results.py
Expand Up @@ -64,13 +64,15 @@ class NodeStatus(StrEnum):
PartialSuccess = "partial success"
Pass = "pass"
RuntimeErr = "runtime error"
NoOp = "no-op"


class RunStatus(StrEnum):
Success = NodeStatus.Success
Error = NodeStatus.Error
Skipped = NodeStatus.Skipped
PartialSuccess = NodeStatus.PartialSuccess
NoOp = NodeStatus.NoOp
Contributor:
@aranke For the purposes of dbt retry, should NoOp be included in RETRYABLE_STATUSES?

Put differently: if my selection criteria include a set of saved queries and --no-export-saved-queries (docs), those saved queries will all have no-op statuses in run_results.json. If I rerun with dbt retry --export-saved-queries, should dbt reprocess those saved queries, this time with triggers? (The same goes for an active exposure.)

In order to make that happen, I think we'd also need to include --export-saved-queries (and the analogous config for exposures) in ALLOW_CLI_OVERRIDE_FLAGS.

My first intuition here was: Because NoOp is not a failure (or a Skip due to an upstream failure), it should not be retryable. But it also is not Success.

  • The benefit of making NoOp retryable (and adding those CLI flags) is a slightly more ergonomic mechanism to quickly "heal" a run that accidentally missed including that config.
  • The harm of including them is extra noise in the logs (dbt is retrying more things than expected, only to no-op most of them).

When we discussed this the other week, I think we landed on including RunStatus.NoOp in the retryable statuses, but after reasoning through it above, I don't think we should include it (i.e., leave the code as it is).
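The trade-off described above comes down to a one-line change. A hypothetical sketch with stand-in enum values (the actual RETRYABLE_STATUSES definition in dbt's retry task may contain different members):

```python
from enum import Enum


class RunStatus(str, Enum):
    # Stand-in for dbt's StrEnum-based RunStatus (values assumed from this diff).
    Success = "success"
    Error = "error"
    Skipped = "skipped"
    PartialSuccess = "partial success"
    NoOp = "no-op"


# As merged: no-op results are not retried.
RETRYABLE_STATUSES = {RunStatus.Error, RunStatus.Skipped}

# The alternative discussed (and rejected) above would have been:
# RETRYABLE_STATUSES = {RunStatus.Error, RunStatus.Skipped, RunStatus.NoOp}
```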

Member Author:
I agree with this assessment because:

  1. If an exposure has a failure/skipped status, it will be retried automatically (I will add a test for this case).
  2. Its result won’t change unless the project config also changes (unlikely IME).

This is also a change we can implement fairly easily if we see a need for it, so I’ll merge the code in as-is for now.



class TestStatus(StrEnum):
14 changes: 13 additions & 1 deletion core/dbt/contracts/graph/nodes.py
Expand Up @@ -1378,7 +1378,7 @@ def group(self):


@dataclass
class Exposure(GraphNode, ExposureResource):
class Exposure(NodeInfoMixin, GraphNode, ExposureResource):
@property
def depends_on_nodes(self):
return self.depends_on.nodes
Expand Down Expand Up @@ -1441,6 +1441,12 @@ def same_contents(self, old: Optional["Exposure"]) -> bool:
def group(self):
return None

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
    dct = super().__post_serialize__(dct, context)
    if "_event_status" in dct:
        del dct["_event_status"]
    return dct

Contributor:

Why is this necessary?

Member Author:

You're right, this field is never written out. Fixed the failing test to account for this field instead.

Contributor:

I wasn't implying this was unnecessary; I was just trying to understand what added _event_status and why the change was made.

Digging into this, I see it's because of the NodeInfoMixin inheritance change to Exposure in this PR. I think we should do what is consistent with other nodes that inherit from NodeInfoMixin. It looks like the __post_serialize__ implementation is the same as what ParsedNode does:

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
    dct = super().__post_serialize__(dct, context)
    if "_event_status" in dct:
        del dct["_event_status"]
    return dct

I think keeping the __post_serialize__ change makes sense then 👍 Probably the best way to address this is to actually implement the method on NodeInfoMixin itself, but I'd do that in a follow-up.

Member Author:

No sweat; just reverted the commit here.
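Per the follow-up suggestion in the thread, the duplicated hook could live on NodeInfoMixin itself. A minimal self-contained sketch under that assumption (the real mixin carries more responsibilities; the guarded super() call is for illustration only):

```python
from typing import Any, Dict, Optional


class NodeInfoMixin:
    # Sketch only: the real mixin also tracks node execution info.
    def __post_serialize__(self, dct: Dict[str, Any], context: Optional[Dict] = None):
        parent = super()
        if hasattr(parent, "__post_serialize__"):
            dct = parent.__post_serialize__(dct, context)
        # Strip the transient execution-status field before writing artifacts.
        dct.pop("_event_status", None)
        return dct


class Exposure(NodeInfoMixin):
    """Hypothetical node class; it would no longer need its own override."""


cleaned = Exposure().__post_serialize__({"name": "my_exposure", "_event_status": {}})
print(cleaned)
# {'name': 'my_exposure'}
```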


# ====================================
# Metric node
Expand Down Expand Up @@ -1659,6 +1665,12 @@ def same_contents(self, old: Optional["SavedQuery"]) -> bool:
and True
)

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
    dct = super().__post_serialize__(dct, context)
    if "_event_status" in dct:
        del dct["_event_status"]
    return dct


# ====================================
# Patches
4 changes: 3 additions & 1 deletion core/dbt/events/types.py
Expand Up @@ -1892,7 +1892,9 @@ def code(self) -> str:
return "Z023"

def message(self) -> str:
stats_line = "Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} TOTAL={total}"
stats_line = (
"Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} NO-OP={noop} TOTAL={total}"
)
return stats_line.format(**self.stats)
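The updated summary line can be sanity-checked in isolation; note that pass is fine as a format key when supplied via dict unpacking (the sample counts below are hypothetical):

```python
stats_line = (
    "Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} NO-OP={noop} TOTAL={total}"
)

# Hypothetical stats for a build with one failure and two no-op exposures.
stats = {"pass": 5, "warn": 0, "error": 1, "skip": 1, "noop": 2, "total": 9}
print(stats_line.format(**stats))
# Done. PASS=5 WARN=0 ERROR=1 SKIP=1 NO-OP=2 TOTAL=9
```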


10 changes: 6 additions & 4 deletions core/dbt/graph/selector.py
Expand Up @@ -178,10 +178,12 @@
elif unique_id in self.manifest.saved_queries:
saved_query = self.manifest.saved_queries[unique_id]
return saved_query.config.enabled

node = self.manifest.nodes[unique_id]

return node.config.enabled
elif unique_id in self.manifest.exposures:
exposure = self.manifest.exposures[unique_id]
return exposure.config.enabled

else:
    node = self.manifest.nodes[unique_id]
    return node.config.enabled

(Codecov: added lines #L182-L183 were not covered by tests.)

def _is_empty_node(self, unique_id: UniqueId) -> bool:
if unique_id in self.manifest.nodes:
2 changes: 2 additions & 0 deletions core/dbt/runner/__init__.py
@@ -0,0 +1,2 @@
from .exposure_runner import ExposureRunner
from .saved_query_runner import SavedQueryRunner
7 changes: 7 additions & 0 deletions core/dbt/runner/exposure_runner.py
@@ -0,0 +1,7 @@
from dbt.runner.no_op_runner import NoOpRunner


class ExposureRunner(NoOpRunner):
    @property
    def description(self) -> str:
        return f"exposure {self.node.name}"
45 changes: 45 additions & 0 deletions core/dbt/runner/no_op_runner.py
@@ -0,0 +1,45 @@
import threading

from dbt.artifacts.schemas.results import RunStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import LogNodeNoOpResult
from dbt.task.base import BaseRunner
from dbt_common.events.functions import fire_event


class NoOpRunner(BaseRunner):
    @property
    def description(self) -> str:
        raise NotImplementedError("description not implemented")

    def before_execute(self) -> None:
        pass

    def compile(self, manifest: Manifest):
        return self.node

    def after_execute(self, result) -> None:
        fire_event(
            LogNodeNoOpResult(
                description=self.description,
                index=self.node_index,
                total=self.num_nodes,
                node_info=self.node.node_info,
            )
        )

    def execute(self, compiled_node, manifest):
        # no-op
        return RunResult(
            node=compiled_node,
            status=RunStatus.NoOp,
            timing=[],
            thread_id=threading.current_thread().name,
            execution_time=0,
            message="NO-OP",
            adapter_response={},
            failures=0,
            batch_results=None,
            agate_table=None,
        )

(Codecov: added line #L14 was not covered by tests.)
7 changes: 7 additions & 0 deletions core/dbt/runner/saved_query_runner.py
@@ -0,0 +1,7 @@
from dbt.runner.no_op_runner import NoOpRunner


class SavedQueryRunner(NoOpRunner):
    @property
    def description(self) -> str:
        return f"saved query {self.node.name}"
48 changes: 5 additions & 43 deletions core/dbt/task/build.py
@@ -1,18 +1,17 @@
import threading
from typing import Dict, List, Optional, Set, Type

from dbt.artifacts.schemas.results import NodeStatus, RunStatus
from dbt.artifacts.schemas.results import NodeStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.cli.flags import Flags
from dbt.config.runtime import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.events.types import LogNodeNoOpResult
from dbt.exceptions import DbtInternalError
from dbt.graph import Graph, GraphQueue, ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.runner import ExposureRunner as exposure_runner
from dbt.runner import SavedQueryRunner as saved_query_runner
from dbt.task.base import BaseRunner, resource_types_from_args
from dbt.task.run import MicrobatchModelRunner
from dbt_common.events.functions import fire_event

from .run import ModelRunner as run_model_runner
from .run import RunTask
Expand All @@ -21,44 +20,6 @@
from .test import TestRunner as test_runner


class SavedQueryRunner(BaseRunner):
# Stub. No-op Runner for Saved Queries, which require MetricFlow for execution.
@property
def description(self) -> str:
return f"saved query {self.node.name}"

def before_execute(self) -> None:
pass

def compile(self, manifest: Manifest):
return self.node

def after_execute(self, result) -> None:
fire_event(
LogNodeNoOpResult(
description=self.description,
index=self.node_index,
total=self.num_nodes,
node_info=self.node.node_info,
)
)

def execute(self, compiled_node, manifest):
# no-op
return RunResult(
node=compiled_node,
status=RunStatus.Success,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
message="NO-OP",
adapter_response={},
failures=0,
batch_results=None,
agate_table=None,
)


class BuildTask(RunTask):
"""The Build task processes all assets of a given process and attempts to
'build' them in an opinionated fashion. Every resource type outlined in
Expand All @@ -80,7 +41,8 @@ class BuildTask(RunTask):
NodeType.Seed: seed_runner,
NodeType.Test: test_runner,
NodeType.Unit: test_runner,
NodeType.SavedQuery: SavedQueryRunner,
NodeType.SavedQuery: saved_query_runner,
NodeType.Exposure: exposure_runner,
}
ALL_RESOURCE_VALUES = frozenset({x for x in RUNNER_MAP.keys()})

3 changes: 3 additions & 0 deletions core/dbt/task/printer.py
Expand Up @@ -48,6 +48,8 @@ def interpret_run_result(result) -> str:
return "warn"
elif result.status in (NodeStatus.Pass, NodeStatus.Success):
return "pass"
elif result.status == NodeStatus.NoOp:
return "noop"
else:
raise RuntimeError(f"unhandled result {result}")

Expand All @@ -58,6 +60,7 @@ def print_run_status_line(results) -> None:
"skip": 0,
"pass": 0,
"warn": 0,
"noop": 0,
"total": 0,
}
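A condensed, self-contained sketch of how this counter dict and interpret_run_result fit together (reduced to plain status strings; the real function inspects result.status and handles additional statuses):

```python
def interpret_run_result(status: str) -> str:
    # Reduced mapping based on the diff above, including the new no-op bucket.
    mapping = {
        "error": "error",
        "fail": "error",
        "skipped": "skip",
        "warn": "warn",
        "pass": "pass",
        "success": "pass",
        "no-op": "noop",
    }
    try:
        return mapping[status]
    except KeyError:
        raise RuntimeError(f"unhandled result {status}")


def print_run_status_line_counts(statuses):
    # Tally each result into the stats dict that feeds the Done. summary line.
    stats = {"error": 0, "skip": 0, "pass": 0, "warn": 0, "noop": 0, "total": 0}
    for s in statuses:
        stats[interpret_run_result(s)] += 1
        stats["total"] += 1
    return stats
```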

2 changes: 2 additions & 0 deletions core/dbt/task/runnable.py
Expand Up @@ -184,6 +184,8 @@ def _runtime_initialize(self):
self._flattened_nodes.append(self.manifest.saved_queries[uid])
elif uid in self.manifest.unit_tests:
self._flattened_nodes.append(self.manifest.unit_tests[uid])
elif uid in self.manifest.exposures:
self._flattened_nodes.append(self.manifest.exposures[uid])
else:
raise DbtInternalError(
f"Node selection returned {uid}, expected a node, a source, or a unit test"
4 changes: 2 additions & 2 deletions tests/functional/adapter/concurrency/test_concurrency.py
Expand Up @@ -303,7 +303,7 @@ def models(self):
}


class TestConcurenncy(BaseConcurrency):
class TestConcurrency(BaseConcurrency):
def test_concurrency(self, project):
run_dbt(["seed", "--select", "seed"])
results = run_dbt(["run"], expect_pass=False)
Expand All @@ -327,4 +327,4 @@ def test_concurrency(self, project):
check_table_does_not_exist(project.adapter, "invalid")
check_table_does_not_exist(project.adapter, "skip")

assert "PASS=5 WARN=0 ERROR=1 SKIP=1 TOTAL=7" in output
assert "PASS=5 WARN=0 ERROR=1 SKIP=1 NO-OP=0 TOTAL=7" in output
6 changes: 3 additions & 3 deletions tests/functional/adapter/hooks/test_on_run_hooks.py
Expand Up @@ -33,7 +33,7 @@ def models(self):

@pytest.fixture(scope="class")
def log_counts(self):
return "PASS=2 WARN=0 ERROR=2 SKIP=1 TOTAL=5"
return "PASS=2 WARN=0 ERROR=2 SKIP=1 NO-OP=0 TOTAL=5"

@pytest.fixture(scope="class")
def my_model_run_status(self):
Expand Down Expand Up @@ -90,7 +90,7 @@ def flags(self):

@pytest.fixture(scope="class")
def log_counts(self):
return "PASS=2 WARN=0 ERROR=1 SKIP=2 TOTAL=5"
return "PASS=2 WARN=0 ERROR=1 SKIP=2 NO-OP=0 TOTAL=5"

@pytest.fixture(scope="class")
def my_model_run_status(self):
Expand Down Expand Up @@ -125,7 +125,7 @@ def test_results(self, project):
]

assert [(result.node.unique_id, result.status) for result in results] == expected_results
assert "PASS=3 WARN=0 ERROR=1 SKIP=1 TOTAL=5" in log_output
assert "PASS=3 WARN=0 ERROR=1 SKIP=1 NO-OP=0 TOTAL=5" in log_output
assert "4 project hooks, 1 view model" in log_output

run_results = get_artifact(project.project_root, "target", "run_results.json")
6 changes: 4 additions & 2 deletions tests/functional/defer_state/test_run_results_state.py
Expand Up @@ -179,13 +179,14 @@ def test_build_run_results_state(self, project):
results = run_dbt(
["build", "--select", "result:error+", "--state", "./state"], expect_pass=False
)
assert len(results) == 4
assert len(results) == 5
nodes = set([elem.node.name for elem in results])
assert nodes == {
"table_model",
"view_model",
"not_null_view_model_id",
"unique_view_model_id",
"my_exposure",
}

results = run_dbt(["ls", "--select", "result:error+", "--state", "./state"])
Expand Down Expand Up @@ -443,14 +444,15 @@ def test_concurrent_selectors_build_run_results_state(self, project):
["build", "--select", "state:modified+", "result:error+", "--state", "./state"],
expect_pass=False,
)
assert len(results) == 5
assert len(results) == 6
nodes = set([elem.node.name for elem in results])
assert nodes == {
"table_model_modified_example",
"view_model",
"table_model",
"not_null_view_model_id",
"unique_view_model_id",
"my_exposure",
}

self.update_view_model_failing_tests()
36 changes: 32 additions & 4 deletions tests/functional/exposures/test_exposures.py
@@ -1,5 +1,7 @@
import pytest

from dbt.artifacts.schemas.results import RunStatus
from dbt.contracts.graph.nodes import Exposure
from dbt.tests.util import get_manifest, run_dbt
from tests.functional.exposures.fixtures import (
metricflow_time_spine_sql,
Expand All @@ -25,8 +27,8 @@ def models(self):
"metrics.yml": metrics_schema_yml,
}

def test_names_with_spaces(self, project):
run_dbt(["run"])
def test_compilation_names_with_spaces(self, project):
run_dbt(["compile"])
manifest = get_manifest(project.project_root)
exposure_ids = list(manifest.exposures.keys())
expected_exposure_ids = [
Expand All @@ -36,8 +38,8 @@ def test_names_with_spaces(self, project):
assert exposure_ids == expected_exposure_ids
assert manifest.exposures["exposure.test.simple_exposure"].label == "simple exposure label"

def test_depends_on(self, project):
run_dbt(["run"])
def test_compilation_depends_on(self, project):
run_dbt(["compile"])
manifest = get_manifest(project.project_root)
exposure_depends_on = manifest.exposures["exposure.test.simple_exposure"].depends_on.nodes
expected_exposure_depends_on = [
Expand All @@ -46,3 +48,29 @@ def test_depends_on(self, project):
"metric.test.metric",
]
assert sorted(exposure_depends_on) == sorted(expected_exposure_depends_on)

def test_execution_default(self, project):
results = run_dbt(["build"])
exposure_results = (
result for result in results.results if isinstance(result.node, Exposure)
)
assert {result.node.name for result in exposure_results} == {
"simple_exposure",
"notebook_exposure",
}
assert all(result.status == RunStatus.NoOp for result in exposure_results)
assert all("NO-OP" in result.message for result in exposure_results)

def test_execution_exclude(self, project):
results = run_dbt(["build", "--exclude", "simple_exposure"])
exposure_results = (
result for result in results.results if isinstance(result.node, Exposure)
)
assert {result.node.name for result in exposure_results} == {"notebook_exposure"}

def test_execution_select(self, project):
results = run_dbt(["build", "--select", "simple_exposure"])
exposure_results = (
result for result in results.results if isinstance(result.node, Exposure)
)
assert {result.node.name for result in exposure_results} == {"simple_exposure"}