diff --git a/CHANGELOG.md b/CHANGELOG.md index ef55bb281..75caf04bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ ### Features - Support Python 3.11 ([#233](https://github.com/databricks/dbt-databricks/pull/233)) +- Support `incremental_predicates` ([#161](https://github.com/databricks/dbt-databricks/pull/161)) ## dbt-databricks 1.3.3 (Release TBD) diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql index ba6e4cfd5..d035f5654 100644 --- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql +++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql @@ -9,6 +9,7 @@ {#-- Set vars --#} + {%- set incremental_predicates = config.get('predicates', default=none) or config.get('incremental_predicates', default=none) -%} {%- set unique_key = config.get('unique_key', none) -%} {%- set partition_by = config.get('partition_by', none) -%} {%- set language = model['language'] -%} @@ -54,7 +55,7 @@ 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': none, - 'predicates': none}) -%} + 'incremental_predicates': incremental_predicates}) -%} {%- set build_sql = strategy_sql_macro_func(strategy_arg_dict) -%} {%- if language == 'sql' -%} {%- call statement('main') -%} diff --git a/tests/functional/adapter/test_incremental_predicates.py b/tests/functional/adapter/test_incremental_predicates.py new file mode 100644 index 000000000..6dbda21ce --- /dev/null +++ b/tests/functional/adapter/test_incremental_predicates.py @@ -0,0 +1,55 @@ +import pytest +from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates + + +models__databricks_incremental_predicates_sql = """ +{{ config( + materialized = 'incremental', + unique_key = 'id' +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color +union all +select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color + +{% else %} + +-- merge will not happen on the above record where id = 2, so new record will fall to insert +select cast(1 as bigint) as id, 'hey' as msg, 'blue' as color +union all +select cast(2 as bigint) as id, 'yo' as msg, 'green' as color +union all +select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color + +{% endif %} +""" + + +class TestIncrementalPredicatesMergeDatabricks(BaseIncrementalPredicates): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"+incremental_predicates": ["dbt_internal_dest.id != 2"]}} + + @pytest.fixture(scope="class") + def models(self): + return { + "delete_insert_incremental_predicates.sql": ( + models__databricks_incremental_predicates_sql + ) + } + + +class TestPredicatesMergeDatabricks(BaseIncrementalPredicates): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"+predicates": ["dbt_internal_dest.id != 2"]}} + + @pytest.fixture(scope="class") + def models(self): + return { + "delete_insert_incremental_predicates.sql": ( + models__databricks_incremental_predicates_sql + ) + }