Skip to content

Commit

Permalink
Feature/source freshness hooks (#9366)
Browse files Browse the repository at this point in the history
Co-authored-by: Ofek Weiss <[email protected]>
  • Loading branch information
MichelleArk and ofek1weiss authored Jan 23, 2024
1 parent cc7170d commit dc59c70
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 10 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231231-171205.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Added hook support for `dbt source freshness`
time: 2023-12-31T17:12:05.587185+02:00
custom:
Author: ofek1weiss
Issue: "5609"
9 changes: 8 additions & 1 deletion core/dbt/cli/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ def _assign_params(
# Add entire invocation command to flags
object.__setattr__(self, "INVOCATION_COMMAND", "dbt " + " ".join(sys.argv[1:]))

# Overwrite default assignments with user config if available.
if project_flags:
# Overwrite default assignments with project flags if available.
param_assigned_from_default_copy = params_assigned_from_default.copy()
for param_assigned_from_default in params_assigned_from_default:
project_flags_param_value = getattr(
Expand All @@ -252,6 +252,13 @@ def _assign_params(
param_assigned_from_default_copy.remove(param_assigned_from_default)
params_assigned_from_default = param_assigned_from_default_copy

# Add project-level flags that are not available as CLI options / env vars
for (
project_level_flag_name,
project_level_flag_value,
) in project_flags.project_only_flags.items():
object.__setattr__(self, project_level_flag_name.upper(), project_level_flag_value)

# Set hard coded flags.
object.__setattr__(self, "WHICH", invoked_subcommand_name or ctx.info_name)

Expand Down
5 changes: 5 additions & 0 deletions core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ class ProjectFlags(ExtensibleDbtClassMixin, Replaceable):
populate_cache: Optional[bool] = None
printer_width: Optional[int] = None
send_anonymous_usage_stats: bool = DEFAULT_SEND_ANONYMOUS_USAGE_STATS
source_freshness_run_project_hooks: bool = False
static_parser: Optional[bool] = None
use_colors: Optional[bool] = None
use_colors_file: Optional[bool] = None
Expand All @@ -316,6 +317,10 @@ class ProjectFlags(ExtensibleDbtClassMixin, Replaceable):
warn_error_options: Optional[Dict[str, Union[str, List[str]]]] = None
write_json: Optional[bool] = None

@property
def project_only_flags(self) -> Dict[str, Any]:
    """Return the flags that are exposed only through the project config.

    The result maps each flag name to the value stored on this instance.
    Currently the only entry is ``source_freshness_run_project_hooks``.
    """
    run_hooks = self.source_freshness_run_project_hooks
    return {"source_freshness_run_project_hooks": run_hooks}


@dataclass
class ProfileConfig(dbtClassMixin, Replaceable):
Expand Down
23 changes: 17 additions & 6 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import os
import threading
import time
from typing import Optional
from typing import Optional, List

from .base import BaseRunner
from .printer import (
print_run_result_error,
)
from .runnable import GraphRunnableTask
from .run import RunTask

from dbt.artifacts.freshness import (
FreshnessResult,
Expand All @@ -23,11 +23,12 @@
LogStartLine,
LogFreshnessResult,
)
from dbt.node_types import NodeType
from dbt.contracts.results import RunStatus
from dbt.node_types import NodeType, RunHookType

from dbt.adapters.capability import Capability
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.contracts.graph.nodes import SourceDefinition
from dbt.contracts.graph.nodes import SourceDefinition, HookNode
from dbt_common.events.base_types import EventLevel
from dbt.graph import ResourceTypeSelector

Expand Down Expand Up @@ -170,7 +171,7 @@ def node_is_match(self, node):
return node.has_freshness


class FreshnessTask(GraphRunnableTask):
class FreshnessTask(RunTask):
def result_path(self):
if self.args.output:
return os.path.realpath(self.args.output)
Expand Down Expand Up @@ -200,7 +201,17 @@ def get_result(self, results, elapsed_time, generated_at):

def task_end_messages(self, results):
for result in results:
if result.status in (FreshnessStatus.Error, FreshnessStatus.RuntimeErr):
if result.status in (
FreshnessStatus.Error,
FreshnessStatus.RuntimeErr,
RunStatus.Error,
):
print_run_result_error(result)

fire_event(FreshnessCheckComplete())

def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]:
    """Return the project hooks of ``hook_type`` to run for source freshness.

    Hooks are opt-in for this task: they are only looked up (via the parent
    class) when the ``source_freshness_run_project_hooks`` flag is truthy.
    Otherwise an empty list is returned, so no hooks are executed.

    :param hook_type: which kind of run hook is being requested.
    :return: the hooks resolved by the parent class, or ``[]`` when the
        flag is off or was never set.
    """
    # The flag appears to be assigned onto the flags object only when
    # project flags were supplied, so it can be absent here. Fall back to
    # False (the declared default) instead of raising AttributeError.
    if not getattr(self.args, "source_freshness_run_project_hooks", False):
        return []
    return super().get_hooks_by_type(hook_type)
11 changes: 9 additions & 2 deletions tests/functional/sources/common_source_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pytest
import yaml

from dbt.tests.util import run_dbt
from dbt.tests.util import run_dbt, run_dbt_and_capture
from tests.functional.sources.fixtures import (
models_schema_yml,
models_view_model_sql,
Expand Down Expand Up @@ -57,10 +57,17 @@ def project_config_update(self):
},
}

def run_dbt_with_vars(self, project, cmd, *args, **kwargs):
def _extend_cmd_with_vars(self, project, cmd):
    """Append a ``--vars`` option to ``cmd`` in place.

    The YAML payload carries the project's test schema and the
    adapter-quoted ``updated_at`` column name.
    """
    run_schema = project.test_schema
    loaded_at = project.adapter.quote("updated_at")
    dumped_vars = yaml.safe_dump(
        {
            "test_run_schema": run_schema,
            "test_loaded_at": loaded_at,
        }
    )
    cmd.extend(["--vars", dumped_vars])

def run_dbt_with_vars(self, project, cmd, *args, **kwargs):
    """Run dbt with ``cmd`` after appending the shared ``--vars`` option.

    ``cmd`` is mutated in place; extra positional and keyword arguments are
    forwarded to ``run_dbt`` unchanged, and its return value is returned.
    """
    self._extend_cmd_with_vars(project, cmd)
    outcome = run_dbt(cmd, *args, **kwargs)
    return outcome

def run_dbt_and_capture_with_vars(self, project, cmd, *args, **kwargs):
    """Run dbt via ``run_dbt_and_capture`` with the shared ``--vars`` appended.

    ``cmd`` is mutated in place; extra positional and keyword arguments are
    forwarded unchanged, and the callee's return value is returned as-is.
    """
    self._extend_cmd_with_vars(project, cmd)
    captured = run_dbt_and_capture(cmd, *args, **kwargs)
    return captured
82 changes: 82 additions & 0 deletions tests/functional/sources/test_source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,85 @@ def warning_probe(e):
runner.invoke(["parse"])

assert got_warning


class TestHooksInSourceFreshness(SuccessfulSourceFreshnessTest):
    """Checks the log output of ``source freshness`` with the hooks flag enabled."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Project config with one logging hook per hook type and the
        # opt-in flag switched on.
        enabled_flags = {"source_freshness_run_project_hooks": True}
        return {
            "config-version": 2,
            "on-run-start": ["{{ log('on-run-start hooks called') }}"],
            "on-run-end": ["{{ log('on-run-end hooks called') }}"],
            "flags": enabled_flags,
        }

    def test_hooks_do_run_for_source_freshness(self, project):
        freshness_cmd = ["source", "freshness"]
        _, log_output = self.run_dbt_and_capture_with_vars(
            project,
            freshness_cmd,
            expect_pass=False,
        )
        # Both hook types must be mentioned in the captured log.
        for hook_marker in ("on-run-start", "on-run-end"):
            assert hook_marker in log_output


class TestHooksInSourceFreshnessDisabled(SuccessfulSourceFreshnessTest):
    """Checks the log output of ``source freshness`` with the hooks flag set to False."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Hooks are declared, but the opt-in flag is explicitly turned off.
        return {
            "config-version": 2,
            "on-run-start": ["{{ log('on-run-start hooks called') }}"],
            "on-run-end": ["{{ log('on-run-end hooks called') }}"],
            "flags": {
                "source_freshness_run_project_hooks": False,
            },
        }

    # Renamed from ``test_hooks_do_run_...``: the assertions below verify
    # that the hooks are NOT mentioned in the log output.
    def test_hooks_do_not_run_for_source_freshness(
        self,
        project,
    ):
        _, log_output = self.run_dbt_and_capture_with_vars(
            project,
            [
                "source",
                "freshness",
            ],
            expect_pass=False,
        )
        assert "on-run-start" not in log_output
        assert "on-run-end" not in log_output


class TestHooksInSourceFreshnessDefault(SuccessfulSourceFreshnessTest):
    """Checks the log output of ``source freshness`` when the hooks flag is not configured."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Hooks are declared, but no ``flags`` section is provided, so the
        # flag keeps whatever default the project applies.
        return {
            "config-version": 2,
            "on-run-start": ["{{ log('on-run-start hooks called') }}"],
            "on-run-end": ["{{ log('on-run-end hooks called') }}"],
        }

    # Renamed from ``test_hooks_do_run_...``: the assertions below verify
    # that the hooks are NOT mentioned in the log output.
    def test_hooks_do_not_run_for_source_freshness(
        self,
        project,
    ):
        _, log_output = self.run_dbt_and_capture_with_vars(
            project,
            [
                "source",
                "freshness",
            ],
            expect_pass=False,
        )
        # default behaviour - no hooks run in source freshness
        assert "on-run-start" not in log_output
        assert "on-run-end" not in log_output
8 changes: 8 additions & 0 deletions tests/unit/test_cli_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,14 @@ def test_global_flag_at_child_context(self):

assert flags_a.USE_COLORS == flags_b.USE_COLORS

def test_set_project_only_flags(self, project_flags, run_context):
    """Project-only flags are readable from ``Flags`` but absent from the click params."""
    flags = Flags(run_context, project_flags)
    expected_flags = project_flags.project_only_flags

    for flag_name in expected_flags:
        assert getattr(flags, flag_name) == expected_flags[flag_name]
        # sanity check: ensure the flag is not part of the click context
        assert flag_name not in run_context.params

def _create_flags_from_dict(self, cmd, d):
write_file("", "profiles.yml")
result = Flags.from_dict(cmd, d)
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from dbt import tracking
from dbt.contracts.files import SourceFile, FileHash, FilePath
from dbt.contracts.graph.manifest import MacroManifest, ManifestStateCheck
from dbt.contracts.project import ProjectFlags
from dbt.graph import NodeSelector, parse_difference
from dbt.events.logging import setup_event_logger
from dbt.mp_context import get_mp_context
Expand Down Expand Up @@ -130,7 +131,7 @@ def get_config(self, extra_cfg=None):
cfg.update(extra_cfg)

config = config_from_parts_or_dicts(project=cfg, profile=self.profile)
dbt.flags.set_from_args(Namespace(), config)
dbt.flags.set_from_args(Namespace(), ProjectFlags())
setup_event_logger(dbt.flags.get_flags())
object.__setattr__(dbt.flags.get_flags(), "PARTIAL_PARSE", False)
return config
Expand Down

0 comments on commit dc59c70

Please sign in to comment.