Skip to content

Commit

Permalink
add Capability.TableLastModifiedMetadataBatch support (#928)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk authored Apr 17, 2024
1 parent 8b75a53 commit 3b014a8
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 6 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240404-171704.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Support TableLastModifiedMetadataBatch capability
time: 2024-04-04T17:17:04.853047-07:00
custom:
Author: michelleark
Issue: "965"
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class SnowflakeAdapter(SQLAdapter):
{
Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
Capability.TableLastModifiedMetadataBatch: CapabilitySupport(support=Support.Full),
}
)

Expand Down
104 changes: 98 additions & 6 deletions tests/functional/adapter/test_get_last_relation_modified.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import os
import pytest
from unittest import mock

from dbt.adapters.snowflake.impl import SnowflakeAdapter
from dbt.adapters.capability import Capability, CapabilityDict
from dbt.cli.main import dbtRunner


freshness_via_metadata_schema_yml = """version: 2
freshness_via_metadata_schema_yml = """
sources:
- name: test_source
freshness:
Expand All @@ -15,18 +18,28 @@
- name: test_table
"""

# Source definitions returned as "schema.yml" by TestGetLastRelationModifiedBatch.models.
# Declares one source (test_source) whose schema is read from the
# DBT_GET_LAST_RELATION_TEST_SCHEMA environment variable and lists three tables;
# only test_table_with_loaded_at_field sets a loaded_at_field.
freshness_metadata_schema_batch_yml = """
sources:
- name: test_source
freshness:
warn_after: {count: 10, period: hour}
error_after: {count: 1, period: day}
schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}"
tables:
- name: test_table
- name: test_table2
- name: test_table_with_loaded_at_field
loaded_at_field: my_loaded_at_field
"""

class TestGetLastRelationModified:

class SetupGetLastRelationModified:
@pytest.fixture(scope="class", autouse=True)
def set_env_vars(self, project):
os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
yield
del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]

@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": freshness_via_metadata_schema_yml}

@pytest.fixture(scope="class")
def custom_schema(self, project, set_env_vars):
with project.adapter.connection_named("__test"):
Expand All @@ -41,6 +54,12 @@ def custom_schema(self, project, set_env_vars):
with project.adapter.connection_named("__test"):
project.adapter.drop_schema(relation)


class TestGetLastRelationModified(SetupGetLastRelationModified):
@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": freshness_via_metadata_schema_yml}

def test_get_last_relation_modified(self, project, set_env_vars, custom_schema):
project.run_sql(
f"create table {custom_schema}.test_table (id integer autoincrement, name varchar(100) not null);"
Expand All @@ -58,3 +77,76 @@ def probe(e):

# The 'source freshness' command should succeed without warnings or errors.
assert not warning_or_error


class TestGetLastRelationModifiedBatch(SetupGetLastRelationModified):
    """Compare `source freshness` results with and without the batch capability."""

    @pytest.fixture(scope="class")
    def models(self):
        """Supply the schema file that declares the three freshness source tables."""
        return {"schema.yml": freshness_metadata_schema_batch_yml}

    def get_freshness_result_for_table(self, table_name, results):
        """Return the first entry of ``results`` whose node is named ``table_name``.

        Returns None when no entry matches.
        """
        return next(
            (candidate for candidate in results if candidate.node.name == table_name),
            None,
        )

    def test_get_last_relation_modified_batch(self, project, set_env_vars, custom_schema):
        """Run freshness twice and assert both runs agree for every source table.

        The first run uses the adapter's capabilities as declared; the second
        run patches ``SnowflakeAdapter.capabilities`` so that
        ``Capability.TableLastModifiedMetadataBatch`` is absent.
        """
        plain_table = "test_table"
        second_table = "test_table2"
        loaded_at_table = "test_table_with_loaded_at_field"
        table_names = (plain_table, second_table, loaded_at_table)

        # Create the three relations referenced by the source definitions.
        setup_statements = (
            f"create table {custom_schema}.test_table (id integer autoincrement, name varchar(100) not null);",
            f"create table {custom_schema}.test_table2 (id integer autoincrement, name varchar(100) not null);",
            f"create table {custom_schema}.test_table_with_loaded_at_field as (select 1 as id, timestamp '2009-09-15 10:59:43' as my_loaded_at_field);",
        )
        for statement in setup_statements:
            project.run_sql(statement)

        runner = dbtRunner()

        # First pass: capabilities untouched.
        batch_run = runner.invoke(["source", "freshness"]).result
        assert len(batch_run) == 3
        with_batch = {
            name: self.get_freshness_result_for_table(name, batch_run) for name in table_names
        }

        # Remove TableLastModifiedMetadataBatch and run freshness on same input without batch strategy
        reduced_capabilities = CapabilityDict(
            {
                cap: level
                for cap, level in SnowflakeAdapter.capabilities().items()
                if cap != Capability.TableLastModifiedMetadataBatch
            }
        )
        with mock.patch.object(
            SnowflakeAdapter, "capabilities", return_value=reduced_capabilities
        ):
            plain_run = runner.invoke(["source", "freshness"]).result

        assert len(plain_run) == 3
        without_batch = {
            name: self.get_freshness_result_for_table(name, plain_run) for name in table_names
        }

        # assert results between batch vs non-batch freshness strategy are equivalent
        assert without_batch[plain_table].status == with_batch[plain_table].status
        assert without_batch[plain_table].max_loaded_at == with_batch[plain_table].max_loaded_at

        assert without_batch[second_table].status == with_batch[second_table].status
        assert without_batch[second_table].max_loaded_at == with_batch[second_table].max_loaded_at

        assert with_batch[loaded_at_table].status == without_batch[loaded_at_table].status
        assert (
            with_batch[loaded_at_table].max_loaded_at
            == without_batch[loaded_at_table].max_loaded_at
        )

0 comments on commit 3b014a8

Please sign in to comment.