Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source freshness precomputes metadata-based freshness in batch, if possible #9749

Merged
merged 14 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240404-170728.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: 'source freshness precomputes metadata-based freshness in batch, if possible '
time: 2024-04-04T17:07:28.717868-07:00
custom:
Author: michelleark
Issue: "8705"
79 changes: 73 additions & 6 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import threading
import time
from typing import Optional, List
from typing import Optional, List, AbstractSet, Dict

from .base import BaseRunner
from .printer import (
Expand All @@ -28,6 +28,8 @@

from dbt.adapters.capability import Capability
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.base.impl import FreshnessResponse
from dbt.contracts.graph.nodes import SourceDefinition, HookNode
from dbt_common.events.base_types import EventLevel
from dbt.graph import ResourceTypeSelector
Expand All @@ -36,6 +38,15 @@


class FreshnessRunner(BaseRunner):
def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
super().__init__(config, adapter, node, node_index, num_nodes)
self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {}

def set_metadata_freshness_cache(
self, metadata_freshness_cache: Dict[BaseRelation, FreshnessResult]
) -> None:
self._metadata_freshness_cache = metadata_freshness_cache

def on_skip(self):
raise DbtRuntimeError("Freshness: nodes cannot be skipped!")

Expand Down Expand Up @@ -105,7 +116,7 @@
with self.adapter.connection_named(compiled_node.unique_id, compiled_node):
self.adapter.clear_transaction()
adapter_response: Optional[AdapterResponse] = None
freshness = None
freshness: Optional[FreshnessResponse] = None

if compiled_node.loaded_at_field is not None:
adapter_response, freshness = self.adapter.calculate_freshness(
Expand All @@ -125,10 +136,14 @@
EventLevel.WARN,
)

adapter_response, freshness = self.adapter.calculate_freshness_from_metadata(
relation,
macro_resolver=manifest,
)
metadata_source = self.adapter.Relation.create_from(self.config, compiled_node)
if metadata_source in self._metadata_freshness_cache:
freshness = self._metadata_freshness_cache[metadata_source]

Check warning on line 141 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L139-L141

Added lines #L139 - L141 were not covered by tests
else:
adapter_response, freshness = self.adapter.calculate_freshness_from_metadata(

Check warning on line 143 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L143

Added line #L143 was not covered by tests
relation,
macro_resolver=manifest,
)

status = compiled_node.freshness.status(freshness["age"])
else:
Expand Down Expand Up @@ -171,6 +186,10 @@


class FreshnessTask(RunTask):
def __init__(self, args, config, manifest) -> None:
super().__init__(args, config, manifest)
self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {}

def result_path(self):
if self.args.output:
return os.path.realpath(self.args.output)
Expand All @@ -190,6 +209,17 @@
resource_types=[NodeType.Source],
)

def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None:
super().before_run(adapter, selected_uids)
if adapter.supports(Capability.TableLastModifiedMetadataBatch):
self.populate_metadata_freshness_cache(adapter, selected_uids)

Check warning on line 215 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L215

Added line #L215 was not covered by tests

def get_runner(self, node) -> BaseRunner:
freshness_runner = super().get_runner(node)
assert isinstance(freshness_runner, FreshnessRunner)
freshness_runner.set_metadata_freshness_cache(self._metadata_freshness_cache)
return freshness_runner

def get_runner_type(self, _):
return FreshnessRunner

Expand All @@ -214,3 +244,40 @@
return super().get_hooks_by_type(hook_type)
else:
return []

def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None:
if self.manifest is None:
raise DbtInternalError("Manifest must be set to populate metadata freshness cache")

Check warning on line 250 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L250

Added line #L250 was not covered by tests

batch_metadata_sources: List[BaseRelation] = []
for selected_source_uid in list(selected_uids):
source = self.manifest.sources.get(selected_source_uid)
if source and source.loaded_at_field is None:
metadata_source = adapter.Relation.create_from(self.config, source)
batch_metadata_sources.append(metadata_source)

Check warning on line 257 in core/dbt/task/freshness.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/freshness.py#L256-L257

Added lines #L256 - L257 were not covered by tests

fire_event(
Note(
msg=f"Pulling freshness from warehouse metadata tables for {len(batch_metadata_sources)} sources"
),
EventLevel.INFO,
)

try:
_, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch(
batch_metadata_sources
)
self._metadata_freshness_cache.update(metadata_freshness_results)
except Exception as e:
# This error handling is intentionally very coarse.
# If anything goes wrong during batch metadata calculation, we can safely
# leave _metadata_freshness_cache unpopulated.
# Downstream, this will be gracefully handled as a cache miss and non-batch
# metadata-based freshness will still be performed on a source-by-source basis.
fire_event(
Note(msg=f"Metadata freshness could not be computed in batch: {e}"),
EventLevel.WARN,
)

