Skip to content

Commit

Permalink
source freshness precomputes metadata-based freshness in batch, if …
Browse files Browse the repository at this point in the history
…possible (#9749)
  • Loading branch information
MichelleArk authored Apr 12, 2024
1 parent f15e128 commit cb56f4f
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 7 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240404-170728.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: 'source freshness precomputes metadata-based freshness in batch, if possible '
time: 2024-04-04T17:07:28.717868-07:00
custom:
Author: michelleark
Issue: "8705"
79 changes: 73 additions & 6 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import threading
import time
from typing import Optional, List
from typing import Optional, List, AbstractSet, Dict

from .base import BaseRunner
from .printer import (
Expand All @@ -28,6 +28,8 @@

from dbt.adapters.capability import Capability
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.base.impl import FreshnessResponse
from dbt.contracts.graph.nodes import SourceDefinition, HookNode
from dbt_common.events.base_types import EventLevel
from dbt.graph import ResourceTypeSelector
Expand All @@ -36,6 +38,15 @@


class FreshnessRunner(BaseRunner):
def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
super().__init__(config, adapter, node, node_index, num_nodes)
self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {}

def set_metadata_freshness_cache(
self, metadata_freshness_cache: Dict[BaseRelation, FreshnessResult]
) -> None:
self._metadata_freshness_cache = metadata_freshness_cache

def on_skip(self):
raise DbtRuntimeError("Freshness: nodes cannot be skipped!")

Expand Down Expand Up @@ -105,7 +116,7 @@ def execute(self, compiled_node, manifest):
with self.adapter.connection_named(compiled_node.unique_id, compiled_node):
self.adapter.clear_transaction()
adapter_response: Optional[AdapterResponse] = None
freshness = None
freshness: Optional[FreshnessResponse] = None

if compiled_node.loaded_at_field is not None:
adapter_response, freshness = self.adapter.calculate_freshness(
Expand All @@ -125,10 +136,14 @@ def execute(self, compiled_node, manifest):
EventLevel.WARN,
)

adapter_response, freshness = self.adapter.calculate_freshness_from_metadata(
relation,
macro_resolver=manifest,
)
metadata_source = self.adapter.Relation.create_from(self.config, compiled_node)
if metadata_source in self._metadata_freshness_cache:
freshness = self._metadata_freshness_cache[metadata_source]
else:
adapter_response, freshness = self.adapter.calculate_freshness_from_metadata(
relation,
macro_resolver=manifest,
)

status = compiled_node.freshness.status(freshness["age"])
else:
Expand Down Expand Up @@ -171,6 +186,10 @@ def node_is_match(self, node):


class FreshnessTask(RunTask):
def __init__(self, args, config, manifest) -> None:
super().__init__(args, config, manifest)
self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {}

def result_path(self):
if self.args.output:
return os.path.realpath(self.args.output)
Expand All @@ -190,6 +209,17 @@ def get_node_selector(self):
resource_types=[NodeType.Source],
)

def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None:
super().before_run(adapter, selected_uids)
if adapter.supports(Capability.TableLastModifiedMetadataBatch):
self.populate_metadata_freshness_cache(adapter, selected_uids)

def get_runner(self, node) -> BaseRunner:
freshness_runner = super().get_runner(node)
assert isinstance(freshness_runner, FreshnessRunner)
freshness_runner.set_metadata_freshness_cache(self._metadata_freshness_cache)
return freshness_runner

def get_runner_type(self, _):
return FreshnessRunner

Expand All @@ -214,3 +244,40 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]:
return super().get_hooks_by_type(hook_type)
else:
return []

def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None:
if self.manifest is None:
raise DbtInternalError("Manifest must be set to populate metadata freshness cache")

batch_metadata_sources: List[BaseRelation] = []
for selected_source_uid in list(selected_uids):
source = self.manifest.sources.get(selected_source_uid)
if source and source.loaded_at_field is None:
metadata_source = adapter.Relation.create_from(self.config, source)
batch_metadata_sources.append(metadata_source)

fire_event(
Note(
msg=f"Pulling freshness from warehouse metadata tables for {len(batch_metadata_sources)} sources"
),
EventLevel.INFO,
)

try:
_, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch(
batch_metadata_sources
)
self._metadata_freshness_cache.update(metadata_freshness_results)
except Exception as e:
# This error handling is intentionally very coarse.
# If anything goes wrong during batch metadata calculation, we can safely
# leave _metadata_freshness_cache unpopulated.
# Downstream, this will be gracefully handled as a cache miss and non-batch
# metadata-based freshness will still be performed on a source-by-source basis.
fire_event(
Note(msg=f"Metadata freshness could not be computed in batch: {e}"),
EventLevel.WARN,
)

def get_freshness_metadata_cache(self) -> Dict[BaseRelation, FreshnessResult]:
return self._metadata_freshness_cache
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git+https://github.com/dbt-labs/dbt-adapters.git@main
git+https://github.com/dbt-labs/dbt-adapters.git
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git@main
git+https://github.com/dbt-labs/dbt-postgres.git@main
Expand Down
154 changes: 154 additions & 0 deletions tests/unit/test_freshness_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import datetime
import pytest
from unittest import mock

