From bf80a112e01a8a3c8f9eac5d88c6ca1689840385 Mon Sep 17 00:00:00 2001 From: Mike Alfare <13974384+mikealfare@users.noreply.github.com> Date: Tue, 27 Feb 2024 14:22:30 -0500 Subject: [PATCH] [Backport 1.7.latest] Implementation of metadata-based freshness (#721) * cherry pick pr#649 * turn off catalog by relations that was inadvertently turned on during the cherry pick --- .../unreleased/Features-20231219-120533.yaml | 6 +++ dbt/adapters/redshift/impl.py | 7 ++++ .../metadata/relation_last_modified.sql | 29 +++++++++++++ .../adapter/sources_freshness_tests/files.py | 38 +++++++++++++++++ .../test_get_relation_last_modified.py | 42 +++++++++++++++++++ 5 files changed, 122 insertions(+) create mode 100644 .changes/unreleased/Features-20231219-120533.yaml create mode 100644 dbt/include/redshift/macros/metadata/relation_last_modified.sql create mode 100644 tests/functional/adapter/sources_freshness_tests/files.py create mode 100644 tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py diff --git a/.changes/unreleased/Features-20231219-120533.yaml b/.changes/unreleased/Features-20231219-120533.yaml new file mode 100644 index 000000000..15b5ba1f1 --- /dev/null +++ b/.changes/unreleased/Features-20231219-120533.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add support for checking table-last-modified by metadata +time: 2023-12-19T12:05:33.784649-05:00 +custom: + Author: mikealfare + Issue: "615" diff --git a/dbt/adapters/redshift/impl.py b/dbt/adapters/redshift/impl.py index ae9f18392..f523acfb7 100644 --- a/dbt/adapters/redshift/impl.py +++ b/dbt/adapters/redshift/impl.py @@ -4,6 +4,7 @@ from dbt.adapters.base import PythonJobHelper from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport from dbt.adapters.base.meta import available +from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support from dbt.adapters.sql import SQLAdapter from dbt.contracts.connection import AdapterResponse from dbt.contracts.graph.nodes import ConstraintType @@ -46,6 +47,12 @@ class RedshiftAdapter(SQLAdapter): ConstraintType.foreign_key: ConstraintSupport.NOT_ENFORCED, } + _capabilities = CapabilityDict( + { + Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full), + } + ) + @classmethod def date_function(cls): return "getdate()" diff --git a/dbt/include/redshift/macros/metadata/relation_last_modified.sql b/dbt/include/redshift/macros/metadata/relation_last_modified.sql new file mode 100644 index 000000000..f21299c72 --- /dev/null +++ b/dbt/include/redshift/macros/metadata/relation_last_modified.sql @@ -0,0 +1,29 @@ +{% macro redshift__get_relation_last_modified(information_schema, relations) -%} + + {%- call statement('last_modified', fetch_result=True) -%} + select + ns.nspname as "schema", + c.relname as identifier, + max(qd.start_time) as last_modified, + {{ current_timestamp() }} as snapshotted_at + from pg_class c + join pg_namespace ns + on ns.oid = c.relnamespace + join sys_query_detail qd + on qd.table_id = c.oid + where qd.step_name = 'insert' + and ( + {%- for relation in relations -%} + ( + upper(ns.nspname) = upper('{{ relation.schema }}') + and upper(c.relname) = upper('{{ relation.identifier }}') + ) + {%- if not loop.last %} or {% endif -%} + {%- endfor -%} + ) + group by 1, 2, 4 + {%- endcall -%} + + {{ return(load_result('last_modified')) }} + +{% endmacro %} diff --git a/tests/functional/adapter/sources_freshness_tests/files.py b/tests/functional/adapter/sources_freshness_tests/files.py new file mode 100644 index 000000000..f2dd6fbbe --- /dev/null +++ b/tests/functional/adapter/sources_freshness_tests/files.py @@ -0,0 +1,38 @@ +SCHEMA_YML = """version: 2 +sources: + - name: test_source + freshness: + warn_after: {count: 10, period: hour} + error_after: {count: 1, period: day} + schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}" + tables: + - name: test_source_no_last_modified + - name: test_source_last_modified + loaded_at_field: last_modified +""" + +SEED_TEST_SOURCE_NO_LAST_MODIFIED_CSV = """ +id,name +1,Martin +2,Jeter +3,Ruth +4,Gehrig +5,DiMaggio +6,Torre +7,Mantle +8,Berra +9,Maris +""".strip() + +SEED_TEST_SOURCE_LAST_MODIFIED_CSV = """ +id,name,last_modified +1,Martin,2023-01-01 00:00:00 +2,Jeter,2023-02-01 00:00:00 +3,Ruth,2023-03-01 00:00:00 +4,Gehrig,2023-04-01 00:00:00 +5,DiMaggio,2023-05-01 00:00:00 +6,Torre,2023-06-01 00:00:00 +7,Mantle,2023-07-01 00:00:00 +8,Berra,2023-08-01 00:00:00 +9,Maris,2023-09-01 00:00:00 +""".strip() diff --git a/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py new file mode 100644 index 000000000..6a77d22ae --- /dev/null +++ b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py @@ -0,0 +1,42 @@ +import os + +from dbt.tests.util import run_dbt +import pytest + +from tests.functional.adapter.sources_freshness_tests import files + + +class TestGetLastRelationModified: + @pytest.fixture(scope="class") + def seeds(self): + return { + "test_source_no_last_modified.csv": files.SEED_TEST_SOURCE_NO_LAST_MODIFIED_CSV, + "test_source_last_modified.csv": files.SEED_TEST_SOURCE_LAST_MODIFIED_CSV, + } + + @pytest.fixture(scope="class") + def models(self): + return {"schema.yml": files.SCHEMA_YML} + + @pytest.fixture(scope="class", autouse=True) + def setup(self, project): + # we need the schema name for the sources section + os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema + run_dbt(["seed"]) + yield + del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] + + @pytest.mark.parametrize( + "source,status,expect_pass", + [ + ("test_source.test_source_no_last_modified", "pass", True), + ("test_source.test_source_last_modified", "error", False), # stale + ], + ) + def test_get_last_relation_modified(self, project, source, status, expect_pass): + results = run_dbt( + ["source", "freshness", "--select", f"source:{source}"], expect_pass=expect_pass + ) + assert len(results) == 1 + result = results[0] + assert result.status == status