Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TableLastModifiedMetadataBatch capability #744

Merged
merged 12 commits into from
Apr 18, 2024
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240404-171441.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Support TableLastModifiedMetadataBatch capability
time: 2024-04-04T17:14:41.313087-07:00
custom:
  Author: michelleark
  Issue: "755"
1 change: 1 addition & 0 deletions dbt/adapters/redshift/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class RedshiftAdapter(SQLAdapter):
{
Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
Capability.TableLastModifiedMetadataBatch: CapabilitySupport(support=Support.Full),
}
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
import os
import pytest
from unittest import mock

from dbt.adapters.redshift.impl import RedshiftAdapter
from dbt.adapters.capability import Capability, CapabilityDict
from dbt.cli.main import dbtRunner
from dbt.tests.util import run_dbt
import pytest

from tests.functional.adapter.sources_freshness_tests import files


class TestGetLastRelationModified:
class SetupGetLastRelationModified:
    """Shared base class that publishes the project's test schema via an environment variable."""

    @pytest.fixture(scope="class", autouse=True)
    def set_env_vars(self, project):
        """Set DBT_GET_LAST_RELATION_TEST_SCHEMA to the project's test schema, then remove it.

        The variable is set before the fixture yields and deleted once the
        fixture is torn down.
        """
        env_key = "DBT_GET_LAST_RELATION_TEST_SCHEMA"
        os.environ[env_key] = project.test_schema
        yield
        del os.environ[env_key]


class TestGetLastRelationModified(SetupGetLastRelationModified):
@pytest.fixture(scope="class")
def seeds(self):
return {
Expand All @@ -18,14 +30,6 @@ def seeds(self):
def models(self):
return {"schema.yml": files.SCHEMA_YML}

@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
# we need the schema name for the sources section
os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
run_dbt(["seed"])
yield
del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]

@pytest.mark.parametrize(
"source,status,expect_pass",
[
Expand All @@ -34,9 +38,113 @@ def setup(self, project):
],
)
def test_get_last_relation_modified(self, project, source, status, expect_pass):
run_dbt(["seed"])

results = run_dbt(
["source", "freshness", "--select", f"source:{source}"], expect_pass=expect_pass
)
assert len(results) == 1
result = results[0]
assert result.status == status


# Sources definition used by the batch freshness test below.
# It declares one source ("test_source") whose schema is read from the
# DBT_GET_LAST_RELATION_TEST_SCHEMA environment variable, with source-level
# freshness thresholds and three tables; only the last table sets an explicit
# `loaded_at_field`.
# The nesting is significant: `freshness`, `schema` and `tables` are fields of
# the source entry, and `loaded_at_field` is a field of its table entry.
freshness_metadata_schema_batch_yml = """
sources:
  - name: test_source
    freshness:
      warn_after: {count: 10, period: hour}
      error_after: {count: 1, period: day}
    schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}"
    tables:
      - name: test_table
      - name: test_table2
      - name: test_table_with_loaded_at_field
        loaded_at_field: my_loaded_at_field
"""


class TestGetLastRelationModifiedBatch(SetupGetLastRelationModified):
    """Compare `source freshness` results with and without the batch metadata capability."""

    @pytest.fixture(scope="class")
    def custom_schema(self, project, set_env_vars):
        """Create a fresh schema named by DBT_GET_LAST_RELATION_TEST_SCHEMA and drop it afterwards.

        Depends on ``set_env_vars`` so the environment variable is set before
        it is read here.

        Yields:
            The schema name of the relation that was created.
        """
        with project.adapter.connection_named("__test"):
            relation = project.adapter.Relation.create(
                database=project.database,
                schema=os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"],
            )
            # Drop before creating so the test starts from a clean schema.
            project.adapter.drop_schema(relation)
            project.adapter.create_schema(relation)

        yield relation.schema

        with project.adapter.connection_named("__test"):
            project.adapter.drop_schema(relation)

    @pytest.fixture(scope="class")
    def models(self):
        """Provide the sources definition as the project's ``schema.yml``."""
        return {"schema.yml": freshness_metadata_schema_batch_yml}

    def get_freshness_result_for_table(self, table_name, results):
        """Return the first result whose node name equals ``table_name``, or ``None``."""
        return next((result for result in results if result.node.name == table_name), None)

    def test_get_last_relation_modified_batch(self, project, custom_schema):
        """Freshness status and max_loaded_at must match between batch and non-batch runs."""
        table_names = ("test_table", "test_table2", "test_table_with_loaded_at_field")

        for plain_table in ("test_table", "test_table2"):
            project.run_sql(
                f"create table {custom_schema}.{plain_table} as (select 1 as id, 'test' as name);"
            )
        project.run_sql(
            f"create table {custom_schema}.test_table_with_loaded_at_field as (select 1 as id, timestamp '2009-09-15 10:59:43' as my_loaded_at_field);"
        )

        runner = dbtRunner()
        freshness_results_batch = runner.invoke(["source", "freshness"]).result
        assert len(freshness_results_batch) == 3

        # Remove TableLastModifiedMetadataBatch and run freshness on same input without batch strategy
        capabilities_no_batch = CapabilityDict(
            {
                capability: support
                for capability, support in RedshiftAdapter.capabilities().items()
                if capability != Capability.TableLastModifiedMetadataBatch
            }
        )
        with mock.patch.object(
            RedshiftAdapter, "capabilities", return_value=capabilities_no_batch
        ):
            freshness_results = runner.invoke(["source", "freshness"]).result
        assert len(freshness_results) == 3

        # assert results between batch vs non-batch freshness strategy are equivalent
        for table_name in table_names:
            batch_result = self.get_freshness_result_for_table(
                table_name, freshness_results_batch
            )
            non_batch_result = self.get_freshness_result_for_table(table_name, freshness_results)

            # Fail with a readable message instead of an AttributeError on None
            # when a table is missing from either result set.
            assert batch_result is not None, f"no batch freshness result for {table_name}"
            assert non_batch_result is not None, f"no non-batch freshness result for {table_name}"

            assert non_batch_result.status == batch_result.status
            assert non_batch_result.max_loaded_at == batch_result.max_loaded_at
Loading