from dbt.task.freshness import FreshnessTask, FreshnessResponse


class TestFreshnessTaskMetadataCache:
@pytest.fixture(scope="class")
def args(self):
mock_args = mock.Mock()
mock_args.state = None
mock_args.defer_state = None
mock_args.write_json = None

return mock_args

@pytest.fixture(scope="class")
def config(self):
mock_config = mock.Mock()
mock_config.threads = 1
mock_config.target_name = "mock_config_target_name"

@pytest.fixture(scope="class")
def manifest(self):
return mock.Mock()

@pytest.fixture(scope="class")
def source_with_loaded_at_field(self):
mock_source = mock.Mock()
mock_source.unique_id = "source_with_loaded_at_field"
mock_source.loaded_at_field = "loaded_at_field"
return mock_source

@pytest.fixture(scope="class")
def source_no_loaded_at_field(self):
mock_source = mock.Mock()
mock_source.unique_id = "source_no_loaded_at_field"
return mock_source

@pytest.fixture(scope="class")
def source_no_loaded_at_field2(self):
mock_source = mock.Mock()
mock_source.unique_id = "source_no_loaded_at_field2"
return mock_source

@pytest.fixture(scope="class")
def adapter(self):
return mock.Mock()

@pytest.fixture(scope="class")
def freshness_response(self):
return FreshnessResponse(
max_loaded_at=datetime.datetime(2020, 5, 2),
snapshotted_at=datetime.datetime(2020, 5, 4),
age=2,
)

def test_populate_metadata_freshness_cache(
self, args, config, manifest, adapter, source_no_loaded_at_field, freshness_response
):
manifest.sources = {source_no_loaded_at_field.unique_id: source_no_loaded_at_field}
adapter.Relation.create_from.return_value = "source_relation"
adapter.calculate_freshness_from_metadata_batch.return_value = (
[],
{"source_relation": freshness_response},
)
task = FreshnessTask(args=args, config=config, manifest=manifest)

task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id})

assert task.get_freshness_metadata_cache() == {"source_relation": freshness_response}

def test_populate_metadata_freshness_cache_multiple_sources(
self,
args,
config,
manifest,
adapter,
source_no_loaded_at_field,
source_no_loaded_at_field2,
freshness_response,
):
manifest.sources = {
source_no_loaded_at_field.unique_id: source_no_loaded_at_field,
source_no_loaded_at_field2.unique_id: source_no_loaded_at_field2,
}
adapter.Relation.create_from.side_effect = ["source_relation1", "source_relation2"]
adapter.calculate_freshness_from_metadata_batch.return_value = (
[],
{"source_relation1": freshness_response, "source_relation2": freshness_response},
)
task = FreshnessTask(args=args, config=config, manifest=manifest)

task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id})

assert task.get_freshness_metadata_cache() == {
"source_relation1": freshness_response,
"source_relation2": freshness_response,
}

def test_populate_metadata_freshness_cache_with_loaded_at_field(
self, args, config, manifest, adapter, source_with_loaded_at_field, freshness_response
):
manifest.sources = {
source_with_loaded_at_field.unique_id: source_with_loaded_at_field,
}
adapter.Relation.create_from.return_value = "source_relation"
adapter.calculate_freshness_from_metadata_batch.return_value = (
[],
{"source_relation": freshness_response},
)
task = FreshnessTask(args=args, config=config, manifest=manifest)

task.populate_metadata_freshness_cache(adapter, {source_with_loaded_at_field.unique_id})

assert task.get_freshness_metadata_cache() == {"source_relation": freshness_response}

def test_populate_metadata_freshness_cache_multiple_sources_mixed(
self,
args,
config,
manifest,
adapter,
source_no_loaded_at_field,
source_with_loaded_at_field,
freshness_response,
):
manifest.sources = {
source_no_loaded_at_field.unique_id: source_no_loaded_at_field,
source_with_loaded_at_field.unique_id: source_with_loaded_at_field,
}
adapter.Relation.create_from.return_value = "source_relation"
adapter.calculate_freshness_from_metadata_batch.return_value = (
[],
{"source_relation": freshness_response},
)
task = FreshnessTask(args=args, config=config, manifest=manifest)

task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id})

assert task.get_freshness_metadata_cache() == {"source_relation": freshness_response}

def test_populate_metadata_freshness_cache_adapter_exception(
self, args, config, manifest, adapter, source_no_loaded_at_field, freshness_response
):
manifest.sources = {source_no_loaded_at_field.unique_id: source_no_loaded_at_field}
adapter.Relation.create_from.return_value = "source_relation"
adapter.calculate_freshness_from_metadata_batch.side_effect = Exception()
task = FreshnessTask(args=args, config=config, manifest=manifest)

task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id})

assert task.get_freshness_metadata_cache() == {}

0 comments on commit cb56f4f

Please sign in to comment.