diff --git a/.changes/unreleased/Features-20241216-172047.yaml b/.changes/unreleased/Features-20241216-172047.yaml new file mode 100644 index 00000000..232d184b --- /dev/null +++ b/.changes/unreleased/Features-20241216-172047.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add function to run custom sql for getting freshness info +time: 2024-12-16T17:20:47.065611-08:00 +custom: + Author: ChenyuLInx + Issue: "8797" diff --git a/dbt-tests-adapter/dbt/tests/adapter/utils/test_source_freshness_custom_info.py b/dbt-tests-adapter/dbt/tests/adapter/utils/test_source_freshness_custom_info.py new file mode 100644 index 00000000..b4f15dab --- /dev/null +++ b/dbt-tests-adapter/dbt/tests/adapter/utils/test_source_freshness_custom_info.py @@ -0,0 +1,70 @@ +from typing import Type +from unittest.mock import MagicMock + +from dbt_common.exceptions import DbtRuntimeError +import pytest + +from dbt.adapters.base.impl import BaseAdapter + + +class BaseCalculateFreshnessMethod: + """Tests the behavior of the calculate_freshness_from_custom_sql method for the relevant adapters. + + The base method is meant to throw the appropriate custom exception when calculate_freshness_from_custom_sql + fails. + """ + + @pytest.fixture(scope="class") + def valid_sql(self) -> str: + """Returns a valid statement for issuing as a custom freshness (loaded_at) query. + + The query must yield a single row with a single timestamp-like column, since the + collect_freshness_custom_sql macro wraps it as the max_loaded_at value. We return + a simple SELECT of the current time, and leave it to engine-specific test + overrides to specify more detailed behavior as appropriate. 
+ """ + + return "select now()" + + @pytest.fixture(scope="class") + def invalid_sql(self) -> str: + """Returns an invalid statement for issuing a bad freshness query.""" + + return "Let's run some invalid SQL and see if we get an error!" + + @pytest.fixture(scope="class") + def expected_exception(self) -> Type[Exception]: + """Returns the Exception type thrown by a failed query. + + Defaults to dbt_common.exceptions.DbtRuntimeError because that is the most common + base exception for adapters to throw.""" + return DbtRuntimeError + + @pytest.fixture(scope="class") + def mock_relation(self): + mock = MagicMock() + mock.__str__ = lambda x: "test.table" + return mock + + def test_calculate_freshness_from_custom_sql_success( + self, adapter: BaseAdapter, valid_sql: str, mock_relation + ) -> None: + with adapter.connection_named("test_freshness_custom_sql"): + adapter.calculate_freshness_from_custom_sql(mock_relation, valid_sql) + + def test_calculate_freshness_from_custom_sql_failure( + self, + adapter: BaseAdapter, + invalid_sql: str, + expected_exception: Type[Exception], + mock_relation, + ) -> None: + with pytest.raises(expected_exception=expected_exception): + with adapter.connection_named("test_freshness_custom_sql"): + adapter.calculate_freshness_from_custom_sql(mock_relation, invalid_sql) + + +class TestCalculateFreshnessMethod(BaseCalculateFreshnessMethod): + pass diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index ae172635..8474b39d 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -97,6 +97,7 @@ GET_CATALOG_MACRO_NAME = "get_catalog" GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations" FRESHNESS_MACRO_NAME = "collect_freshness" +CUSTOM_SQL_FRESHNESS_MACRO_NAME = "collect_freshness_custom_sql" GET_RELATION_LAST_MODIFIED_MACRO_NAME = "get_relation_last_modified" DEFAULT_BASE_BEHAVIOR_FLAGS = [ { @@ -1327,6 +1328,31 @@ def cancel_open_connections(self): """Cancel all open connections.""" return 
self.connections.cancel_open() + def _process_freshness_execution( + self, + macro_name: str, + kwargs: Dict[str, Any], + macro_resolver: Optional[MacroResolverProtocol] = None, + ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: + """Execute and process a freshness macro to generate a FreshnessResponse""" + import agate + + result = self.execute_macro(macro_name, kwargs=kwargs, macro_resolver=macro_resolver) + + if isinstance(result, agate.Table): + warn_or_error(CollectFreshnessReturnSignature()) + table = result + adapter_response = None + else: + adapter_response, table = result.response, result.table + + # Process the results table + if len(table) != 1 or len(table[0]) != 2: + raise MacroResultError(macro_name, table) + + freshness_response = self._create_freshness_response(table[0][0], table[0][1]) + return adapter_response, freshness_response + def calculate_freshness( self, source: BaseRelation, @@ -1335,49 +1361,26 @@ def calculate_freshness( macro_resolver: Optional[MacroResolverProtocol] = None, ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: """Calculate the freshness of sources in dbt, and return it""" - import agate - - kwargs: Dict[str, Any] = { + kwargs = { "source": source, "loaded_at_field": loaded_at_field, "filter": filter, } + return self._process_freshness_execution(FRESHNESS_MACRO_NAME, kwargs, macro_resolver) - # run the macro - # in older versions of dbt-core, the 'collect_freshness' macro returned the table of results directly - # starting in v1.5, by default, we return both the table and the adapter response (metadata about the query) - result: Union[ - AttrDict, # current: contains AdapterResponse + "agate.Table" - "agate.Table", # previous: just table - ] - result = self.execute_macro( - FRESHNESS_MACRO_NAME, kwargs=kwargs, macro_resolver=macro_resolver - ) - if isinstance(result, agate.Table): - warn_or_error(CollectFreshnessReturnSignature()) - adapter_response = None - table = result - else: - adapter_response, 
table = result.response, result.table # type: ignore[attr-defined] - # now we have a 1-row table of the maximum `loaded_at_field` value and - # the current time according to the db. - if len(table) != 1 or len(table[0]) != 2: - raise MacroResultError(FRESHNESS_MACRO_NAME, table) - if table[0][0] is None: - # no records in the table, so really the max_loaded_at was - # infinitely long ago. Just call it 0:00 January 1 year UTC - max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) - else: - max_loaded_at = _utc(table[0][0], source, loaded_at_field) - - snapshotted_at = _utc(table[0][1], source, loaded_at_field) - age = (snapshotted_at - max_loaded_at).total_seconds() - freshness: FreshnessResponse = { - "max_loaded_at": max_loaded_at, - "snapshotted_at": snapshotted_at, - "age": age, + def calculate_freshness_from_custom_sql( + self, + source: BaseRelation, + sql: str, + macro_resolver: Optional[MacroResolverProtocol] = None, + ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: + kwargs = { + "source": source, + "loaded_at_query": sql, } - return adapter_response, freshness + return self._process_freshness_execution( + CUSTOM_SQL_FRESHNESS_MACRO_NAME, kwargs, macro_resolver + ) def calculate_freshness_from_metadata_batch( self, diff --git a/dbt/include/global_project/macros/adapters/freshness.sql b/dbt/include/global_project/macros/adapters/freshness.sql index f18499a2..1af6165c 100644 --- a/dbt/include/global_project/macros/adapters/freshness.sql +++ b/dbt/include/global_project/macros/adapters/freshness.sql @@ -14,3 +14,19 @@ {% endcall %} {{ return(load_result('collect_freshness')) }} {% endmacro %} + +{% macro collect_freshness_custom_sql(source, loaded_at_query) %} + {{ return(adapter.dispatch('collect_freshness_custom_sql', 'dbt')(source, loaded_at_query))}} +{% endmacro %} + +{% macro default__collect_freshness_custom_sql(source, loaded_at_query) %} + {% call statement('collect_freshness_custom_sql', fetch_result=True, auto_begin=False) -%} + 
with source_query as ( + {{ loaded_at_query }} + ) + select + (select * from source_query) as max_loaded_at, + {{ current_timestamp() }} as snapshotted_at + {% endcall %} + {{ return(load_result('collect_freshness_custom_sql')) }} +{% endmacro %} diff --git a/tests/unit/test_base_adapter.py b/tests/unit/test_base_adapter.py index 5fa109b7..3d763710 100644 --- a/tests/unit/test_base_adapter.py +++ b/tests/unit/test_base_adapter.py @@ -4,6 +4,12 @@ from dbt.adapters.base.impl import BaseAdapter, ConstraintSupport +from datetime import datetime +from unittest.mock import MagicMock, patch +import agate +import pytz +from dbt.adapters.contracts.connection import AdapterResponse + class TestBaseAdapterConstraintRendering: @pytest.fixture(scope="class") @@ -234,3 +240,145 @@ def test_render_raw_model_constraints_unsupported( rendered_constraints = BaseAdapter.render_raw_model_constraints(constraints) assert rendered_constraints == [] + + +class TestCalculateFreshnessFromCustomSQL: + @pytest.fixture + def adapter(self): + # Create mock config and context + config = MagicMock() + + # Create test adapter class that implements abstract methods + class TestAdapter(BaseAdapter): + def convert_boolean_type(self, *args, **kwargs): + return None + + def convert_date_type(self, *args, **kwargs): + return None + + def convert_datetime_type(self, *args, **kwargs): + return None + + def convert_number_type(self, *args, **kwargs): + return None + + def convert_text_type(self, *args, **kwargs): + return None + + def convert_time_type(self, *args, **kwargs): + return None + + def create_schema(self, *args, **kwargs): + return None + + def date_function(self, *args, **kwargs): + return None + + def drop_relation(self, *args, **kwargs): + return None + + def drop_schema(self, *args, **kwargs): + return None + + def expand_column_types(self, *args, **kwargs): + return None + + def get_columns_in_relation(self, *args, **kwargs): + return None + + def is_cancelable(self, *args, **kwargs): + 
return False + + def list_relations_without_caching(self, *args, **kwargs): + return [] + + def list_schemas(self, *args, **kwargs): + return [] + + def quote(self, *args, **kwargs): + return "" + + def rename_relation(self, *args, **kwargs): + return None + + def truncate_relation(self, *args, **kwargs): + return None + + return TestAdapter(config, MagicMock()) + + @pytest.fixture + def mock_relation(self): + mock = MagicMock() + mock.__str__ = lambda x: "test.table" + return mock + + @patch("dbt.adapters.base.BaseAdapter.execute_macro") + def test_calculate_freshness_from_customsql_success( + self, mock_execute_macro, adapter, mock_relation + ): + """Test successful freshness calculation from custom SQL""" + + # Setup test data + current_time = datetime.now(pytz.UTC) + last_modified = datetime(2023, 1, 1, tzinfo=pytz.UTC) + + # Create mock agate table with test data + mock_table = agate.Table.from_object( + [{"last_modified": last_modified, "snapshotted_at": current_time}] + ) + + # Configure mock execute_macro + mock_execute_macro.return_value = MagicMock( + response=AdapterResponse("SUCCESS"), table=mock_table + ) + + # Execute method under test + adapter_response, freshness_response = adapter.calculate_freshness_from_custom_sql( + source=mock_relation, sql="SELECT max(updated_at) as last_modified" + ) + + # Verify execute_macro was called correctly + mock_execute_macro.assert_called_once_with( + "collect_freshness_custom_sql", + kwargs={ + "source": mock_relation, + "loaded_at_query": "SELECT max(updated_at) as last_modified", + }, + macro_resolver=None, + ) + + # Verify adapter response + assert adapter_response._message == "SUCCESS" + + # Verify freshness response + assert freshness_response["max_loaded_at"] == last_modified + assert freshness_response["snapshotted_at"] == current_time + assert isinstance(freshness_response["age"], float) + + @patch("dbt.adapters.base.BaseAdapter.execute_macro") + def 
test_calculate_freshness_from_customsql_null_last_modified( + self, mock_execute_macro, adapter, mock_relation + ): + """Test freshness calculation when last_modified is NULL""" + + current_time = datetime.now(pytz.UTC) + + # Create mock table with NULL last_modified + mock_table = agate.Table.from_object( + [{"last_modified": None, "snapshotted_at": current_time}] + ) + + mock_execute_macro.return_value = MagicMock( + response=AdapterResponse("SUCCESS"), table=mock_table + ) + + # Execute method + _, freshness_response = adapter.calculate_freshness_from_custom_sql( + source=mock_relation, sql="SELECT max(updated_at) as last_modified" + ) + + # Verify NULL last_modified is handled by using datetime.min + expected_min_date = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) + assert freshness_response["max_loaded_at"] == expected_min_date + assert freshness_response["snapshotted_at"] == current_time + assert isinstance(freshness_response["age"], float)