diff --git a/.changes/unreleased/Features-20241002-171112.yaml b/.changes/unreleased/Features-20241002-171112.yaml new file mode 100644 index 000000000..3caaaec10 --- /dev/null +++ b/.changes/unreleased/Features-20241002-171112.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add microbatch strategy +time: 2024-10-02T17:11:12.88725-05:00 +custom: + Author: QMalcolm + Issue: "923" diff --git a/dbt/adapters/redshift/impl.py b/dbt/adapters/redshift/impl.py index e69c8c448..aaf3d46ca 100644 --- a/dbt/adapters/redshift/impl.py +++ b/dbt/adapters/redshift/impl.py @@ -130,7 +130,7 @@ def valid_incremental_strategies(self): """The set of standard builtin strategies which this adapter supports out-of-the-box. Not used to validate custom strategies defined by end users. """ - return ["append", "delete+insert", "merge"] + return ["append", "delete+insert", "merge", "microbatch"] def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str: return f"{add_to} + interval '{number} {interval}'" diff --git a/dbt/include/redshift/macros/materializations/incremental_merge.sql b/dbt/include/redshift/macros/materializations/incremental_merge.sql index 59a3391e3..8ddb8f96c 100644 --- a/dbt/include/redshift/macros/materializations/incremental_merge.sql +++ b/dbt/include/redshift/macros/materializations/incremental_merge.sql @@ -65,3 +65,50 @@ ) {% endmacro %} + +{% macro redshift__get_incremental_microbatch_sql(arg_dict) %} + {#- + Technically this function could just call out to the default implementation of delete_insert. + However, the default implementation requires a unique_id, which we actually do not want or + need. 
Thus we re-implement delete insert here without the unique_id requirement + -#} + + {%- set target = arg_dict["target_relation"] -%} + {%- set source = arg_dict["temp_relation"] -%} + {%- set dest_columns = arg_dict["dest_columns"] -%} + {%- set predicates = [] -%} + + {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%} + {%- for pred in incremental_predicates -%} + {% if "DBT_INTERNAL_DEST." in pred %} + {%- set pred = pred | replace("DBT_INTERNAL_DEST.", target ~ "." ) -%} + {% endif %} + {% if "dbt_internal_dest." in pred %} + {%- set pred = pred | replace("dbt_internal_dest.", target ~ "." ) -%} + {% endif %} + {% do predicates.append(pred) %} + {% endfor %} + + {% if not model.config.get("__dbt_internal_microbatch_event_time_start") or not model.config.get("__dbt_internal_microbatch_event_time_end") -%} + {% do exceptions.raise_compiler_error('dbt could not compute the start and end timestamps for the running batch') %} + {% endif %} + + {#-- Add additional incremental_predicates to filter for batch --#} + {% do predicates.append(model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %} + {% do predicates.append(model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %} + {% do arg_dict.update({'incremental_predicates': predicates}) %} + + delete from {{ target }} + where ( + {% for predicate in predicates %} + {%- if not loop.first %}and {% endif -%} {{ predicate }} + {% endfor %} + ); + + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + insert into {{ target }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ source }} + ) +{% endmacro %} diff --git a/tests/functional/adapter/incremental/test_incremental_microbatch.py b/tests/functional/adapter/incremental/test_incremental_microbatch.py new file mode 100644 index 
000000000..1bd196bf8 --- /dev/null +++ b/tests/functional/adapter/incremental/test_incremental_microbatch.py @@ -0,0 +1,24 @@ +import pytest +from dbt.tests.adapter.incremental.test_incremental_microbatch import ( + BaseMicrobatch, +) + + +# No requirement for a unique_id for redshift microbatch! +_microbatch_model_no_unique_id_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} +select * from {{ ref('input_model') }} +""" + + +class TestRedshiftMicrobatch(BaseMicrobatch): + @pytest.fixture(scope="class") + def microbatch_model_sql(self) -> str: + return _microbatch_model_no_unique_id_sql + + @pytest.fixture(scope="class") + def insert_two_rows_sql(self, project) -> str: + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00-0'), (5, '2020-01-05 00:00:00-0')"