diff --git a/.changes/unreleased/Features-20240404-170728.yaml b/.changes/unreleased/Features-20240404-170728.yaml new file mode 100644 index 00000000000..6db7735acbc --- /dev/null +++ b/.changes/unreleased/Features-20240404-170728.yaml @@ -0,0 +1,6 @@ +kind: Features +body: 'source freshness precomputes metadata-based freshness in batch, if possible ' +time: 2024-04-04T17:07:28.717868-07:00 +custom: + Author: michelleark + Issue: "8705" diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index ed37a9e19f1..b1fe7581c30 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -1,7 +1,7 @@ import os import threading import time -from typing import Optional, List +from typing import Optional, List, AbstractSet, Dict from .base import BaseRunner from .printer import ( @@ -28,6 +28,8 @@ from dbt.adapters.capability import Capability from dbt.adapters.contracts.connection import AdapterResponse +from dbt.adapters.base.relation import BaseRelation +from dbt.adapters.base.impl import FreshnessResponse from dbt.contracts.graph.nodes import SourceDefinition, HookNode from dbt_common.events.base_types import EventLevel from dbt.graph import ResourceTypeSelector @@ -36,6 +38,15 @@ class FreshnessRunner(BaseRunner): + def __init__(self, config, adapter, node, node_index, num_nodes) -> None: + super().__init__(config, adapter, node, node_index, num_nodes) + self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {} + + def set_metadata_freshness_cache( + self, metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] + ) -> None: + self._metadata_freshness_cache = metadata_freshness_cache + def on_skip(self): raise DbtRuntimeError("Freshness: nodes cannot be skipped!") @@ -105,7 +116,7 @@ def execute(self, compiled_node, manifest): with self.adapter.connection_named(compiled_node.unique_id, compiled_node): self.adapter.clear_transaction() adapter_response: Optional[AdapterResponse] = None - freshness = None + freshness: Optional[FreshnessResponse] = None if compiled_node.loaded_at_field is not None: adapter_response, freshness = self.adapter.calculate_freshness( @@ -125,10 +136,14 @@ def execute(self, compiled_node, manifest): EventLevel.WARN, ) - adapter_response, freshness = self.adapter.calculate_freshness_from_metadata( - relation, - macro_resolver=manifest, - ) + metadata_source = self.adapter.Relation.create_from(self.config, compiled_node) + if metadata_source in self._metadata_freshness_cache: + freshness = self._metadata_freshness_cache[metadata_source] + else: + adapter_response, freshness = self.adapter.calculate_freshness_from_metadata( + relation, + macro_resolver=manifest, + ) status = compiled_node.freshness.status(freshness["age"]) else: @@ -171,6 +186,10 @@ def node_is_match(self, node): class FreshnessTask(RunTask): + def __init__(self, args, config, manifest) -> None: + super().__init__(args, config, manifest) + self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {} + def result_path(self): if self.args.output: return os.path.realpath(self.args.output) @@ -190,6 +209,17 @@ def get_node_selector(self): resource_types=[NodeType.Source], ) + def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None: + super().before_run(adapter, selected_uids) + if adapter.supports(Capability.TableLastModifiedMetadataBatch): + self.populate_metadata_freshness_cache(adapter, selected_uids) + + def get_runner(self, node) -> BaseRunner: + freshness_runner = super().get_runner(node) + assert isinstance(freshness_runner, FreshnessRunner) + freshness_runner.set_metadata_freshness_cache(self._metadata_freshness_cache) + return freshness_runner + def get_runner_type(self, _): return FreshnessRunner @@ -214,3 +244,40 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]: return super().get_hooks_by_type(hook_type) else: return [] + + def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None: + if self.manifest is None: + raise DbtInternalError("Manifest must be set to populate metadata freshness cache") + + batch_metadata_sources: List[BaseRelation] = [] + for selected_source_uid in list(selected_uids): + source = self.manifest.sources.get(selected_source_uid) + if source and source.loaded_at_field is None: + metadata_source = adapter.Relation.create_from(self.config, source) + batch_metadata_sources.append(metadata_source) + + fire_event( + Note( + msg=f"Pulling freshness from warehouse metadata tables for {len(batch_metadata_sources)} sources" + ), + EventLevel.INFO, + ) + + try: + _, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch( + batch_metadata_sources + ) + self._metadata_freshness_cache.update(metadata_freshness_results) + except Exception as e: + # This error handling is intentionally very coarse. + # If anything goes wrong during batch metadata calculation, we can safely + # leave _metadata_freshness_cache unpopulated. + # Downstream, this will be gracefully handled as a cache miss and non-batch + # metadata-based freshness will still be performed on a source-by-source basis. + fire_event( + Note(msg=f"Metadata freshness could not be computed in batch: {e}"), + EventLevel.WARN, + ) + + def get_freshness_metadata_cache(self) -> Dict[BaseRelation, FreshnessResult]: + return self._metadata_freshness_cache diff --git a/dev-requirements.txt b/dev-requirements.txt index 0c706dddbbe..a1decf2ae62 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,4 +1,4 @@ -git+https://github.com/dbt-labs/dbt-adapters.git@main +git+https://github.com/dbt-labs/dbt-adapters.git git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git@main git+https://github.com/dbt-labs/dbt-postgres.git@main diff --git a/tests/unit/test_freshness_task.py b/tests/unit/test_freshness_task.py new file mode 100644 index 00000000000..05c00df75da --- /dev/null +++ b/tests/unit/test_freshness_task.py @@ -0,0 +1,154 @@ +import datetime +import pytest +from unittest import mock + +from dbt.task.freshness import FreshnessTask, FreshnessResponse + + +class TestFreshnessTaskMetadataCache: + @pytest.fixture(scope="class") + def args(self): + mock_args = mock.Mock() + mock_args.state = None + mock_args.defer_state = None + mock_args.write_json = None + + return mock_args + + @pytest.fixture(scope="class") + def config(self): + mock_config = mock.Mock() + mock_config.threads = 1 + mock_config.target_name = "mock_config_target_name" + + @pytest.fixture(scope="class") + def manifest(self): + return mock.Mock() + + @pytest.fixture(scope="class") + def source_with_loaded_at_field(self): + mock_source = mock.Mock() + mock_source.unique_id = "source_with_loaded_at_field" + mock_source.loaded_at_field = "loaded_at_field" + return mock_source + + @pytest.fixture(scope="class") + def source_no_loaded_at_field(self): + mock_source = mock.Mock() + mock_source.unique_id = "source_no_loaded_at_field" + return mock_source + + @pytest.fixture(scope="class") + def source_no_loaded_at_field2(self): + mock_source = mock.Mock() + mock_source.unique_id = "source_no_loaded_at_field2" + return mock_source + + @pytest.fixture(scope="class") + def adapter(self): + return mock.Mock() + + @pytest.fixture(scope="class") + def freshness_response(self): + return FreshnessResponse( + max_loaded_at=datetime.datetime(2020, 5, 2), + snapshotted_at=datetime.datetime(2020, 5, 4), + age=2, + ) + + def test_populate_metadata_freshness_cache( + self, args, config, manifest, adapter, source_no_loaded_at_field, freshness_response + ): + manifest.sources = {source_no_loaded_at_field.unique_id: source_no_loaded_at_field} + adapter.Relation.create_from.return_value = "source_relation" + adapter.calculate_freshness_from_metadata_batch.return_value = ( + [], + {"source_relation": freshness_response}, + ) + task = FreshnessTask(args=args, config=config, manifest=manifest) + + task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id}) + + assert task.get_freshness_metadata_cache() == {"source_relation": freshness_response} + + def test_populate_metadata_freshness_cache_multiple_sources( + self, + args, + config, + manifest, + adapter, + source_no_loaded_at_field, + source_no_loaded_at_field2, + freshness_response, + ): + manifest.sources = { + source_no_loaded_at_field.unique_id: source_no_loaded_at_field, + source_no_loaded_at_field2.unique_id: source_no_loaded_at_field2, + } + adapter.Relation.create_from.side_effect = ["source_relation1", "source_relation2"] + adapter.calculate_freshness_from_metadata_batch.return_value = ( + [], + {"source_relation1": freshness_response, "source_relation2": freshness_response}, + ) + task = FreshnessTask(args=args, config=config, manifest=manifest) + + task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id}) + + assert task.get_freshness_metadata_cache() == { + "source_relation1": freshness_response, + "source_relation2": freshness_response, + } + + def test_populate_metadata_freshness_cache_with_loaded_at_field( + self, args, config, manifest, adapter, source_with_loaded_at_field, freshness_response + ): + manifest.sources = { + source_with_loaded_at_field.unique_id: source_with_loaded_at_field, + } + adapter.Relation.create_from.return_value = "source_relation" + adapter.calculate_freshness_from_metadata_batch.return_value = ( + [], + {"source_relation": freshness_response}, + ) + task = FreshnessTask(args=args, config=config, manifest=manifest) + + task.populate_metadata_freshness_cache(adapter, {source_with_loaded_at_field.unique_id}) + + assert task.get_freshness_metadata_cache() == {"source_relation": freshness_response} + + def test_populate_metadata_freshness_cache_multiple_sources_mixed( + self, + args, + config, + manifest, + adapter, + source_no_loaded_at_field, + source_with_loaded_at_field, + freshness_response, + ): + manifest.sources = { + source_no_loaded_at_field.unique_id: source_no_loaded_at_field, + source_with_loaded_at_field.unique_id: source_with_loaded_at_field, + } + adapter.Relation.create_from.return_value = "source_relation" + adapter.calculate_freshness_from_metadata_batch.return_value = ( + [], + {"source_relation": freshness_response}, + ) + task = FreshnessTask(args=args, config=config, manifest=manifest) + + task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id}) + + assert task.get_freshness_metadata_cache() == {"source_relation": freshness_response} + + def test_populate_metadata_freshness_cache_adapter_exception( + self, args, config, manifest, adapter, source_no_loaded_at_field, freshness_response + ): + manifest.sources = {source_no_loaded_at_field.unique_id: source_no_loaded_at_field} + adapter.Relation.create_from.return_value = "source_relation" + adapter.calculate_freshness_from_metadata_batch.side_effect = Exception() + task = FreshnessTask(args=args, config=config, manifest=manifest) + + task.populate_metadata_freshness_cache(adapter, {source_no_loaded_at_field.unique_id}) + + assert task.get_freshness_metadata_cache() == {}