Skip to content

Commit

Permalink
adapter function for freshness via custom sql (#384)
Browse files Browse the repository at this point in the history
Co-authored-by: Colin Rogers <[email protected]>
  • Loading branch information
ChenyuLInx and colin-rogers-dbt authored Dec 18, 2024
1 parent f481b19 commit 54c3e53
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 37 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241216-172047.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add function to run custom sql for getting freshness info
time: 2024-12-16T17:20:47.065611-08:00
custom:
Author: ChenyuLInx
Issue: "8797"
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import Type
from unittest.mock import MagicMock

from dbt_common.exceptions import DbtRuntimeError
import pytest

from dbt.adapters.base.impl import BaseAdapter


class BaseCalculateFreshnessMethod:
"""Tests the behavior of the calculate_freshness_from_customsql method for the relevant adapters.
The base method is meant to throw the appropriate custom exception when calculate_freshness_from_customsql
fails.
"""

@pytest.fixture(scope="class")
def valid_sql(self) -> str:
"""Returns a valid statement for issuing as a validate_sql query.
Ideally this would be checkable for non-execution. For example, we could use a
CREATE TABLE statement with an assertion that no table was created. However,
for most adapter types this is unnecessary - the EXPLAIN keyword has exactly the
behavior we want, and here we are essentially testing to make sure it is
supported. As such, we return a simple SELECT query, and leave it to
engine-specific test overrides to specify more detailed behavior as appropriate.
"""

return "select now()"

@pytest.fixture(scope="class")
def invalid_sql(self) -> str:
"""Returns an invalid statement for issuing a bad validate_sql query."""

return "Let's run some invalid SQL and see if we get an error!"

@pytest.fixture(scope="class")
def expected_exception(self) -> Type[Exception]:
"""Returns the Exception type thrown by a failed query.
Defaults to dbt_common.exceptions.DbtRuntimeError because that is the most common
base exception for adapters to throw."""
return DbtRuntimeError

@pytest.fixture(scope="class")
def mock_relation(self):
mock = MagicMock()
mock.__str__ = lambda x: "test.table"
return mock

def test_calculate_freshness_from_custom_sql_success(
self, adapter: BaseAdapter, valid_sql: str, mock_relation
) -> None:
with adapter.connection_named("test_freshness_custom_sql"):
adapter.calculate_freshness_from_custom_sql(mock_relation, valid_sql)

def test_calculate_freshness_from_custom_sql_failure(
self,
adapter: BaseAdapter,
invalid_sql: str,
expected_exception: Type[Exception],
mock_relation,
) -> None:
with pytest.raises(expected_exception=expected_exception):
with adapter.connection_named("test_infreshness_custom_sql"):
adapter.calculate_freshness_from_custom_sql(mock_relation, invalid_sql)


class TestCalculateFreshnessMethod(BaseCalculateFreshnessMethod):
pass
77 changes: 40 additions & 37 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
GET_CATALOG_MACRO_NAME = "get_catalog"
GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations"
FRESHNESS_MACRO_NAME = "collect_freshness"
CUSTOM_SQL_FRESHNESS_MACRO_NAME = "collect_freshness_custom_sql"
GET_RELATION_LAST_MODIFIED_MACRO_NAME = "get_relation_last_modified"
DEFAULT_BASE_BEHAVIOR_FLAGS = [
{
Expand Down Expand Up @@ -1327,6 +1328,31 @@ def cancel_open_connections(self):
"""Cancel all open connections."""
return self.connections.cancel_open()

def _process_freshness_execution(
self,
macro_name: str,
kwargs: Dict[str, Any],
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
"""Execute and process a freshness macro to generate a FreshnessResponse"""
import agate

result = self.execute_macro(macro_name, kwargs=kwargs, macro_resolver=macro_resolver)

if isinstance(result, agate.Table):
warn_or_error(CollectFreshnessReturnSignature())
table = result
adapter_response = None
else:
adapter_response, table = result.response, result.table

# Process the results table
if len(table) != 1 or len(table[0]) != 2:
raise MacroResultError(macro_name, table)

freshness_response = self._create_freshness_response(table[0][0], table[0][1])
return adapter_response, freshness_response

def calculate_freshness(
self,
source: BaseRelation,
Expand All @@ -1335,49 +1361,26 @@ def calculate_freshness(
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
"""Calculate the freshness of sources in dbt, and return it"""
import agate

kwargs: Dict[str, Any] = {
kwargs = {
"source": source,
"loaded_at_field": loaded_at_field,
"filter": filter,
}
return self._process_freshness_execution(FRESHNESS_MACRO_NAME, kwargs, macro_resolver)

# run the macro
# in older versions of dbt-core, the 'collect_freshness' macro returned the table of results directly
# starting in v1.5, by default, we return both the table and the adapter response (metadata about the query)
result: Union[
AttrDict, # current: contains AdapterResponse + "agate.Table"
"agate.Table", # previous: just table
]
result = self.execute_macro(
FRESHNESS_MACRO_NAME, kwargs=kwargs, macro_resolver=macro_resolver
)
if isinstance(result, agate.Table):
warn_or_error(CollectFreshnessReturnSignature())
adapter_response = None
table = result
else:
adapter_response, table = result.response, result.table # type: ignore[attr-defined]
# now we have a 1-row table of the maximum `loaded_at_field` value and
# the current time according to the db.
if len(table) != 1 or len(table[0]) != 2:
raise MacroResultError(FRESHNESS_MACRO_NAME, table)
if table[0][0] is None:
# no records in the table, so really the max_loaded_at was
# infinitely long ago. Just call it 0:00 January 1 year UTC
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(table[0][0], source, loaded_at_field)

snapshotted_at = _utc(table[0][1], source, loaded_at_field)
age = (snapshotted_at - max_loaded_at).total_seconds()
freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
def calculate_freshness_from_custom_sql(
self,
source: BaseRelation,
sql: str,
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
kwargs = {
"source": source,
"loaded_at_query": sql,
}
return adapter_response, freshness
return self._process_freshness_execution(
CUSTOM_SQL_FRESHNESS_MACRO_NAME, kwargs, macro_resolver
)

def calculate_freshness_from_metadata_batch(
self,
Expand Down
16 changes: 16 additions & 0 deletions dbt/include/global_project/macros/adapters/freshness.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,19 @@
{% endcall %}
{{ return(load_result('collect_freshness')) }}
{% endmacro %}

{% macro collect_freshness_custom_sql(source, loaded_at_query) %}
{{ return(adapter.dispatch('collect_freshness_custom_sql', 'dbt')(source, loaded_at_query))}}
{% endmacro %}

{% macro default__collect_freshness_custom_sql(source, loaded_at_query) %}
{% call statement('collect_freshness_custom_sql', fetch_result=True, auto_begin=False) -%}
with source_query as (
{{ loaded_at_query }}
)
select
(select * from source_query) as max_loaded_at,
{{ current_timestamp() }} as snapshotted_at
{% endcall %}
{{ return(load_result('collect_freshness_custom_sql')) }}
{% endmacro %}
148 changes: 148 additions & 0 deletions tests/unit/test_base_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

from dbt.adapters.base.impl import BaseAdapter, ConstraintSupport

from datetime import datetime
from unittest.mock import MagicMock, patch
import agate
import pytz
from dbt.adapters.contracts.connection import AdapterResponse


class TestBaseAdapterConstraintRendering:
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -234,3 +240,145 @@ def test_render_raw_model_constraints_unsupported(

rendered_constraints = BaseAdapter.render_raw_model_constraints(constraints)
assert rendered_constraints == []


class TestCalculateFreshnessFromCustomSQL:
@pytest.fixture
def adapter(self):
# Create mock config and context
config = MagicMock()

# Create test adapter class that implements abstract methods
class TestAdapter(BaseAdapter):
def convert_boolean_type(self, *args, **kwargs):
return None

def convert_date_type(self, *args, **kwargs):
return None

def convert_datetime_type(self, *args, **kwargs):
return None

def convert_number_type(self, *args, **kwargs):
return None

def convert_text_type(self, *args, **kwargs):
return None

def convert_time_type(self, *args, **kwargs):
return None

def create_schema(self, *args, **kwargs):
return None

def date_function(self, *args, **kwargs):
return None

def drop_relation(self, *args, **kwargs):
return None

def drop_schema(self, *args, **kwargs):
return None

def expand_column_types(self, *args, **kwargs):
return None

def get_columns_in_relation(self, *args, **kwargs):
return None

def is_cancelable(self, *args, **kwargs):
return False

def list_relations_without_caching(self, *args, **kwargs):
return []

def list_schemas(self, *args, **kwargs):
return []

def quote(self, *args, **kwargs):
return ""

def rename_relation(self, *args, **kwargs):
return None

def truncate_relation(self, *args, **kwargs):
return None

return TestAdapter(config, MagicMock())

@pytest.fixture
def mock_relation(self):
mock = MagicMock()
mock.__str__ = lambda x: "test.table"
return mock

@patch("dbt.adapters.base.BaseAdapter.execute_macro")
def test_calculate_freshness_from_customsql_success(
self, mock_execute_macro, adapter, mock_relation
):
"""Test successful freshness calculation from custom SQL"""

# Setup test data
current_time = datetime.now(pytz.UTC)
last_modified = datetime(2023, 1, 1, tzinfo=pytz.UTC)

# Create mock agate table with test data
mock_table = agate.Table.from_object(
[{"last_modified": last_modified, "snapshotted_at": current_time}]
)

# Configure mock execute_macro
mock_execute_macro.return_value = MagicMock(
response=AdapterResponse("SUCCESS"), table=mock_table
)

# Execute method under test
adapter_response, freshness_response = adapter.calculate_freshness_from_custom_sql(
source=mock_relation, sql="SELECT max(updated_at) as last_modified"
)

# Verify execute_macro was called correctly
mock_execute_macro.assert_called_once_with(
"collect_freshness_custom_sql",
kwargs={
"source": mock_relation,
"loaded_at_query": "SELECT max(updated_at) as last_modified",
},
macro_resolver=None,
)

# Verify adapter response
assert adapter_response._message == "SUCCESS"

# Verify freshness response
assert freshness_response["max_loaded_at"] == last_modified
assert freshness_response["snapshotted_at"] == current_time
assert isinstance(freshness_response["age"], float)

@patch("dbt.adapters.base.BaseAdapter.execute_macro")
def test_calculate_freshness_from_customsql_null_last_modified(
self, mock_execute_macro, adapter, mock_relation
):
"""Test freshness calculation when last_modified is NULL"""

current_time = datetime.now(pytz.UTC)

# Create mock table with NULL last_modified
mock_table = agate.Table.from_object(
[{"last_modified": None, "snapshotted_at": current_time}]
)

mock_execute_macro.return_value = MagicMock(
response=AdapterResponse("SUCCESS"), table=mock_table
)

# Execute method
_, freshness_response = adapter.calculate_freshness_from_custom_sql(
source=mock_relation, sql="SELECT max(updated_at) as last_modified"
)

# Verify NULL last_modified is handled by using datetime.min
expected_min_date = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
assert freshness_response["max_loaded_at"] == expected_min_date
assert freshness_response["snapshotted_at"] == current_time
assert isinstance(freshness_response["age"], float)

0 comments on commit 54c3e53

Please sign in to comment.