diff --git a/.changes/1.10.4.md b/.changes/1.10.4.md index f8bbd420..cb03a1d1 100644 --- a/.changes/1.10.4.md +++ b/.changes/1.10.4.md @@ -1 +1,12 @@ ## dbt-adapters 1.10.4 - November 11, 2024 + +### Features + +- Use a behavior flag to gate microbatch functionality (instead of an environment variable) ([#327](https://github.com/dbt-labs/dbt-adapters/issues/327)) + +### Under the Hood + +- Add `query_id` to SQLQueryStatus ([#342](https://github.com/dbt-labs/dbt-adapters/issues/342)) + +### Contributors +- [@cmcarthur](https://github.com/cmcarthur) ([#342](https://github.com/dbt-labs/dbt-adapters/issues/342)) diff --git a/.changes/1.11.0.md b/.changes/1.11.0.md index fbe85222..3f731699 100644 --- a/.changes/1.11.0.md +++ b/.changes/1.11.0.md @@ -1,12 +1,10 @@ -## dbt-adapters 1.11.0 - November 11, 2024 +## dbt-adapters 1.11.0 - December 17, 2024 ### Features -- Use a behavior flag to gate microbatch functionality (instead of an environment variable) ([#327](https://github.com/dbt-labs/dbt-adapters/issues/327)) +- Add new hard_deletes="new_record" mode for snapshots. ([#317](https://github.com/dbt-labs/dbt-adapters/issues/317)) +- Introduce new Capability for MicrobatchConcurrency support ([#359](https://github.com/dbt-labs/dbt-adapters/issues/359)) ### Under the Hood -- Add `query_id` to SQLQueryStatus ([#342](https://github.com/dbt-labs/dbt-adapters/issues/342)) - -### Contributors -- [@cmcarthur](https://github.com/cmcarthur) ([#342](https://github.com/dbt-labs/dbt-adapters/issues/342)) +- Add retry logic for retryable exceptions. ([#368](https://github.com/dbt-labs/dbt-adapters/issues/368)) diff --git a/.changes/1.12.0.md b/.changes/1.12.0.md new file mode 100644 index 00000000..843e7696 --- /dev/null +++ b/.changes/1.12.0.md @@ -0,0 +1 @@ +## dbt-adapters 1.12.0 - December 18, 2024 diff --git a/.changes/unreleased/Features-20241104-120653.yaml b/.changes/unreleased/Features-20241104-120653.yaml deleted file mode 100644 index a85e1f7f..00000000 --- a/.changes/unreleased/Features-20241104-120653.yaml +++ /dev/null @@ -1,6 +0,0 @@ -kind: Features -body: Add new hard_deletes="new_record" mode for snapshots. 
-time: 2024-11-04T12:06:53.225939-05:00 -custom: - Author: peterallenwebb - Issue: "317" diff --git a/.changes/unreleased/Features-20241120-112806.yaml b/.changes/unreleased/Features-20241120-112806.yaml deleted file mode 100644 index a135f946..00000000 --- a/.changes/unreleased/Features-20241120-112806.yaml +++ /dev/null @@ -1,6 +0,0 @@ -kind: Features -body: Introduce new Capability for MicrobatchConcurrency support -time: 2024-11-20T11:28:06.258507-05:00 -custom: - Author: michelleark - Issue: "359" diff --git a/.changes/unreleased/Features-20241216-172047.yaml b/.changes/unreleased/Features-20241216-172047.yaml new file mode 100644 index 00000000..232d184b --- /dev/null +++ b/.changes/unreleased/Features-20241216-172047.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add function to run custom sql for getting freshness info +time: 2024-12-16T17:20:47.065611-08:00 +custom: + Author: ChenyuLInx + Issue: "8797" diff --git a/.changes/unreleased/Under the Hood-20241217-110536.yaml b/.changes/unreleased/Under the Hood-20241217-110536.yaml new file mode 100644 index 00000000..5716da5e --- /dev/null +++ b/.changes/unreleased/Under the Hood-20241217-110536.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Added new equals macro that handles null value checks in sql +time: 2024-12-17T11:05:36.363421+02:00 +custom: + Author: adrianburusdbt + Issue: "159" diff --git a/.github/workflows/_generate-changelog.yml b/.github/workflows/_generate-changelog.yml index 37b67bbb..acefb3e0 100644 --- a/.github/workflows/_generate-changelog.yml +++ b/.github/workflows/_generate-changelog.yml @@ -198,6 +198,9 @@ jobs: if: ${{ needs.temp-branch.outputs.name != '' && inputs.merge }} runs-on: ${{ vars.DEFAULT_RUNNER }} steps: + - uses: actions/checkout@v4 + with: + ref: ${{ needs.temp-branch.outputs.name }} - uses: everlytic/branch-merge@1.1.5 with: source_ref: ${{ needs.temp-branch.outputs.name }} diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 04a14545..421a66ad 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -58,11 +58,45 @@ jobs: branch: ${{ needs.generate-changelog.outputs.branch-name }} secrets: inherit - publish-pypi: + package: if: ${{ inputs.pypi-public == true }} - needs: generate-changelog - uses: ./.github/workflows/_publish-pypi.yml + uses: ./.github/workflows/_package-directory.yml with: package: ${{ inputs.package }} - deploy-to: ${{ inputs.deploy-to }} - branch: ${{ needs.generate-changelog.outputs.branch-name }} + + publish-pypi: + if: ${{ inputs.pypi-public == true }} + needs: [package, generate-changelog] + runs-on: ${{ vars.DEFAULT_RUNNER }} + environment: + name: ${{ inputs.deploy-to }} + url: ${{ vars.PYPI_PROJECT_URL }}/${{ inputs.package }} + permissions: + # this permission is required for trusted publishing + # see https://github.com/marketplace/actions/pypi-publish + id-token: write + steps: + - uses: actions/checkout@v4 + with: + ref: ${{ needs.generate-changelog.outputs.branch-name }} + - uses: actions/setup-python@v5 + with: + python-version: ${{ vars.DEFAULT_PYTHON_VERSION }} + - uses: pypa/hatch@install + # hatch will build using test PyPI first and fall back to prod PyPI when deploying to test + # this is done via environment variables in the test environment in GitHub + - run: hatch build && hatch run build:check-all + working-directory: ./${{ needs.package.outputs.directory }} + - uses: pypa/gh-action-pypi-publish@release/v1 + with: + repository-url: ${{ vars.PYPI_REPOSITORY_URL }} + packages-dir: ./${{ 
needs.package.outputs.directory }}dist/ + - id: version + run: echo "version=$(hatch version)" >> $GITHUB_OUTPUT + working-directory: ./${{ needs.package.outputs.directory }} + - uses: nick-fields/retry@v3 + with: + timeout_seconds: 10 + retry_wait_seconds: 10 + max_attempts: 15 # 5 minutes: (10s timeout + 10s delay) * 15 attempts + command: wget ${{ vars.PYPI_PROJECT_URL }}/${{ steps.version.outputs.version }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 87cca898..47a19fcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html), and is generated by [Changie](https://github.com/miniscruff/changie). -## dbt-adapters 1.11.0 - November 11, 2024 +## dbt-adapters 1.12.0 - December 18, 2024 + + + +## dbt-adapters 1.11.0 - December 17, 2024 + +### Features + +- Add new hard_deletes="new_record" mode for snapshots. ([#317](https://github.com/dbt-labs/dbt-adapters/issues/317)) +- Introduce new Capability for MicrobatchConcurrency support ([#359](https://github.com/dbt-labs/dbt-adapters/issues/359)) + +### Under the Hood + +- Add retry logic for retryable exceptions. ([#368](https://github.com/dbt-labs/dbt-adapters/issues/368)) + +## dbt-adapters 1.10.4 - November 11, 2024 ### Features @@ -18,8 +33,6 @@ and is generated by [Changie](https://github.com/miniscruff/changie). ### Contributors - [@cmcarthur](https://github.com/cmcarthur) ([#342](https://github.com/dbt-labs/dbt-adapters/issues/342)) -## dbt-adapters 1.10.4 - November 11, 2024 - ## dbt-adapters 1.10.3 - October 29, 2024 ## dbt-adapters 1.10.2 - October 01, 2024 @@ -39,8 +52,6 @@ and is generated by [Changie](https://github.com/miniscruff/changie). 
- Negate the check for microbatch behavior flag in determining builtins ([#349](https://github.com/dbt-labs/dbt-adapters/issues/349)) - Move require_batched_execution_for_custom_microbatch_strategy flag to global ([#351](https://github.com/dbt-labs/dbt-adapters/issues/351)) - - ## dbt-adapters 1.8.0 - October 29, 2024 ### Fixes diff --git a/dbt-tests-adapter/dbt/tests/adapter/concurrency/test_concurrency.py b/dbt-tests-adapter/dbt/tests/adapter/concurrency/test_concurrency.py index b4eec93e..bcc87109 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/concurrency/test_concurrency.py +++ b/dbt-tests-adapter/dbt/tests/adapter/concurrency/test_concurrency.py @@ -1,11 +1,13 @@ +from collections import Counter + import pytest +from dbt.artifacts.schemas.results import RunStatus from dbt.tests.util import ( check_relations_equal, check_table_does_not_exist, rm_file, run_dbt, - run_dbt_and_capture, write_file, ) @@ -317,8 +319,8 @@ def test_concurrency(self, project): rm_file(project.project_root, "seeds", "seed.csv") write_file(seeds__update_csv, project.project_root, "seeds", "seed.csv") - results, output = run_dbt_and_capture(["run"], expect_pass=False) - assert len(results) == 7 + results = run_dbt(["run"], expect_pass=False) + check_relations_equal(project.adapter, ["seed", "view_model"]) check_relations_equal(project.adapter, ["seed", "dep"]) check_relations_equal(project.adapter, ["seed", "table_a"]) @@ -326,7 +328,13 @@ def test_concurrency(self, project): check_table_does_not_exist(project.adapter, "invalid") check_table_does_not_exist(project.adapter, "skip") - assert "PASS=5 WARN=0 ERROR=1 SKIP=1 TOTAL=7" in output + result_statuses = Counter([result.status for result in results]) + expected_statuses = { + RunStatus.Success: 5, + RunStatus.Error: 1, + RunStatus.Skipped: 1, + } + assert result_statuses == expected_statuses class TestConcurenncy(BaseConcurrency): diff --git a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_unique_id.py b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_unique_id.py index bddf407e..34807062 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_unique_id.py +++ b/dbt-tests-adapter/dbt/tests/adapter/incremental/test_incremental_unique_id.py @@ -240,6 +240,8 @@ select 'NY','New York','Manhattan','2021-04-01' union all select 'PA','Philadelphia','Philadelphia','2021-05-21' +union all +select 'CO','Denver',null,'2021-06-18' """ @@ -265,6 +267,8 @@ select 'NY','New York','Manhattan','2021-04-01' union all select 'PA','Philadelphia','Philadelphia','2021-05-21' +union all +select 'CO','Denver',null,'2021-06-18' """ @@ -288,6 +292,7 @@ NY,Kings,Brooklyn,2021-04-02 NY,New York,Manhattan,2021-04-01 PA,Philadelphia,Philadelphia,2021-05-21 +CO,Denver,,2021-06-18 """ seeds__add_new_rows_sql = """ @@ -439,7 +444,7 @@ def fail_to_build_inc_missing_unique_key_column(self, incremental_model_name): def test__no_unique_keys(self, project): """with no unique keys, seed and model should match""" - expected_fields = self.get_expected_fields(relation="seed", seed_rows=8) + expected_fields = self.get_expected_fields(relation="seed", seed_rows=9) test_case_fields = self.get_test_fields( project, seed="seed", incremental_model="no_unique_key", update_sql_file="add_new_rows" ) @@ -449,7 +454,7 @@ def test__no_unique_keys(self, project): def test__empty_str_unique_key(self, project): """with empty string for unique key, seed and model should match""" - expected_fields = self.get_expected_fields(relation="seed", seed_rows=8) + 
expected_fields = self.get_expected_fields(relation="seed", seed_rows=9) test_case_fields = self.get_test_fields( project, seed="seed", @@ -462,7 +467,7 @@ def test__one_unique_key(self, project): """with one unique key, model will overwrite existing row""" expected_fields = self.get_expected_fields( - relation="one_str__overwrite", seed_rows=7, opt_model_count=1 + relation="one_str__overwrite", seed_rows=8, opt_model_count=1 ) test_case_fields = self.get_test_fields( project, @@ -487,7 +492,7 @@ def test__bad_unique_key(self, project): def test__empty_unique_key_list(self, project): """with no unique keys, seed and model should match""" - expected_fields = self.get_expected_fields(relation="seed", seed_rows=8) + expected_fields = self.get_expected_fields(relation="seed", seed_rows=9) test_case_fields = self.get_test_fields( project, seed="seed", @@ -500,7 +505,7 @@ def test__unary_unique_key_list(self, project): """with one unique key, model will overwrite existing row""" expected_fields = self.get_expected_fields( - relation="unique_key_list__inplace_overwrite", seed_rows=7, opt_model_count=1 + relation="unique_key_list__inplace_overwrite", seed_rows=8, opt_model_count=1 ) test_case_fields = self.get_test_fields( project, @@ -515,7 +520,7 @@ def test__duplicated_unary_unique_key_list(self, project): """with two of the same unique key, model will overwrite existing row""" expected_fields = self.get_expected_fields( - relation="unique_key_list__inplace_overwrite", seed_rows=7, opt_model_count=1 + relation="unique_key_list__inplace_overwrite", seed_rows=8, opt_model_count=1 ) test_case_fields = self.get_test_fields( project, @@ -530,7 +535,7 @@ def test__trinary_unique_key_list(self, project): """with three unique keys, model will overwrite existing row""" expected_fields = self.get_expected_fields( - relation="unique_key_list__inplace_overwrite", seed_rows=7, opt_model_count=1 + relation="unique_key_list__inplace_overwrite", seed_rows=8, opt_model_count=1 ) test_case_fields = self.get_test_fields( project, @@ -545,7 +550,7 @@ def test__trinary_unique_key_list_no_update(self, project): """even with three unique keys, adding distinct rows to seed does not cause seed and model to diverge""" - expected_fields = self.get_expected_fields(relation="seed", seed_rows=8) + expected_fields = self.get_expected_fields(relation="seed", seed_rows=9) test_case_fields = self.get_test_fields( project, seed="seed", diff --git a/dbt-tests-adapter/dbt/tests/adapter/utils/base_utils.py b/dbt-tests-adapter/dbt/tests/adapter/utils/base_utils.py index 23e1ca7f..943b2aa8 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/utils/base_utils.py +++ b/dbt-tests-adapter/dbt/tests/adapter/utils/base_utils.py @@ -1,16 +1,6 @@ import pytest from dbt.tests.util import run_dbt - -macros__equals_sql = """ -{% macro equals(expr1, expr2) -%} -case when (({{ expr1 }} = {{ expr2 }}) or ({{ expr1 }} is null and {{ expr2 }} is null)) - then 0 - else 1 -end = 0 -{% endmacro %} -""" - macros__test_assert_equal_sql = """ {% test assert_equal(model, actual, expected) %} select * from {{ model }} @@ -33,7 +23,6 @@ class BaseUtils: @pytest.fixture(scope="class") def macros(self): return { - "equals.sql": macros__equals_sql, "test_assert_equal.sql": macros__test_assert_equal_sql, "replace_empty.sql": macros__replace_empty_sql, } diff --git a/dbt-tests-adapter/dbt/tests/adapter/utils/test_equals.py b/dbt-tests-adapter/dbt/tests/adapter/utils/test_equals.py index c61f6fdf..d8596dc0 100644 --- 
a/dbt-tests-adapter/dbt/tests/adapter/utils/test_equals.py +++ b/dbt-tests-adapter/dbt/tests/adapter/utils/test_equals.py @@ -1,16 +1,10 @@ import pytest -from dbt.tests.adapter.utils import base_utils, fixture_equals +from dbt.tests.adapter.utils import fixture_equals from dbt.tests.util import relation_from_name, run_dbt class BaseEquals: - @pytest.fixture(scope="class") - def macros(self): - return { - "equals.sql": base_utils.macros__equals_sql, - } - @pytest.fixture(scope="class") def seeds(self): return { diff --git a/dbt-tests-adapter/dbt/tests/adapter/utils/test_source_freshness_custom_info.py b/dbt-tests-adapter/dbt/tests/adapter/utils/test_source_freshness_custom_info.py new file mode 100644 index 00000000..b4f15dab --- /dev/null +++ b/dbt-tests-adapter/dbt/tests/adapter/utils/test_source_freshness_custom_info.py @@ -0,0 +1,70 @@ +from typing import Type +from unittest.mock import MagicMock + +from dbt_common.exceptions import DbtRuntimeError +import pytest + +from dbt.adapters.base.impl import BaseAdapter + + +class BaseCalculateFreshnessMethod: + """Tests the behavior of the calculate_freshness_from_custom_sql method for the relevant adapters. + + The base method is meant to throw the appropriate custom exception when calculate_freshness_from_custom_sql + fails. + """ + + @pytest.fixture(scope="class") + def valid_sql(self) -> str: + """Returns a valid statement for use as a custom freshness query. + + The statement is passed to calculate_freshness_from_custom_sql, so it must + return a single scalar value that the adapter can treat as the max loaded_at + timestamp for the source. A simple "select now()" satisfies that contract for + most adapter types, and here we are essentially testing to make sure the + custom-SQL freshness path is supported. We leave it to engine-specific test + overrides to specify more detailed behavior as appropriate. + """ + + return "select now()" + + @pytest.fixture(scope="class") + def invalid_sql(self) -> str: + """Returns an invalid statement for issuing a bad freshness query.""" + + return "Let's run some invalid SQL and see if we get an error!" + + @pytest.fixture(scope="class") + def expected_exception(self) -> Type[Exception]: + """Returns the Exception type thrown by a failed query. 
+ + Defaults to dbt_common.exceptions.DbtRuntimeError because that is the most common + base exception for adapters to throw.""" + return DbtRuntimeError + + @pytest.fixture(scope="class") + def mock_relation(self): + mock = MagicMock() + mock.__str__ = lambda x: "test.table" + return mock + + def test_calculate_freshness_from_custom_sql_success( + self, adapter: BaseAdapter, valid_sql: str, mock_relation + ) -> None: + with adapter.connection_named("test_freshness_custom_sql"): + adapter.calculate_freshness_from_custom_sql(mock_relation, valid_sql) + + def test_calculate_freshness_from_custom_sql_failure( + self, + adapter: BaseAdapter, + invalid_sql: str, + expected_exception: Type[Exception], + mock_relation, + ) -> None: + with pytest.raises(expected_exception=expected_exception): + with adapter.connection_named("test_infreshness_custom_sql"): + adapter.calculate_freshness_from_custom_sql(mock_relation, invalid_sql) + + +class TestCalculateFreshnessMethod(BaseCalculateFreshnessMethod): + pass diff --git a/dbt-tests-adapter/pyproject.toml b/dbt-tests-adapter/pyproject.toml index d2f732b7..c220e0e3 100644 --- a/dbt-tests-adapter/pyproject.toml +++ b/dbt-tests-adapter/pyproject.toml @@ -52,6 +52,16 @@ include = ["dbt/tests", "dbt/__init__.py"] [tool.hatch.build.targets.wheel] include = ["dbt/tests", "dbt/__init__.py"] +[tool.hatch.envs.default] +python = "3.9" +dependencies = [ + "dbt_common @ git+https://github.com/dbt-labs/dbt-common.git", + "dbt-adapters @ {root:uri}/..", + "dbt-core @ git+https://github.com/dbt-labs/dbt-core.git#subdirectory=core", + "pre-commit==3.7.0", + "pytest" +] + [tool.hatch.envs.build] detached = true dependencies = [ diff --git a/dbt/adapters/__about__.py b/dbt/adapters/__about__.py index 08e0d06b..134ed009 100644 --- a/dbt/adapters/__about__.py +++ b/dbt/adapters/__about__.py @@ -1 +1 @@ -version = "1.10.4" +version = "1.12.0" diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index ae172635..8474b39d 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -97,6 +97,7 @@ GET_CATALOG_MACRO_NAME = "get_catalog" GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations" FRESHNESS_MACRO_NAME = "collect_freshness" +CUSTOM_SQL_FRESHNESS_MACRO_NAME = "collect_freshness_custom_sql" GET_RELATION_LAST_MODIFIED_MACRO_NAME = "get_relation_last_modified" DEFAULT_BASE_BEHAVIOR_FLAGS = [ { @@ -1327,6 +1328,31 @@ def cancel_open_connections(self): """Cancel all open connections.""" return self.connections.cancel_open() + def _process_freshness_execution( + self, + macro_name: str, + kwargs: Dict[str, Any], + macro_resolver: Optional[MacroResolverProtocol] = None, + ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: + """Execute and process a freshness macro to generate a FreshnessResponse""" + import agate + + result = self.execute_macro(macro_name, kwargs=kwargs, macro_resolver=macro_resolver) + + if isinstance(result, agate.Table): + warn_or_error(CollectFreshnessReturnSignature()) + table = result + adapter_response = None + else: + adapter_response, table = result.response, result.table + + # Process the results table + if len(table) != 1 or len(table[0]) != 2: + raise MacroResultError(macro_name, table) + + freshness_response = self._create_freshness_response(table[0][0], table[0][1]) + return adapter_response, freshness_response + def calculate_freshness( self, source: BaseRelation, @@ -1335,49 +1361,26 @@ def calculate_freshness( macro_resolver: Optional[MacroResolverProtocol] = None, ) -> Tuple[Optional[AdapterResponse], 
FreshnessResponse]: """Calculate the freshness of sources in dbt, and return it""" - import agate - - kwargs: Dict[str, Any] = { + kwargs = { "source": source, "loaded_at_field": loaded_at_field, "filter": filter, } + return self._process_freshness_execution(FRESHNESS_MACRO_NAME, kwargs, macro_resolver) - # run the macro - # in older versions of dbt-core, the 'collect_freshness' macro returned the table of results directly - # starting in v1.5, by default, we return both the table and the adapter response (metadata about the query) - result: Union[ - AttrDict, # current: contains AdapterResponse + "agate.Table" - "agate.Table", # previous: just table - ] - result = self.execute_macro( - FRESHNESS_MACRO_NAME, kwargs=kwargs, macro_resolver=macro_resolver - ) - if isinstance(result, agate.Table): - warn_or_error(CollectFreshnessReturnSignature()) - adapter_response = None - table = result - else: - adapter_response, table = result.response, result.table # type: ignore[attr-defined] - # now we have a 1-row table of the maximum `loaded_at_field` value and - # the current time according to the db. - if len(table) != 1 or len(table[0]) != 2: - raise MacroResultError(FRESHNESS_MACRO_NAME, table) - if table[0][0] is None: - # no records in the table, so really the max_loaded_at was - # infinitely long ago. Just call it 0:00 January 1 year UTC - max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) - else: - max_loaded_at = _utc(table[0][0], source, loaded_at_field) - - snapshotted_at = _utc(table[0][1], source, loaded_at_field) - age = (snapshotted_at - max_loaded_at).total_seconds() - freshness: FreshnessResponse = { - "max_loaded_at": max_loaded_at, - "snapshotted_at": snapshotted_at, - "age": age, + def calculate_freshness_from_custom_sql( + self, + source: BaseRelation, + sql: str, + macro_resolver: Optional[MacroResolverProtocol] = None, + ) -> Tuple[Optional[AdapterResponse], FreshnessResponse]: + kwargs = { + "source": source, + "loaded_at_query": sql, } - return adapter_response, freshness + return self._process_freshness_execution( + CUSTOM_SQL_FRESHNESS_MACRO_NAME, kwargs, macro_resolver + ) def calculate_freshness_from_metadata_batch( self, diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index baccddc9..04b5e401 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -1,6 +1,16 @@ import abc import time -from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING +from typing import ( + Any, + Dict, + Iterable, + Iterator, + List, + Optional, + Tuple, + TYPE_CHECKING, + Type, +) from dbt_common.events.contextvars import get_node_info from dbt_common.events.functions import fire_event @@ -18,6 +28,7 @@ SQLCommit, SQLQuery, SQLQueryStatus, + AdapterEventDebug, ) if TYPE_CHECKING: @@ -61,7 +72,50 @@ def add_query( auto_begin: bool = True, bindings: Optional[Any] = None, abridge_sql_log: bool = False, + retryable_exceptions: Tuple[Type[Exception], ...] = tuple(), + retry_limit: int = 1, ) -> Tuple[Connection, Any]: + """ + Retry function encapsulated here to avoid commitment to some + user-facing interface. Right now, Redshift commits to a 1 second + retry timeout so this serves as a default. + """ + + def _execute_query_with_retry( + cursor: Any, + sql: str, + bindings: Optional[Any], + retryable_exceptions: Tuple[Type[Exception], ...], + retry_limit: int, + attempt: int, + ): + """ + A success sees the try exit cleanly and avoid any recursive + retries. Failure begins a sleep and retry routine. 
+ """ + try: + cursor.execute(sql, bindings) + except retryable_exceptions as e: + # Cease retries and fail when limit is hit. + if attempt >= retry_limit: + raise e + + fire_event( + AdapterEventDebug( + message=f"Got a retryable error {type(e)}. {retry_limit-attempt} retries left. Retrying in 1 second.\nError:\n{e}" + ) + ) + time.sleep(1) + + return _execute_query_with_retry( + cursor=cursor, + sql=sql, + bindings=bindings, + retryable_exceptions=retryable_exceptions, + retry_limit=retry_limit, + attempt=attempt + 1, + ) + connection = self.get_thread_connection() if auto_begin and connection.transaction_open is False: self.begin() @@ -90,7 +144,14 @@ def add_query( pre = time.perf_counter() cursor = connection.handle.cursor() - cursor.execute(sql, bindings) + _execute_query_with_retry( + cursor=cursor, + sql=sql, + bindings=bindings, + retryable_exceptions=retryable_exceptions, + retry_limit=retry_limit, + attempt=1, + ) result = self.get_response(cursor) diff --git a/dbt/include/global_project/macros/adapters/freshness.sql b/dbt/include/global_project/macros/adapters/freshness.sql index f18499a2..1af6165c 100644 --- a/dbt/include/global_project/macros/adapters/freshness.sql +++ b/dbt/include/global_project/macros/adapters/freshness.sql @@ -14,3 +14,19 @@ {% endcall %} {{ return(load_result('collect_freshness')) }} {% endmacro %} + +{% macro collect_freshness_custom_sql(source, loaded_at_query) %} + {{ return(adapter.dispatch('collect_freshness_custom_sql', 'dbt')(source, loaded_at_query))}} +{% endmacro %} + +{% macro default__collect_freshness_custom_sql(source, loaded_at_query) %} + {% call statement('collect_freshness_custom_sql', fetch_result=True, auto_begin=False) -%} + with source_query as ( + {{ loaded_at_query }} + ) + select + (select * from source_query) as max_loaded_at, + {{ current_timestamp() }} as snapshotted_at + {% endcall %} + {{ return(load_result('collect_freshness_custom_sql')) }} +{% endmacro %} diff --git a/dbt/include/global_project/macros/materializations/models/incremental/merge.sql b/dbt/include/global_project/macros/materializations/models/incremental/merge.sql index ca972c9f..d7e8af70 100644 --- a/dbt/include/global_project/macros/materializations/models/incremental/merge.sql +++ b/dbt/include/global_project/macros/materializations/models/incremental/merge.sql @@ -21,8 +21,14 @@ {% do predicates.append(this_key_match) %} {% endfor %} {% else %} + {% set source_unique_key %} + DBT_INTERNAL_SOURCE.{{ unique_key }} + {% endset %} + {% set target_unique_key %} + DBT_INTERNAL_DEST.{{ unique_key }} + {% endset %} {% set unique_key_match %} - DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} + {{ equals(source_unique_key, target_unique_key) }} {% endset %} {% do predicates.append(unique_key_match) %} {% endif %} @@ -62,11 +68,18 @@ {% if unique_key %} {% if unique_key is sequence and unique_key is not string %} - delete from {{target }} + delete from {{ target }} using {{ source }} where ( {% for key in unique_key %} - {{ source }}.{{ key }} = {{ target }}.{{ key }} + {% set source_unique_key %} + {{ source }}.{{ key }} + {% endset %} + {% set target_unique_key %} + {{ target }}.{{ key }} + {% endset %} + + {{ equals(source_unique_key, target_unique_key) }} {{ "and " if not loop.last}} {% endfor %} {% if incremental_predicates %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 33492cc9..905ab136 100644 --- 
a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -53,8 +53,14 @@ from {{ target_relation }} where {% if config.get('dbt_valid_to_current') %} - {# Check for either dbt_valid_to_current OR null, in order to correctly update records with nulls #} - ( {{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or {{ columns.dbt_valid_to }} is null) + {% set source_unique_key %} + {{ columns.dbt_valid_to }} + {% endset %} + {% set target_unique_key %} + {{ config.get('dbt_valid_to_current') }} + {% endset %} + + {{ equals(source_unique_key, target_unique_key) }} {% else %} {{ columns.dbt_valid_to }} is null {% endif %} @@ -276,7 +282,14 @@ {% macro unique_key_join_on(unique_key, identifier, from_identifier) %} {% if unique_key | is_list %} {% for key in unique_key %} - {{ identifier }}.dbt_unique_key_{{ loop.index }} = {{ from_identifier }}.dbt_unique_key_{{ loop.index }} + {% set source_unique_key %} + {{ identifier }}.dbt_unique_key_{{ loop.index }} + {% endset %} + {% set target_unique_key %} + {{ from_identifier }}.dbt_unique_key_{{ loop.index }} + {% endset %} + + {{ equals(source_unique_key, target_unique_key) }} {%- if not loop.last %} and {%- endif %} {% endfor %} {% else %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql index cf787e4f..19a67f6b 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql @@ -15,8 +15,14 @@ when matched {% if config.get("dbt_valid_to_current") %} - and (DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or - DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null) + {% set source_unique_key %} + DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} + {% endset %} + {% set target_unique_key %} + {{ config.get('dbt_valid_to_current') }} + {% endset %} + and {{ equals(source_unique_key, target_unique_key) }} + {% else %} and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null {% endif %} diff --git a/dbt/include/global_project/macros/utils/equals.sql b/dbt/include/global_project/macros/utils/equals.sql new file mode 100644 index 00000000..d63b6cc1 --- /dev/null +++ b/dbt/include/global_project/macros/utils/equals.sql @@ -0,0 +1,12 @@ +{% macro equals(expr1, expr2) %} + {{ return(adapter.dispatch('equals', 'dbt') (expr1, expr2)) }} +{%- endmacro %} + +{% macro default__equals(expr1, expr2) -%} + + case when (({{ expr1 }} = {{ expr2 }}) or ({{ expr1 }} is null and {{ expr2 }} is null)) + then 0 + else 1 + end = 0 + +{% endmacro %} diff --git a/pyproject.toml b/pyproject.toml index 0c03b341..f8b6335f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ include = ["dbt/adapters", "dbt/include", "dbt/__init__.py"] include = ["dbt/adapters", "dbt/include", "dbt/__init__.py"] [tool.hatch.envs.default] +python = "3.9" dependencies = [ "dbt_common @ git+https://github.com/dbt-labs/dbt-common.git", 'pre-commit==3.7.0;python_version>="3.9"', diff --git a/tests/unit/test_base_adapter.py b/tests/unit/test_base_adapter.py index 5fa109b7..3d763710 100644 --- a/tests/unit/test_base_adapter.py +++ b/tests/unit/test_base_adapter.py @@ -4,6 +4,12 @@ from dbt.adapters.base.impl import BaseAdapter, ConstraintSupport +from datetime import datetime +from unittest.mock import MagicMock, patch 
+import agate +import pytz +from dbt.adapters.contracts.connection import AdapterResponse + class TestBaseAdapterConstraintRendering: @pytest.fixture(scope="class") @@ -234,3 +240,145 @@ def test_render_raw_model_constraints_unsupported( rendered_constraints = BaseAdapter.render_raw_model_constraints(constraints) assert rendered_constraints == [] + + +class TestCalculateFreshnessFromCustomSQL: + @pytest.fixture + def adapter(self): + # Create mock config and context + config = MagicMock() + + # Create test adapter class that implements abstract methods + class TestAdapter(BaseAdapter): + def convert_boolean_type(self, *args, **kwargs): + return None + + def convert_date_type(self, *args, **kwargs): + return None + + def convert_datetime_type(self, *args, **kwargs): + return None + + def convert_number_type(self, *args, **kwargs): + return None + + def convert_text_type(self, *args, **kwargs): + return None + + def convert_time_type(self, *args, **kwargs): + return None + + def create_schema(self, *args, **kwargs): + return None + + def date_function(self, *args, **kwargs): + return None + + def drop_relation(self, *args, **kwargs): + return None + + def drop_schema(self, *args, **kwargs): + return None + + def expand_column_types(self, *args, **kwargs): + return None + + def get_columns_in_relation(self, *args, **kwargs): + return None + + def is_cancelable(self, *args, **kwargs): + return False + + def list_relations_without_caching(self, *args, **kwargs): + return [] + + def list_schemas(self, *args, **kwargs): + return [] + + def quote(self, *args, **kwargs): + return "" + + def rename_relation(self, *args, **kwargs): + return None + + def truncate_relation(self, *args, **kwargs): + return None + + return TestAdapter(config, MagicMock()) + + @pytest.fixture + def mock_relation(self): + mock = MagicMock() + mock.__str__ = lambda x: "test.table" + return mock + + @patch("dbt.adapters.base.BaseAdapter.execute_macro") + def test_calculate_freshness_from_customsql_success( + self, mock_execute_macro, adapter, mock_relation + ): + """Test successful freshness calculation from custom SQL""" + + # Setup test data + current_time = datetime.now(pytz.UTC) + last_modified = datetime(2023, 1, 1, tzinfo=pytz.UTC) + + # Create mock agate table with test data + mock_table = agate.Table.from_object( + [{"last_modified": last_modified, "snapshotted_at": current_time}] + ) + + # Configure mock execute_macro + mock_execute_macro.return_value = MagicMock( + response=AdapterResponse("SUCCESS"), table=mock_table + ) + + # Execute method under test + adapter_response, freshness_response = adapter.calculate_freshness_from_custom_sql( + source=mock_relation, sql="SELECT max(updated_at) as last_modified" + ) + + # Verify execute_macro was called correctly + mock_execute_macro.assert_called_once_with( + "collect_freshness_custom_sql", + kwargs={ + "source": mock_relation, + "loaded_at_query": "SELECT max(updated_at) as last_modified", + }, + macro_resolver=None, + ) + + # Verify adapter response + assert adapter_response._message == "SUCCESS" + + # Verify freshness response + assert freshness_response["max_loaded_at"] == last_modified + assert freshness_response["snapshotted_at"] == current_time + assert isinstance(freshness_response["age"], float) + + @patch("dbt.adapters.base.BaseAdapter.execute_macro") + def test_calculate_freshness_from_customsql_null_last_modified( + self, mock_execute_macro, adapter, mock_relation + ): + """Test freshness calculation when last_modified is NULL""" + + current_time = 
datetime.now(pytz.UTC) + + # Create mock table with NULL last_modified + mock_table = agate.Table.from_object( + [{"last_modified": None, "snapshotted_at": current_time}] + ) + + mock_execute_macro.return_value = MagicMock( + response=AdapterResponse("SUCCESS"), table=mock_table + ) + + # Execute method + _, freshness_response = adapter.calculate_freshness_from_custom_sql( + source=mock_relation, sql="SELECT max(updated_at) as last_modified" + ) + + # Verify NULL last_modified is handled by using datetime.min + expected_min_date = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC) + assert freshness_response["max_loaded_at"] == expected_min_date + assert freshness_response["snapshotted_at"] == current_time + assert isinstance(freshness_response["age"], float)
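
Note (illustrative, not part of the patch): a minimal sketch of how a caller might exercise the new calculate_freshness_from_custom_sql API that this patch adds to dbt/adapters/base/impl.py. The adapter instance, the source relation, and the analytics.raw_orders table queried below are assumptions made for the example; the call shape and the FreshnessResponse keys (max_loaded_at, snapshotted_at, age) come from the patch itself.

    from dbt.adapters.base.impl import BaseAdapter

    def check_source_freshness(adapter: BaseAdapter, source) -> float:
        # `source` is assumed to be a BaseRelation pointing at the source table.
        # The custom SQL must yield a single scalar value; the collect_freshness_custom_sql
        # macro wraps it as "(select * from source_query) as max_loaded_at" alongside
        # the warehouse's current_timestamp().
        custom_sql = "select max(updated_at) from analytics.raw_orders"  # hypothetical query
        with adapter.connection_named("source_freshness_example"):
            adapter_response, freshness = adapter.calculate_freshness_from_custom_sql(
                source=source,
                sql=custom_sql,
            )
        # freshness is a FreshnessResponse dict; "age" is the staleness in seconds
        # between max_loaded_at and snapshotted_at.
        return freshness["age"]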