QMalcolm add event for debugging artifact writing #10936

Closed
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241029-181728.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Emit debug logging event whenever artifacts are written
Reviewer (Contributor) commented:

Do we still need this?

@QMalcolm (Contributor, Author) replied on Oct 30, 2024:

I think "need" is a strong word. We wanted to add this event because we couldn't tell if the writing of the artifact was actually happening when debugging the issue of #10934 in cloud. Having this log could thus reduce debug time in the future if something like this happens again. Additionally, it's possible that throughout execution we end up writing an artifact multiple times, which is likely wasteful. Having a log like this would be helpful in identifying if that is happening. Additionally, the fix for the #10934 isn't guaranteed by the fix in 0223f3a. That fix is fairly narrowly scoped. It is possible if an exception gets raised elsewhere, we might run into a situation again where an artifact is not being written (thus the log is useful).


Tangentially, I just admitted the fix in 0223f3a is pretty limited. This is unfortunately necessary: I tried moving the exception handling up some levels, but you quickly run into a labyrinth of catching too much in instances where failure is expected 🫠

time: 2024-10-29T18:17:28.321188-05:00
custom:
Author: QMalcolm
Issue: N/A
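
A note on consuming the new event in-process: the same callback hook this PR's unit tests rely on (add_callback_to_manager) can be used to watch for artifact writes, e.g. to confirm a write happened or to spot the duplicate writes mentioned above. A minimal sketch, assuming a programmatic dbt invocation; the counting logic is illustrative only:

```python
# Sketch: count ArtifactWritten events per path to spot duplicate writes.
from collections import Counter

from dbt_common.events.base_types import EventMsg
from dbt_common.events.event_manager_client import add_callback_to_manager

writes_per_path: Counter = Counter()

def count_artifact_writes(event: EventMsg) -> None:
    # Every structured event carries its class name in info.name.
    if event.info.name == "ArtifactWritten":
        writes_per_path[event.data.artifact_path] += 1

add_callback_to_manager(count_artifact_writes)
# ...run dbt programmatically; any path with a count > 1 was written
# more than once during the invocation.
```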
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20241029-182034.yaml
@@ -0,0 +1,7 @@
kind: Fixes
body: Handle exceptions in `get_execution_status` more broadly to better ensure `run_results.json`
gets written
time: 2024-10-29T18:20:34.782845-05:00
custom:
Author: QMalcolm
Issue: "10934"
6 changes: 4 additions & 2 deletions core/dbt/contracts/graph/manifest.py
@@ -67,7 +67,7 @@
)
from dbt.contracts.graph.unparsed import SourcePatch, UnparsedVersion
from dbt.contracts.util import SourceKey
from dbt.events.types import UnpinnedRefNewVersionAvailable
from dbt.events.types import ArtifactWritten, UnpinnedRefNewVersionAvailable
from dbt.exceptions import (
AmbiguousResourceNameRefError,
CompilationError,
@@ -1219,7 +1219,9 @@ def writable_manifest(self) -> "WritableManifest":
)

def write(self, path):
self.writable_manifest().write(path)
writable = self.writable_manifest()
writable.write(path)
fire_event(ArtifactWritten(artifact_type=writable.__class__.__name__, artifact_path=path))

# Called in dbt.compilation.Linker.write_graph and
# dbt.graph.queue.get and ._include_in_cost
3 changes: 2 additions & 1 deletion core/dbt/contracts/graph/semantic_manifest.py
@@ -7,7 +7,7 @@
)
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ModelNode
from dbt.events.types import SemanticValidationFailure
from dbt.events.types import ArtifactWritten, SemanticValidationFailure
from dbt.exceptions import ParsingError
from dbt_common.clients.system import write_file
from dbt_common.events.base_types import EventLevel
@@ -71,6 +71,7 @@ def write_json_to_file(self, file_path: str):
semantic_manifest = self._get_pydantic_semantic_manifest()
json = semantic_manifest.json()
write_file(file_path, json)
fire_event(ArtifactWritten(artifact_type=self.__class__.__name__, artifact_path=file_path))

def _get_pydantic_semantic_manifest(self) -> PydanticSemanticManifest:
pydantic_time_spines: List[PydanticTimeSpine] = []
3 changes: 3 additions & 0 deletions core/dbt/contracts/sql.py
@@ -7,7 +7,9 @@
from dbt.artifacts.schemas.results import ExecutionResult, TimingInfo
from dbt.artifacts.schemas.run import RunExecutionResult, RunResult, RunResultsArtifact
from dbt.contracts.graph.nodes import ResultNode
from dbt.events.types import ArtifactWritten
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.functions import fire_event

TaskTags = Optional[Dict[str, Any]]
TaskID = uuid.UUID
@@ -49,6 +51,7 @@ def write(self, path: str) -> None:
args=self.args,
)
writable.write(path)
fire_event(ArtifactWritten(artifact_type=writable.__class__.__name__, artifact_path=path))

@classmethod
def from_local_result(
13 changes: 13 additions & 0 deletions core/dbt/events/core_types.proto
@@ -1217,6 +1217,19 @@ message DepsScrubbedPackageNameMsg{
DepsScrubbedPackageName data = 2;
}

// P - Artifacts

// P001
message ArtifactWritten {
string artifact_type = 1;
string artifact_path = 2;
}

message ArtifactWrittenMsg {
CoreEventInfo info = 1;
ArtifactWritten data = 2;
}

// Q - Node execution

// Q001
634 changes: 319 additions & 315 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.
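
Since core_types_pb2.py is machine-generated protoc output, the reviewable surface is really the .proto message above. A quick sanity check of the regenerated module (a sketch, assuming it exposes the new message alongside the other core events):

```python
# Sketch: round-trip the generated protobuf message to confirm the
# regenerated module picked up the new fields.
from dbt.events.core_types_pb2 import ArtifactWritten

msg = ArtifactWritten(
    artifact_type="WritableManifest",
    artifact_path="target/manifest.json",
)
assert ArtifactWritten.FromString(msg.SerializeToString()) == msg
```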

13 changes: 13 additions & 0 deletions core/dbt/events/types.py
@@ -1187,6 +1187,19 @@ def message(self) -> str:
return f"Detected secret env var in {self.package_name}. dbt will write a scrubbed representation to the lock file. This will cause issues with subsequent 'dbt deps' using the lock file, requiring 'dbt deps --upgrade'"


# =======================================================
# P - Artifacts
# =======================================================


class ArtifactWritten(DebugLevel):
def code(self):
return "P001"

def message(self) -> str:
return f"Wrote artifact {self.artifact_type} to {self.artifact_path}"


# =======================================================
# Q - Node execution
# =======================================================
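
Because ArtifactWritten is a DebugLevel event, it only appears at debug verbosity. A sketch of pulling the new P001 events back out of a JSON-formatted dbt log (assuming output from something like `--log-format json --log-level debug`, and the usual info/data layout of structured log lines):

```python
# Sketch: list every artifact write recorded in a JSON-formatted dbt log.
import json

with open("logs/dbt.log") as log_file:
    for line in log_file:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip any non-JSON lines
        if event.get("info", {}).get("code") == "P001":
            data = event["data"]
            print(f"{data['artifact_type']} -> {data['artifact_path']}")
```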
6 changes: 6 additions & 0 deletions core/dbt/parser/manifest.py
@@ -63,6 +63,7 @@
)
from dbt.contracts.graph.semantic_manifest import SemanticManifest
from dbt.events.types import (
ArtifactWritten,
DeprecatedModel,
DeprecatedReference,
InvalidDisabledTargetInTestNode,
@@ -2012,4 +2013,9 @@ def parse_manifest(
plugin_artifacts = pm.get_manifest_artifacts(manifest)
for path, plugin_artifact in plugin_artifacts.items():
plugin_artifact.write(path)
fire_event(
ArtifactWritten(
artifact_type=plugin_artifact.__class__.__name__, artifact_path=path
)
)
return manifest
5 changes: 5 additions & 0 deletions core/dbt/task/docs/generate.py
@@ -33,6 +33,7 @@
from dbt.constants import MANIFEST_FILE_NAME
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ResultNode
from dbt.events.types import ArtifactWritten
from dbt.exceptions import AmbiguousCatalogMatchError
from dbt.graph import ResourceTypeSelector
from dbt.graph.graph import UniqueId
@@ -309,6 +310,10 @@ def run(self) -> CatalogArtifact:

catalog_path = os.path.join(self.config.project_target_path, CATALOG_FILENAME)
results.write(catalog_path)
fire_event(
ArtifactWritten(artifact_type=results.__class__.__name__, artifact_path=catalog_path)
)

if self.args.compile:
write_manifest(self.manifest, self.config.project_target_path)

17 changes: 9 additions & 8 deletions core/dbt/task/run.py
@@ -93,11 +93,16 @@ def get_execution_status(sql: str, adapter: BaseAdapter) -> Tuple[RunStatus, str
response, _ = adapter.execute(sql, auto_begin=False, fetch=False)
status = RunStatus.Success
message = response._message
except (KeyboardInterrupt, SystemExit):
Reviewer (Contributor) commented:

Does this mean that if we cancel during get_execution_status, no result file will be written?

@QMalcolm (Contributor, Author) replied:

Correct. We could re-catch the KeyboardInterrupt / SystemExit in after_run and do something similar to what is done in execute_nodes.

raise
except DbtRuntimeError as exc:
status = RunStatus.Error
message = exc.msg
finally:
return status, message
except Exception as exc:
status = RunStatus.Error
message = str(exc)

return (status, message)


def track_model_run(index, num_nodes, run_model_result):
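
The removal of the old `finally: return status, message` is the load-bearing change in this hunk: a return inside a finally block discards any in-flight exception, which is why a Ctrl-C during execution previously never propagated. A standalone sketch of the Python semantics (not PR code):

```python
# Demonstrates why `return` inside `finally` was a problem: the return
# swallows the in-flight exception, including KeyboardInterrupt.
def swallows_interrupt() -> str:
    try:
        raise KeyboardInterrupt()
    finally:
        return "interrupt swallowed"  # noqa: B012 (return inside finally)

print(swallows_interrupt())  # prints instead of raising KeyboardInterrupt
```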
@@ -645,7 +650,6 @@ def safe_run_hooks(
return RunStatus.Success

status = RunStatus.Success
failed = False
Reviewer (Contributor) commented:

Thanks for making this logic more concise.

If I understand it correctly, the behavior we want is: if a hook fails, we skip the rest?

I think the observation that status and failed are duplicative is spot on, but I'm wondering whether we should get rid of status and rename failed to something like skip_rest_due_to_failure instead. In the current structure I have to do some guessing and validation to understand what the code is trying to do.

@QMalcolm (Contributor, Author) replied on Oct 30, 2024:

That is correct: if one hook fails, the rest are expected to be skipped.

I can clean up the code a bit more. I was trying to avoid too much refactoring in this PR, since it's generally best practice to separate feature/bug work from refactors; I actually walked back some refactoring work I did in this regard.

To the keen-eyed observer, 5e9d3d6 foreshadowed such work, which I ultimately dropped before pushing my branch to reduce the number of changes:

    If it isn't apparent, I'm trying to reduce the need for tracking variables,
    to make it easier to extract the execution code to a separate private function,
    and to make it easier to see what is happening.

num_hooks = len(ordered_hooks)

for idx, hook in enumerate(ordered_hooks, 1):
@@ -654,9 +658,8 @@
hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}"
execution_time = 0.0
timing: List[TimingInfo] = []
failures = 1

if not failed:
if status == RunStatus.Success:
with collect_timing_info("compile", timing.append):
sql = self.get_hook_sql(
adapter, hook, hook.index, num_hooks, extra_context
@@ -682,13 +685,11 @@
finished_at = timing[1].completed_at or datetime.utcnow()
hook.update_event_status(finished_at=finished_at.isoformat())
execution_time = (finished_at - started_at).total_seconds()
failures = 0 if status == RunStatus.Success else 1

if status == RunStatus.Success:
message = f"{hook_name} passed"
else:
message = f"{hook_name} failed, error:\n {message}"
failed = True
else:
status = RunStatus.Skipped
message = f"{hook_name} skipped"
@@ -703,7 +704,7 @@
message=message,
adapter_response={},
execution_time=execution_time,
failures=failures,
failures=0 if status == RunStatus.Success else 1,
node=hook,
)
)
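
For the hook-skipping behavior discussed in the review thread above, the control flow the refactor converges on can be sketched in isolation (names and structure here are illustrative, not the PR's):

```python
# Sketch of the skip-remaining-hooks-on-failure pattern: a single status
# variable both records the last outcome and gates further execution.
from enum import Enum
from typing import Callable, List, Tuple

class Status(Enum):
    SUCCESS = "success"
    ERROR = "error"
    SKIPPED = "skipped"

def run_hooks(hooks: List[Tuple[str, Callable[[], None]]]) -> List[Tuple[str, Status]]:
    status = Status.SUCCESS
    results = []
    for name, hook in hooks:
        if status == Status.SUCCESS:
            try:
                hook()
            except Exception:
                status = Status.ERROR
        else:
            status = Status.SKIPPED  # everything after the first failure is skipped
        results.append((name, status))
    return results

def boom() -> None:
    raise RuntimeError("hook failed")

print(run_hooks([("a", lambda: None), ("b", boom), ("c", lambda: None)]))
# [('a', Status.SUCCESS), ('b', Status.ERROR), ('c', Status.SKIPPED)]
```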
6 changes: 6 additions & 0 deletions core/dbt/task/run_operation.py
@@ -11,6 +11,7 @@
from dbt.contracts.files import FileHash
from dbt.contracts.graph.nodes import HookNode
from dbt.events.types import (
ArtifactWritten,
LogDebugStackTrace,
RunningOperationCaughtError,
RunningOperationUncaughtError,
@@ -130,6 +131,11 @@ def run(self) -> RunResultsArtifact:

if self.args.write_json:
results.write(result_path)
fire_event(
ArtifactWritten(
artifact_type=results.__class__.__name__, artifact_path=result_path
)
)

return results

12 changes: 12 additions & 0 deletions core/dbt/task/runnable.py
@@ -26,6 +26,7 @@
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.state import PreviousState
from dbt.events.types import (
ArtifactWritten,
ConcurrencyLine,
DefaultSelector,
EndRunResult,
@@ -427,6 +428,12 @@ def execute_nodes(self):

if self.args.write_json and hasattr(run_result, "write"):
run_result.write(self.result_path())
fire_event(
ArtifactWritten(
artifact_type=run_result.__class__.__name__,
artifact_path=self.result_path(),
)
)

self._cancel_connections(pool)
print_run_end_messages(self.node_results, keyboard_interrupt=True)
@@ -591,6 +598,11 @@ def run(self):
write_manifest(self.manifest, self.config.project_target_path)
if hasattr(result, "write"):
result.write(self.result_path())
fire_event(
ArtifactWritten(
artifact_type=result.__class__.__name__, artifact_path=self.result_path()
)
)

self.task_end_messages(result.results)
return result
102 changes: 92 additions & 10 deletions tests/unit/task/test_run.py
@@ -2,26 +2,32 @@
from argparse import Namespace
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
from typing import Optional, Type, Union
from unittest import mock
from unittest.mock import MagicMock, patch

import pytest
from psycopg2 import DatabaseError
from pytest_mock import MockerFixture

from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.postgres import PostgresAdapter
from dbt.artifacts.resources.base import FileHash
from dbt.artifacts.resources.types import NodeType, RunHookType
from dbt.artifacts.resources.v1.components import DependsOn
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt.artifacts.resources.v1.model import ModelConfig
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.artifacts.schemas.results import RunStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.config.runtime import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ModelNode
from dbt.events.types import LogModelResult
from dbt.contracts.graph.nodes import HookNode, ModelNode
from dbt.exceptions import DbtRuntimeError
from dbt.flags import get_flags, set_from_args
from dbt.task.run import ModelRunner, RunTask
from dbt.tests.util import safe_set_invocation_context
from dbt_common.events.base_types import EventLevel
from dbt_common.events.event_manager_client import add_callback_to_manager
from tests.utils import EventCatcher


@@ -69,12 +75,6 @@ def test_run_task_preserve_edges():


class TestModelRunner:
@pytest.fixture
def log_model_result_catcher(self) -> EventCatcher:
catcher = EventCatcher(event_to_catch=LogModelResult)
add_callback_to_manager(catcher.catch)
return catcher

@pytest.fixture
def model_runner(
self,
@@ -239,3 +239,85 @@ class Relation:

# Assert result of _is_incremental
assert model_runner._is_incremental(model) == expectation


class TestRunTask:
@pytest.fixture
def hook_node(self) -> HookNode:
return HookNode(
package_name="test",
path="/root/x/path.sql",
original_file_path="/root/path.sql",
language="sql",
raw_code="select * from wherever",
name="foo",
resource_type=NodeType.Operation,
unique_id="model.test.foo",
fqn=["test", "models", "foo"],
refs=[],
sources=[],
metrics=[],
depends_on=DependsOn(),
description="",
database="test_db",
schema="test_schema",
alias="bar",
tags=[],
config=NodeConfig(),
index=None,
checksum=FileHash.from_contents(""),
unrendered_config={},
)

@pytest.mark.parametrize(
"error_to_raise,expected_result",
[
(None, RunStatus.Success),
(DbtRuntimeError, RunStatus.Error),
(DatabaseError, RunStatus.Error),
(KeyboardInterrupt, KeyboardInterrupt),
],
)
def test_safe_run_hooks(
self,
mocker: MockerFixture,
runtime_config: RuntimeConfig,
manifest: Manifest,
hook_node: HookNode,
error_to_raise: Optional[Type[Exception]],
expected_result: Union[RunStatus, Type[Exception]],
):
mocker.patch("dbt.task.run.RunTask.get_hooks_by_type").return_value = [hook_node]
mocker.patch("dbt.task.run.RunTask.get_hook_sql").return_value = hook_node.raw_code

flags = mock.Mock()
flags.state = None
flags.defer_state = None

run_task = RunTask(
args=flags,
config=runtime_config,
manifest=manifest,
)

adapter = mock.Mock()
adapter_execute = mock.Mock()
adapter_execute.return_value = (AdapterResponse(_message="Success"), None)

if error_to_raise:
adapter_execute.side_effect = error_to_raise("Oh no!")

adapter.execute = adapter_execute

try:
result = run_task.safe_run_hooks(
adapter=adapter,
hook_type=RunHookType.End,
extra_context={},
)
assert isinstance(expected_result, RunStatus)
assert result == expected_result
except BaseException as e:
assert not isinstance(expected_result, RunStatus)
assert issubclass(expected_result, BaseException)
assert type(e) == expected_result
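
To exercise the new parametrized cases locally, pytest's Python API works as well as the CLI (a sketch; the node ID assumes this checkout's layout):

```python
# Sketch: run only the new safe_run_hooks cases via pytest's Python API.
import pytest

pytest.main(["-q", "tests/unit/task/test_run.py::TestRunTask::test_safe_run_hooks"])
```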