From 6028f2e6e1be758fb1b75d747bd2bcedc516be29 Mon Sep 17 00:00:00 2001 From: "V. Ganesh" Date: Sat, 26 Mar 2022 02:19:21 +0530 Subject: [PATCH] incremental support - strategy: append, insert_overwrite; onschema_change: fail, ignore (#1) not supported strategy: merge, delete_overwrite; onschema_change: append_new_columns, sync_new_columns not supported: unique_key Ticket (internal JIRA): https://jira.cloudera.com/browse/DBT-38 https://jira.cloudera.com/browse/DBT-39 https://jira.cloudera.com/browse/DBT-48 https://jira.cloudera.com/browse/DBT-49 Testplan: 1. Basic dependencies need to be installed (dbt-core). 2. Build and install the dbt-impala adapter using: python3 setup.py install 3. Create a template dbt project using following: dbt init 4. Edit $HOME/.dbt/profiles.yml so that it looks similar to: demo_dbt: outputs: dev_impala: type: impala host: localhost port: 21050 dbname: s3test schema: s3test target: dev_impala 5. In the dbt project generated in step (2), run the following, which should succeed if local instance of Impala is up: dbt debug (check connection) 6. Create an incremental model with entry similar to, name it say, incremental_model.sql: {{ config( materialized='incremental', unique_key='id', incremental_strategy='insert_overwrite', ) }} select * from {{ ref('seed_sample') }} {% if is_incremental() %} where updated_at > (select max(updated_at) from {{ this }}) {% endif %} 7. Run this model using: dbt run [--full-refresh] --select incremental_model This should produce output similar to: 18:01:35 1 of 1 OK created incremental model s3test.my_third_dbt_model................... [OK in 63.16s] 18:01:35 18:01:35 Finished running 1 incremental model in 63.24s. 18:01:35 18:01:35 Completed successfully 18:01:35 18:01:35 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1 Co-authored-by: V. Ganesh --- dbt/include/impala/macros/adapters.sql | 2 +- dbt/include/impala/macros/incremental.sql | 151 ++++++++++++++++++++++ 2 files changed, 152 insertions(+), 1 deletion(-) create mode 100644 dbt/include/impala/macros/incremental.sql diff --git a/dbt/include/impala/macros/adapters.sql b/dbt/include/impala/macros/adapters.sql index e664067..db831ed 100644 --- a/dbt/include/impala/macros/adapters.sql +++ b/dbt/include/impala/macros/adapters.sql @@ -184,7 +184,7 @@ {% macro impala__drop_schema(relation) -%} {%- call statement('drop_schema') -%} - drop schema if exists {{ relation }} + drop schema if exists {{ relation }} cascade {%- endcall -%} {% endmacro %} diff --git a/dbt/include/impala/macros/incremental.sql b/dbt/include/impala/macros/incremental.sql new file mode 100644 index 0000000..8c56573 --- /dev/null +++ b/dbt/include/impala/macros/incremental.sql @@ -0,0 +1,151 @@ +{# +# Copyright 2022 Cloudera Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#} + +{% macro validate_get_incremental_strategy(raw_strategy) %} + {% set invalid_strategy_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + Expected one of: 'append', 'insert_overwrite' + {%- endset %} + + {% if raw_strategy not in ['append', 'insert_overwrite'] %} + {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} + {% endif %} + + {% do return(raw_strategy) %} +{% endmacro %} + +{% macro incremental_validate_on_schema_change(on_schema_change, default='ignore') %} + {% if on_schema_change not in ['fail', 'ignore'] %} + {% set log_message = 'Invalid value for on_schema_change (%s) specified. Setting default value of %s.' % (on_schema_change, default) %} + {% do log(log_message) %} + + {% do exceptions.raise_compiler_error(log_message) %} + + {{ return(default) }} + {% else %} + {{ return(on_schema_change) }} + {% endif %} +{% endmacro %} + +{% materialization incremental, adapter='impala' -%} + + {% set unique_key = config.get('unique_key') %} + {% set overwrite_msg -%} + impala adapter does not support 'unique_key' + {%- endset %} + {% if unique_key is not none %} + {% do exceptions.raise_compiler_error(overwrite_msg) %} + {% endif %} + + {% set raw_strategy = config.get('incremental_strategy', default='append') %} + {% set strategy = validate_get_incremental_strategy(raw_strategy) %} + + {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} + + {%- set time_stamp = modules.datetime.datetime.now().isoformat().replace("-","").replace(":","").replace(".","") -%} + + {% set target_relation = this.incorporate(type='table') %} + {% set existing_relation = load_relation(this) %} + {% set tmp_relation = make_temp_relation(target_relation, '__' + time_stamp + '__dbt_tmp') %} + {%- set full_refresh_mode = (should_full_refresh()) -%} + + {% set tmp_identifier = model['name'] + '__' + time_stamp + '__dbt_tmp' %} + {% set backup_identifier = model['name'] + '__' + time_stamp + "__dbt_backup" %} + + -- the intermediate_ and backup_ relations should not already exist in the database; get_relation + -- will return None in that case. Otherwise, we get a relation that we can drop + -- later, before we try to use this name for the current operation. This has to happen before + -- BEGIN, in a separate transaction + {% set preexisting_intermediate_relation = adapter.get_relation(identifier=tmp_identifier, + schema=schema, + database=database) %} + {% set preexisting_backup_relation = adapter.get_relation(identifier=backup_identifier, + schema=schema, + database=database) %} + {{ drop_relation_if_exists(preexisting_intermediate_relation) }} + {{ drop_relation_if_exists(preexisting_backup_relation) }} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% set to_drop = [] %} + + {% do to_drop.append(tmp_relation) %} + + {# -- first check whether we want to full refresh for source view or config reasons #} + {% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %} + + {% if existing_relation is none %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif trigger_full_refresh %} + {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} + {% set tmp_identifier = model['name'] + '__' + time_stamp + '__dbt_tmp' %} + {% set backup_identifier = model['name'] + '__' + time_stamp + '__dbt_backup' %} + {% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %} + {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} + + {% set build_sql = create_table_as(False, intermediate_relation, sql) %} + {% set need_swap = true %} + {% do to_drop.append(backup_relation) %} + {% do to_drop.append(intermediate_relation) %} + {% else %} + {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% do adapter.expand_target_column_types( + from_relation=tmp_relation, + to_relation=target_relation) %} + {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#} + {% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} + {% if not dest_columns %} + {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} + {% endif %} + + {#-- since unique key is not supported, the follow macro (default impl), will only return insert stm, and hence is directly used here --#} + {% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %} + + {% endif %} + + {% call statement("main") %} + {{ build_sql }} + {% endcall %} + + {% if need_swap %} + {% do adapter.rename_relation(target_relation, backup_relation) %} + {% do adapter.rename_relation(intermediate_relation, target_relation) %} + {% endif %} + + {% do persist_docs(target_relation, model) %} + + {% if existing_relation is none or existing_relation.is_view or should_full_refresh() %} + {% do create_indexes(target_relation) %} + {% endif %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + -- `COMMIT` happens here + {% do adapter.commit() %} + + {% for rel in to_drop %} + {% do adapter.drop_relation(rel) %} + {% endfor %} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {{ return({'relations': [target_relation]}) }} + +{%- endmaterialization %} +