Skip to content

Commit

Permalink
restructure incremental implementation to follow default implementation (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
tovganesh authored Nov 18, 2022
1 parent b97852d commit 04bbb90
Showing 1 changed file with 45 additions and 68 deletions.
113 changes: 45 additions & 68 deletions dbt/include/impala/macros/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@
# limitations under the License.
#}

{% macro validate_get_incremental_strategy(raw_strategy) %}
{% macro validate_get_incremental_strategy(incremental_strategy) %}
{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Invalid incremental strategy provided: {{ incremental_strategy }}
Expected one of: 'append', 'insert_overwrite'
{%- endset %}

{% if raw_strategy not in ['append', 'insert_overwrite'] %}
{% if incremental_strategy not in ['append', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{% endif %}

{% do return(raw_strategy) %}
{% do return(incremental_strategy) %}
{% endmacro %}

{% macro incremental_validate_on_schema_change(on_schema_change, default='ignore') %}
Expand All @@ -46,47 +46,42 @@

{% materialization incremental, adapter='impala' -%}

-- relations
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') -%}
{%- set temp_relation = make_temp_relation(target_relation)-%}
{%- set intermediate_relation = make_intermediate_relation(target_relation)-%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

-- configs
{% set unique_key = config.get('unique_key') %}
{% set overwrite_msg -%}
impala adapter does not support 'unique_key'
{% set uniquekey_msg -%}
Impala adapter does not support 'unique_key'
{%- endset %}
{% if unique_key is not none %}
{% do exceptions.raise_compiler_error(overwrite_msg) %}
{% do exceptions.raise_compiler_error(uniquekey_msg) %}
{% endif %}

{% set raw_strategy = config.get('incremental_strategy', default='append') %}
{% if raw_strategy == None %}
{% set raw_strategy = 'append' %}
{% set incremental_strategy = config.get('incremental_strategy') or 'append' %}
{% if incremental_strategy == None %}
{% set incremental_strategy = 'append' %}
{% endif %}
{% set strategy = validate_get_incremental_strategy(raw_strategy) %}

{% set incremental_strategy = validate_get_incremental_strategy(incremental_strategy) %}
{%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

-- grab the current table's grants config for comparison later on
{% set grant_config = config.get('grants') %}

{%- set time_stamp = modules.datetime.datetime.now().isoformat().replace("-","").replace(":","").replace(".","") -%}

{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(target_relation, '__' + time_stamp + '__dbt_tmp') %}
{%- set full_refresh_mode = (should_full_refresh()) -%}
-- log incremental strategy
{% do target_relation.log_relation(incremental_strategy) %}

{% do target_relation.log_relation(raw_strategy) %}

{% set tmp_identifier = model['name'] + '__' + time_stamp + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + '__' + time_stamp + "__dbt_backup" %}

-- the intermediate_ and backup_ relations should not already exist in the database; get_relation
-- the temp_ and backup_ relations should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
-- later, before we try to use this name for the current operation. This has to happen before
-- BEGIN, in a separate transaction
{% set preexisting_intermediate_relation = adapter.get_relation(identifier=tmp_identifier,
schema=schema,
database=database) %}
{% set preexisting_backup_relation = adapter.get_relation(identifier=backup_identifier,
schema=schema,
database=database) %}
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
-- grab the current table's grants config for comparison later on
{% set grant_config = config.get('grants') %}
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}

Expand All @@ -97,56 +92,42 @@

{% set to_drop = [] %}

{% do to_drop.append(tmp_relation) %}

{# -- first check whether we want to full refresh for source view or config reasons #}
{% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %}

{% if existing_relation is none %}
{# -- ensure that the target_relation is dropped before trying to create it #}
{{ drop_relation_if_exists(target_relation) }}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif trigger_full_refresh %}
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
{% set tmp_identifier = model['name'] + '__' + time_stamp + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + '__' + time_stamp + '__dbt_backup' %}
{% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}

{# -- ensure that the intermediate_relation is dropped before trying to create it #}
{{ drop_relation_if_exists(intermediate_relation) }}

{% set build_sql = create_table_as(False, intermediate_relation, sql) %}
{% set build_sql = get_create_table_as_sql(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{% set build_sql = get_create_table_as_sql(False, intermediate_relation, sql) %}
{% set need_swap = true %}
{% do to_drop.append(backup_relation) %}
{% do to_drop.append(intermediate_relation) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
from_relation=temp_relation,
to_relation=target_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{#-- since unique key is not supported, the following macro (default impl) will only return an insert statement, and hence is used directly here --#}
{#-- set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) --#}
{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_predicates = config.get('incremental_predicates', none) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates }) %}
{% set build_sql = get_incremental_default_sql(strategy_arg_dict) %}

{% set build_sql = get_insert_overwrite_sql(target_relation, tmp_relation, dest_columns) %}

{% endif %}

{% call statement("main") %}
{{ build_sql }}
{% endcall %}

{% if need_swap %}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% do adapter.rename_relation(intermediate_relation, target_relation) %}
{% if need_swap %}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% do adapter.rename_relation(intermediate_relation, target_relation) %}
{% do to_drop.append(backup_relation) %}
{% endif %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% if existing_relation is none or existing_relation.is_view or should_full_refresh() %}
Expand All @@ -164,10 +145,6 @@

{{ run_hooks(post_hooks, inside_transaction=False) }}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

0 comments on commit 04bbb90

Please sign in to comment.