Skip to content

Commit

Permalink
incremental support - strategy: append, insert_overwrite; on_schema_change: fail, ignore (#1)
Browse files Browse the repository at this point in the history

not supported strategy: merge, delete+insert; on_schema_change: append_new_columns, sync_all_columns
    not supported: unique_key

    Ticket (internal JIRA):
            https://jira.cloudera.com/browse/DBT-38
            https://jira.cloudera.com/browse/DBT-39
            https://jira.cloudera.com/browse/DBT-48
            https://jira.cloudera.com/browse/DBT-49

    Testplan:
                1. Basic dependencies need to be installed (dbt-core).
                2. Build and install the dbt-impala adapter using:
                       python3 setup.py install

                3. Create a template dbt project using following:
                       dbt init

                4. Edit $HOME/.dbt/profiles.yml so that it looks similar to:

                       demo_dbt:
                         outputs:

                           dev_impala:
                             type: impala
                             host: localhost
                             port: 21050
                             dbname: s3test
                             schema: s3test

                         target: dev_impala

                5. In the dbt project generated in step (3), run the following, which should succeed if a local instance of Impala is up:
                        dbt debug (check connection)

                6. Create an incremental model (name it, say, incremental_model.sql) with content similar to:
                       {{
                           config(
                               materialized='incremental',
                               incremental_strategy='insert_overwrite',
                           )
                       }}

                       select * from {{ ref('seed_sample') }}

                       {% if is_incremental() %}
                           where updated_at > (select max(updated_at) from {{ this }})
                       {% endif %}

                7. Run this model using:
                       dbt run [--full-refresh] --select incremental_model

                   This should produce output similar to:
                       18:01:35  1 of 1 OK created incremental model s3test.my_third_dbt_model................... [OK in 63.16s]
                       18:01:35
                       18:01:35  Finished running 1 incremental model in 63.24s.
                       18:01:35
                       18:01:35  Completed successfully
                       18:01:35
                       18:01:35  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

Co-authored-by: V. Ganesh <[email protected]>
  • Loading branch information
tovganesh and tovganesh authored Mar 25, 2022
1 parent 4e5cf5b commit 6028f2e
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 1 deletion.
2 changes: 1 addition & 1 deletion dbt/include/impala/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@

{% macro impala__drop_schema(relation) -%}
{#-- Drop a schema (database) and everything in it.
    `cascade` is required so that a non-empty schema can be dropped;
    without it Impala refuses to drop a schema that still contains tables.
    Fix: the diff residue had left both the old (no-cascade) and new
    statement lines in the macro body, which would emit malformed SQL —
    only the cascaded statement is kept. --#}
  {%- call statement('drop_schema') -%}
    drop schema if exists {{ relation }} cascade
  {%- endcall -%}
{% endmacro %}

Expand Down
151 changes: 151 additions & 0 deletions dbt/include/impala/macros/incremental.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
{#
# Copyright 2022 Cloudera Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#}

{% macro validate_get_incremental_strategy(raw_strategy) %}
  {#-- Validate the configured incremental strategy for the Impala adapter.
      Returns the strategy unchanged when supported; otherwise raises a
      compiler error listing the accepted values. --#}
  {%- set supported_strategies = ['append', 'insert_overwrite'] -%}

  {% if raw_strategy not in supported_strategies %}
    {#-- Build the message only on the failure path; content is unchanged. --#}
    {% set invalid_strategy_msg -%}
    Invalid incremental strategy provided: {{ raw_strategy }}
    Expected one of: 'append', 'insert_overwrite'
    {%- endset %}
    {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
  {% endif %}

  {% do return(raw_strategy) %}
{% endmacro %}

{% macro incremental_validate_on_schema_change(on_schema_change, default='ignore') %}
  {#-- Validate the on_schema_change config for the Impala adapter.
      Only 'fail' and 'ignore' are supported; any other value raises a
      compiler error. Returns the validated value otherwise.
      The `default` parameter is kept for interface compatibility with
      callers, although the invalid path raises rather than defaulting.
      Fixes: the previous message claimed "Setting default value of X"
      while the macro actually raised, and an unreachable
      `return(default)` followed the raise — both corrected here. --#}
  {% if on_schema_change not in ['fail', 'ignore'] %}
    {% set log_message = "Invalid value for on_schema_change (%s) provided. Expected one of: 'fail', 'ignore'." % on_schema_change %}
    {% do log(log_message) %}
    {% do exceptions.raise_compiler_error(log_message) %}
  {% else %}
    {{ return(on_schema_change) }}
  {% endif %}
{% endmacro %}

{% materialization incremental, adapter='impala' -%}
{#-- Incremental materialization for Impala.
    Supported incremental_strategy values: 'append' (default), 'insert_overwrite'.
    Supported on_schema_change values: 'ignore' (default), 'fail'.
    unique_key is not supported: configuring one raises a compiler error below. --#}

{#-- Reject unique_key up front: the insert-only build path below cannot honor it. --#}
{% set unique_key = config.get('unique_key') %}
{% set overwrite_msg -%}
impala adapter does not support 'unique_key'
{%- endset %}
{% if unique_key is not none %}
{% do exceptions.raise_compiler_error(overwrite_msg) %}
{% endif %}

{#-- NOTE(review): `strategy` is validated here but both supported strategies
    take the same insert path below — confirm 'insert_overwrite' is intended
    to behave identically to 'append' in this version. --#}
{% set raw_strategy = config.get('incremental_strategy', default='append') %}
{% set strategy = validate_get_incremental_strategy(raw_strategy) %}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

{#-- Timestamp suffix (digits only) makes the per-run temp/backup identifiers unique. --#}
{%- set time_stamp = modules.datetime.datetime.now().isoformat().replace("-","").replace(":","").replace(".","") -%}

{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(target_relation, '__' + time_stamp + '__dbt_tmp') %}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{% set tmp_identifier = model['name'] + '__' + time_stamp + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + '__' + time_stamp + "__dbt_backup" %}

-- the intermediate_ and backup_ relations should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
-- later, before we try to use this name for the current operation. This has to happen before
-- BEGIN, in a separate transaction
{% set preexisting_intermediate_relation = adapter.get_relation(identifier=tmp_identifier,
schema=schema,
database=database) %}
{% set preexisting_backup_relation = adapter.get_relation(identifier=backup_identifier,
schema=schema,
database=database) %}
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{#-- Relations accumulated here are dropped after commit (cleanup). --#}
{% set to_drop = [] %}

{% do to_drop.append(tmp_relation) %}

{# -- first check whether we want to full refresh for source view or config reasons #}
{% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %}

{#-- Choose the build SQL:
    - no existing relation: plain create-table-as into the target;
    - full refresh (or the existing relation is a view): build into an
      intermediate table, then swap it in via rename (need_swap) and keep
      the old table as a backup until cleanup;
    - otherwise: incremental path — materialize into a temp table, validate
      schema changes, then insert into the existing target. --#}
{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif trigger_full_refresh %}
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
{% set tmp_identifier = model['name'] + '__' + time_stamp + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + '__' + time_stamp + '__dbt_backup' %}
{% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}

{% set build_sql = create_table_as(False, intermediate_relation, sql) %}
{% set need_swap = true %}
{% do to_drop.append(backup_relation) %}
{% do to_drop.append(intermediate_relation) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{#-- since unique key is not supported, the follow macro (default impl), will only return insert stm, and hence is directly used here --#}
{% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}

{% endif %}

{% call statement("main") %}
{{ build_sql }}
{% endcall %}

{#-- need_swap is only set in the full-refresh branch above; when unset, the
    Jinja Undefined is falsy and this swap is skipped. --#}
{% if need_swap %}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% do adapter.rename_relation(intermediate_relation, target_relation) %}
{% endif %}

{% do persist_docs(target_relation, model) %}

{#-- NOTE(review): create_indexes is dbt's default hook; presumably a no-op
    for Impala — confirm against the adapter implementation. --#}
{% if existing_relation is none or existing_relation.is_view or should_full_refresh() %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{% do adapter.commit() %}

{#-- Drop the temp/backup relations created during this run. --#}
{% for rel in to_drop %}
{% do adapter.drop_relation(rel) %}
{% endfor %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

0 comments on commit 6028f2e

Please sign in to comment.