def get_freshness_metadata_cache(self) -> Dict[BaseRelation, FreshnessResult]:
return self._metadata_freshness_cache
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git+https://github.com/dbt-labs/dbt-adapters.git@main
git+https://github.com/dbt-labs/dbt-adapters.git
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git@main
git+https://github.com/dbt-labs/dbt-postgres.git@main
Expand Down
154 changes: 154 additions & 0 deletions tests/unit/test_freshness_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import datetime
import pytest
from unittest import mock

from dbt.task.freshness import FreshnessTask, FreshnessResponse


class TestFreshnessTaskMetadataCache:
@pytest.fixture(scope="class")
def args(self):
mock_args = mock.Mock()
mock_args.state = None
mock_args.defer_state = None
mock_args.write_json = None

return mock_args

@pytest.fixture(scope="class")
def config(self):
mock_config = mock.Mock()
mock_config.threads = 1
mock_config.target_name = "mock_config_target_name"

@pytest.fixture(scope="class")
def manifest(self):
return mock.Mock()

@pytest.fixture(scope="class")
def source_with_loaded_at_field(self):
mock_source = mock.Mock()
mock_source.unique_id = "source_with_loaded_at_field"
mock_source.loaded_at_field = "loaded_at_field"
return mock_source

@pytest.fixture(scope="class")
def source_no_loaded_at_field(self):
mock_source = mock.Mock()
mock_source.unique_id = "source_no_loaded_at_field"
return mock_source

@pytest.fixture(scope="class")
def source_no_loaded_at_field2(self):
mock_source = mock.Mock()
mock_source.unique_id = "source_no_loaded_at_field2"
return mock_source

@pytest.fixture(scope="class")
def adapter(self):
return mock.Mock()

@pytest.fixture(scope="class")
def freshness_response(self):
return FreshnessResponse(
max_loaded_at=datetime.datetime(2020, 5, 2),
snapshotted_at=datetime.datetime(2020, 5, 4),
age=2,
)

def test_populate_metadata_freshness_cache(
self, args, config, manifest, adapter, source_no_loaded_at_field, freshness_response
):
manifest.sources = {source_no_loaded_at_field.unique_id: source_no_loaded_at_field}
adapter.Relation.create_from.return_value = "source_relation"
adapter.calculate_freshness_from_metadata_batch.return_value = (
[],
{"source_relation": freshness_response},
)
task = FreshnessTask(args=args, config=config, manifest=manifest)

task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id})

assert task.get_freshness_metadata_cache() == {"source_relation": freshness_response}

def test_populate_metadata_freshness_cache_multiple_sources(
self,
args,
config,
manifest,
adapter,
source_no_loaded_at_field,
source_no_loaded_at_field2,
freshness_response,
):
manifest.sources = {
source_no_loaded_at_field.unique_id: source_no_loaded_at_field,
source_no_loaded_at_field2.unique_id: source_no_loaded_at_field2,
}
adapter.Relation.create_from.side_effect = ["source_relation1", "source_relation2"]
adapter.calculate_freshness_from_metadata_batch.return_value = (
[],
{"source_relation1": freshness_response, "source_relation2": freshness_response},
)
task = FreshnessTask(args=args, config=config, manifest=manifest)

task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id})

assert task.get_freshness_metadata_cache() == {
"source_relation1": freshness_response,
"source_relation2": freshness_response,
}

def test_populate_metadata_freshness_cache_with_loaded_at_field(
self, args, config, manifest, adapter, source_with_loaded_at_field, freshness_response
):
manifest.sources = {
source_with_loaded_at_field.unique_id: source_with_loaded_at_field,
}
adapter.Relation.create_from.return_value = "source_relation"
adapter.calculate_freshness_from_metadata_batch.return_value = (
[],
{"source_relation": freshness_response},
)
task = FreshnessTask(args=args, config=config, manifest=manifest)

task.populate_metadata_freshness_cache(adapter, {source_with_loaded_at_field.unique_id})

assert task.get_freshness_metadata_cache() == {"source_relation": freshness_response}

def test_populate_metadata_freshness_cache_multiple_sources_mixed(
self,
args,
config,
manifest,
adapter,
source_no_loaded_at_field,
source_with_loaded_at_field,
freshness_response,
):
manifest.sources = {
source_no_loaded_at_field.unique_id: source_no_loaded_at_field,
source_with_loaded_at_field.unique_id: source_with_loaded_at_field,
}
adapter.Relation.create_from.return_value = "source_relation"
adapter.calculate_freshness_from_metadata_batch.return_value = (
[],
{"source_relation": freshness_response},
)
task = FreshnessTask(args=args, config=config, manifest=manifest)

task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id})

assert task.get_freshness_metadata_cache() == {"source_relation": freshness_response}

def test_populate_metadata_freshness_cache_adapter_exception(
self, args, config, manifest, adapter, source_no_loaded_at_field, freshness_response
):
manifest.sources = {source_no_loaded_at_field.unique_id: source_no_loaded_at_field}
adapter.Relation.create_from.return_value = "source_relation"
adapter.calculate_freshness_from_metadata_batch.side_effect = Exception()
task = FreshnessTask(args=args, config=config, manifest=manifest)

task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id})

assert task.get_freshness_metadata_cache() == {}