From ba6c7baf1d66d91fe7050e351e408e2a1186eee7 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Fri, 18 Oct 2024 22:28:58 +0100 Subject: [PATCH 1/3] [Tidy-First]: Fix `timings` object for hooks and macros, and make types of timings explicit (#10882) * [Tidy-First]: Fix `timings` object for hooks and macros, and make types of timings explicit * cast literal to str * change test * change jsonschema to enum * Discard changes to schemas/dbt/manifest/v12.json * nits --------- Co-authored-by: Chenyu Li --- core/dbt/artifacts/schemas/results.py | 16 +++++-- core/dbt/task/run.py | 19 +++++--- core/dbt/task/run_operation.py | 46 +++++++++++-------- schemas/dbt/run-results/v6.json | 6 ++- schemas/dbt/sources/v3.json | 6 ++- .../adapter/hooks/test_on_run_hooks.py | 8 ++++ .../functional/microbatch/test_microbatch.py | 2 +- .../run_operations/test_run_operations.py | 17 +++++++ tests/unit/test_events.py | 2 +- 9 files changed, 88 insertions(+), 34 deletions(-) diff --git a/core/dbt/artifacts/schemas/results.py b/core/dbt/artifacts/schemas/results.py index 00746c87885..ee27fc6d5d4 100644 --- a/core/dbt/artifacts/schemas/results.py +++ b/core/dbt/artifacts/schemas/results.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Sequence, Union +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Union from dbt.contracts.graph.nodes import ResultNode from dbt_common.dataclass_schema import StrEnum, dbtClassMixin @@ -10,7 +10,13 @@ @dataclass class TimingInfo(dbtClassMixin): - name: str + """ + Represents a step in the execution of a node. + `name` should be one of: compile, execute, or other + Do not call directly, use `collect_timing_info` instead. 
+ """ + + name: Literal["compile", "execute", "other"] started_at: Optional[datetime] = None completed_at: Optional[datetime] = None @@ -21,7 +27,7 @@ def end(self): self.completed_at = datetime.utcnow() def to_msg_dict(self): - msg_dict = {"name": self.name} + msg_dict = {"name": str(self.name)} if self.started_at: msg_dict["started_at"] = datetime_to_json_string(self.started_at) if self.completed_at: @@ -31,7 +37,9 @@ def to_msg_dict(self): # This is a context manager class collect_timing_info: - def __init__(self, name: str, callback: Callable[[TimingInfo], None]) -> None: + def __init__( + self, name: Literal["compile", "execute", "other"], callback: Callable[[TimingInfo], None] + ) -> None: self.timing_info = TimingInfo(name=name) self.callback = callback diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 99913a551c5..f159861e1ae 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -17,6 +17,7 @@ RunningStatus, RunStatus, TimingInfo, + collect_timing_info, ) from dbt.artifacts.schemas.run import RunResult from dbt.cli.flags import Flags @@ -633,7 +634,6 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: def safe_run_hooks( self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any] ) -> RunStatus: - started_at = datetime.utcnow() ordered_hooks = self.get_hooks_by_type(hook_type) if hook_type == RunHookType.End and ordered_hooks: @@ -653,14 +653,20 @@ def safe_run_hooks( hook.index = idx hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}" execution_time = 0.0 - timing = [] + timing: List[TimingInfo] = [] failures = 1 if not failed: + with collect_timing_info("compile", timing.append): + sql = self.get_hook_sql( + adapter, hook, hook.index, num_hooks, extra_context + ) + + started_at = timing[0].started_at or datetime.utcnow() hook.update_event_status( started_at=started_at.isoformat(), node_status=RunningStatus.Started ) - sql = self.get_hook_sql(adapter, hook, hook.index, 
num_hooks, extra_context) + fire_event( LogHookStartLine( statement=hook_name, @@ -670,11 +676,12 @@ def safe_run_hooks( ) ) - status, message = get_execution_status(sql, adapter) - finished_at = datetime.utcnow() + with collect_timing_info("execute", timing.append): + status, message = get_execution_status(sql, adapter) + + finished_at = timing[1].completed_at or datetime.utcnow() hook.update_event_status(finished_at=finished_at.isoformat()) execution_time = (finished_at - started_at).total_seconds() - timing = [TimingInfo(hook_name, started_at, finished_at)] failures = 0 if status == RunStatus.Success else 1 if status == RunStatus.Success: diff --git a/core/dbt/task/run_operation.py b/core/dbt/task/run_operation.py index 793ba81fb01..ebe8b14352e 100644 --- a/core/dbt/task/run_operation.py +++ b/core/dbt/task/run_operation.py @@ -2,11 +2,11 @@ import threading import traceback from datetime import datetime -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, List import dbt_common.exceptions from dbt.adapters.factory import get_adapter -from dbt.artifacts.schemas.results import RunStatus, TimingInfo +from dbt.artifacts.schemas.results import RunStatus, TimingInfo, collect_timing_info from dbt.artifacts.schemas.run import RunResult, RunResultsArtifact from dbt.contracts.files import FileHash from dbt.contracts.graph.nodes import HookNode @@ -51,25 +51,29 @@ def _run_unsafe(self, package_name, macro_name) -> "agate.Table": return res def run(self) -> RunResultsArtifact: - start = datetime.utcnow() - self.compile_manifest() + timing: List[TimingInfo] = [] - success = True + with collect_timing_info("compile", timing.append): + self.compile_manifest() + + start = timing[0].started_at + success = True package_name, macro_name = self._get_macro_parts() - try: - self._run_unsafe(package_name, macro_name) - except dbt_common.exceptions.DbtBaseException as exc: - fire_event(RunningOperationCaughtError(exc=str(exc))) - 
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc())) - success = False - except Exception as exc: - fire_event(RunningOperationUncaughtError(exc=str(exc))) - fire_event(LogDebugStackTrace(exc_info=traceback.format_exc())) - success = False + with collect_timing_info("execute", timing.append): + try: + self._run_unsafe(package_name, macro_name) + except dbt_common.exceptions.DbtBaseException as exc: + fire_event(RunningOperationCaughtError(exc=str(exc))) + fire_event(LogDebugStackTrace(exc_info=traceback.format_exc())) + success = False + except Exception as exc: + fire_event(RunningOperationUncaughtError(exc=str(exc))) + fire_event(LogDebugStackTrace(exc_info=traceback.format_exc())) + success = False - end = datetime.utcnow() + end = timing[1].completed_at macro = ( self.manifest.find_macro_by_name(macro_name, self.config.project_name, package_name) @@ -85,10 +89,12 @@ def run(self) -> RunResultsArtifact: f"dbt could not find a macro with the name '{macro_name}' in any package" ) + execution_time = (end - start).total_seconds() if start and end else 0.0 + run_result = RunResult( adapter_response={}, status=RunStatus.Success if success else RunStatus.Error, - execution_time=(end - start).total_seconds(), + execution_time=execution_time, failures=0 if success else 1, message=None, node=HookNode( @@ -105,13 +111,13 @@ def run(self) -> RunResultsArtifact: original_file_path="", ), thread_id=threading.current_thread().name, - timing=[TimingInfo(name=macro_name, started_at=start, completed_at=end)], + timing=timing, batch_results=None, ) results = RunResultsArtifact.from_execution_results( - generated_at=end, - elapsed_time=(end - start).total_seconds(), + generated_at=end or datetime.utcnow(), + elapsed_time=execution_time, args={ k: v for k, v in self.args.__dict__.items() diff --git a/schemas/dbt/run-results/v6.json b/schemas/dbt/run-results/v6.json index 1bf1cf75e83..96456882ae6 100644 --- a/schemas/dbt/run-results/v6.json +++ 
b/schemas/dbt/run-results/v6.json @@ -84,7 +84,11 @@ "title": "TimingInfo", "properties": { "name": { - "type": "string" + "enum": [ + "compile", + "execute", + "other" + ] }, "started_at": { "anyOf": [ diff --git a/schemas/dbt/sources/v3.json b/schemas/dbt/sources/v3.json index df2784f1a81..8cb3633f99a 100644 --- a/schemas/dbt/sources/v3.json +++ b/schemas/dbt/sources/v3.json @@ -211,7 +211,11 @@ "title": "TimingInfo", "properties": { "name": { - "type": "string" + "enum": [ + "compile", + "execute", + "other" + ] }, "started_at": { "anyOf": [ diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py index 42edbdae970..b9239b93b4a 100644 --- a/tests/functional/adapter/hooks/test_on_run_hooks.py +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -55,6 +55,14 @@ def test_results(self, project, log_counts, my_model_run_status): for result in results if isinstance(result.node, HookNode) ] == [(id, str(status)) for id, status in expected_results if id.startswith("operation")] + + for result in results: + if result.status == RunStatus.Skipped: + continue + + timing_keys = [timing.name for timing in result.timing] + assert timing_keys == ["compile", "execute"] + assert log_counts in log_output assert "4 project hooks, 1 view model" in log_output diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 71c8588b17f..8bbf274554d 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -164,7 +164,7 @@ def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strateg ): # Initial run with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run"]) + run_dbt(["run"], expect_pass=False) # Incremental run fails with patch_microbatch_end_time("2020-01-03 13:57:00"): diff --git a/tests/functional/run_operations/test_run_operations.py 
b/tests/functional/run_operations/test_run_operations.py index 064c98b3a51..258ed679d7e 100644 --- a/tests/functional/run_operations/test_run_operations.py +++ b/tests/functional/run_operations/test_run_operations.py @@ -3,6 +3,7 @@ import pytest import yaml +from dbt.artifacts.schemas.results import RunStatus from dbt.tests.util import ( check_table_does_exist, mkdir, @@ -135,9 +136,25 @@ def test_run_operation_local_macro(self, project): run_dbt(["deps"]) results, log_output = run_dbt_and_capture(["run-operation", "something_cool"]) + + for result in results: + if result.status == RunStatus.Skipped: + continue + + timing_keys = [timing.name for timing in result.timing] + assert timing_keys == ["compile", "execute"] + assert "something cool" in log_output results, log_output = run_dbt_and_capture(["run-operation", "pkg.something_cool"]) + + for result in results: + if result.status == RunStatus.Skipped: + continue + + timing_keys = [timing.name for timing in result.timing] + assert timing_keys == ["compile", "execute"] + assert "something cool" in log_output rm_dir("pkg") diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py index 085f849492e..f6ac66f0034 100644 --- a/tests/unit/test_events.py +++ b/tests/unit/test_events.py @@ -517,7 +517,7 @@ def test_all_serializable(self): def test_date_serialization(): - ti = TimingInfo("test") + ti = TimingInfo("compile") ti.begin() ti.end() ti_dict = ti.to_dict() From a0674db8400673d7dd4ebf7597dd82b3349337ce Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Fri, 18 Oct 2024 15:07:03 -0700 Subject: [PATCH 2/3] exclude hook results from results in on-run-end context (#10885) * exclude hook results from results in on-run-end context * changelog * preserve previous behavior --- .../unreleased/Fixes-20241018-135810.yaml | 6 ++ core/dbt/task/run.py | 4 +- core/dbt/task/runnable.py | 1 - .../adapter/hooks/test_on_run_hooks.py | 74 +++++++++++++++++++ 4 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 
.changes/unreleased/Fixes-20241018-135810.yaml diff --git a/.changes/unreleased/Fixes-20241018-135810.yaml b/.changes/unreleased/Fixes-20241018-135810.yaml new file mode 100644 index 00000000000..c205e15bb09 --- /dev/null +++ b/.changes/unreleased/Fixes-20241018-135810.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Exclude hook result from results in on-run-end context +time: 2024-10-18T13:58:10.396884-07:00 +custom: + Author: ChenyuLInx + Issue: "7387" diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index f159861e1ae..7a321e69d30 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -774,7 +774,9 @@ def after_run(self, adapter, results) -> None: extras = { "schemas": list({s for _, s in database_schema_set}), - "results": results, + "results": [ + r for r in results if r.thread_id != "main" or r.status == RunStatus.Error + ], # exclude that didn't fail to preserve backwards compatibility "database_schemas": list(database_schema_set), } with adapter.connection_named("master"): diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index a37308f81ea..7ad9d87e10d 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -512,7 +512,6 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]): self.started_at = time.time() try: before_run_status = self.before_run(adapter, selected_uids) - if before_run_status == RunStatus.Success or ( not get_flags().skip_nodes_if_on_run_start_fails ): diff --git a/tests/functional/adapter/hooks/test_on_run_hooks.py b/tests/functional/adapter/hooks/test_on_run_hooks.py index b9239b93b4a..b85784be3cf 100644 --- a/tests/functional/adapter/hooks/test_on_run_hooks.py +++ b/tests/functional/adapter/hooks/test_on_run_hooks.py @@ -168,3 +168,77 @@ def test_results(self, project): run_results = get_artifact(project.project_root, "target", "run_results.json") assert run_results["results"] == [] + + +class Test__HookContext__HookSuccess: + @pytest.fixture(scope="class") + def 
project_config_update(self): + return { + "on-run-start": [ + "select 1 as id", # success + "select 1 as id", # success + ], + "on-run-end": [ + '{{ log("Num Results in context: " ~ results|length)}}' + "{{ output_thread_ids(results) }}", + ], + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "log.sql": """ +{% macro output_thread_ids(results) %} + {% for result in results %} + {{ log("Thread ID: " ~ result.thread_id) }} + {% endfor %} +{% endmacro %} +""" + } + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": "select 1"} + + def test_results_in_context_success(self, project): + results, log_output = run_dbt_and_capture(["--debug", "run"]) + assert "Thread ID: " in log_output + assert "Thread ID: main" not in log_output + assert results[0].thread_id == "main" # hook still exists in run results + assert "Num Results in context: 1" in log_output # only model given hook was successful + + +class Test__HookContext__HookFail: + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "on-run-start": [ + "select a as id", # fail + ], + "on-run-end": [ + '{{ log("Num Results in context: " ~ results|length)}}' + "{{ output_thread_ids(results) }}", + ], + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "log.sql": """ +{% macro output_thread_ids(results) %} + {% for result in results %} + {{ log("Thread ID: " ~ result.thread_id) }} + {% endfor %} +{% endmacro %} +""" + } + + @pytest.fixture(scope="class") + def models(self): + return {"my_model.sql": "select 1"} + + def test_results_in_context_hook_fail(self, project): + results, log_output = run_dbt_and_capture(["--debug", "run"], expect_pass=False) + assert "Thread ID: main" in log_output + assert results[0].thread_id == "main" + assert "Num Results in context: 2" in log_output # failed hook and model From 7920b0e71d5ec5b769a4bd95e29d760fee93570e Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Mon, 21 Oct 2024 13:10:00 
-0700 Subject: [PATCH 3/3] Update microbatch tests to handle update wherein incremental strategies are always validated (#10884) dbt-adapters updated the incremental_strategy validation of incremental models such that the validation now _always_ happens when an incremental model is executed. A test in dbt-core `TestMicrobatchCustomUserStrategyEnvVarTrueInvalid` was previously set to _expect_ buggy behavior where an incremental model would succeed on its "first"/"refresh" run even if it had an invalid incremental strategy. Thus we needed to update this test in dbt-core to expect the now correct behavior of incremental model execution time validation --- tests/functional/microbatch/test_microbatch.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 8bbf274554d..02c6976c848 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -162,11 +162,8 @@ def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strateg with mock.patch.object( type(project.adapter), "valid_incremental_strategies", lambda _: [] ): - # Initial run - with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run"], expect_pass=False) - - # Incremental run fails + # Run of microbatch model while adapter doesn't have a "valid" + # microbatch strategy causes an error to be raised with patch_microbatch_end_time("2020-01-03 13:57:00"): _, logs = run_dbt_and_capture(["run"], expect_pass=False) assert "'microbatch' is not valid" in logs