Skip to content

Commit

Permalink
Incremental bug fix. updates to table and view materialization. added…
Browse files Browse the repository at this point in the history
… a few macros, utils
  • Loading branch information
prdpsvs committed Apr 24, 2024
1 parent dc81ac8 commit dc6f018
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 63 deletions.
3 changes: 2 additions & 1 deletion dbt/include/fabric/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

{% macro fabric__list_schemas(database) %}
{% call statement('list_schemas', fetch_result=True, auto_begin=False) -%}

select name as [schema]
from sys.schemas {{ information_schema_hints() }}
{% endcall %}
Expand All @@ -28,6 +27,7 @@

{% macro fabric__list_relations_without_caching(schema_relation) -%}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
USE [{{ schema_relation.database }}];
with base as (
select
DB_NAME() as [database],
Expand All @@ -51,6 +51,7 @@

{% macro fabric__get_relation_without_caching(schema_relation) -%}
{% call statement('get_relation_without_caching', fetch_result=True) -%}
USE [{{ schema_relation.database }}];
with base as (
select
DB_NAME() as [database],
Expand Down
30 changes: 0 additions & 30 deletions dbt/include/fabric/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -39,36 +39,6 @@
path={"schema": reference[0], "identifier": reference[1]})) }}
{% endfor %}
{% elif relation.type == 'table'%}
{# {% call statement('find_references', fetch_result=true) %}
USE [{{ relation.database }}];
SELECT obj.name AS FK_NAME,
sch.name AS [schema_name],
tab1.name AS [table],
col1.name AS [column],
tab2.name AS [referenced_table],
col2.name AS [referenced_column]
FROM sys.foreign_key_columns fkc
INNER JOIN sys.objects obj
ON obj.object_id = fkc.constraint_object_id
INNER JOIN sys.tables tab1
ON tab1.object_id = fkc.parent_object_id
INNER JOIN sys.schemas sch
ON tab1.schema_id = sch.schema_id
INNER JOIN sys.columns col1
ON col1.column_id = parent_column_id AND col1.object_id = tab1.object_id
INNER JOIN sys.tables tab2
ON tab2.object_id = fkc.referenced_object_id
INNER JOIN sys.columns col2
ON col2.column_id = referenced_column_id AND col2.object_id = tab2.object_id
WHERE sch.name = '{{ relation.schema }}' and tab2.name = '{{ relation.identifier }}'
{% endcall %}
{% set references = load_result('find_references')['data'] %}
{% for reference in references -%}
-- dropping referenced table {{ reference[0] }}.{{ reference[1] }}
{{ fabric__drop_relation_script(relation.incorporate(
type="table",
path={"schema": reference[1], "identifier": reference[2]})) }}
{% endfor %} #}
{% set object_id_type = 'U' %}

{%- else -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@

{% materialization incremental, adapter='fabric' -%}

{%- set full_refresh_mode = (should_full_refresh()) -%}
{% set target_relation = this.incorporate(type='table') %}
{%- set relations_list = fabric__get_relation_without_caching(target_relation) -%}

{%- set existing_relation = none %}
{% if (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema)
and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] == target_relation.type)%}
{% set existing_relation = target_relation %}
{% elif (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema)
and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] != target_relation.type) %}
{% set existing_relation = get_or_create_relation(relations_list[0][0], relations_list[0][2] , relations_list[0][1] , relations_list[0][3])[1] %}
{% endif %}

{{ log("Full refresh mode" ~ full_refresh_mode)}}
{{ log("existing relation : "~existing_relation ~ " type "~ existing_relation.type ~ " is view? "~existing_relation.is_view) }}
{{ log("target relation: " ~target_relation ~ " type "~ target_relation.type ~ " is view? "~target_relation.is_view) }}

-- configs
{%- set unique_key = config.get('unique_key') -%}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{%- set temp_relation = make_temp_relation(target_relation)-%}

{% set grant_config = config.get('grants') %}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}

{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% if existing_relation is none %}

{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{%- endcall -%}

{% elif existing_relation.is_view %}

{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(existing_relation) }}
{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{%- endcall -%}

{% elif full_refresh_mode %}

{%- call statement('main') -%}
{{ fabric__create_table_as(False, target_relation, sql)}}
{%- endcall -%}

{% else %}

{%- call statement('create_tmp_relation') -%}
{{ fabric__create_table_as(True, temp_relation, sql)}}
{%- endcall -%}
{% do adapter.expand_target_column_types(
from_relation=temp_relation,
to_relation=target_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
{%- call statement('main') -%}
{{ strategy_sql_macro_func(strategy_arg_dict) }}
{%- endcall -%}
{% endif %}

{% do drop_relation_if_exists(temp_relation) %}
{{ run_hooks(post_hooks, inside_transaction=True) }}

{% set target_relation = target_relation.incorporate(type='table') %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}
{% do adapter.commit() %}
{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{% if arg_dict["unique_key"] %}
{% do return(get_incremental_delete_insert_sql(arg_dict)) %}
{% else %}
-- Incremental Append will insert data into target table.
{% do return(get_incremental_append_sql(arg_dict)) %}
{% endif %}

Expand Down
43 changes: 30 additions & 13 deletions dbt/include/fabric/macros/materializations/models/table/table.sql
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
{% materialization table, adapter='fabric' %}

-- Create target relation
-- Load target relation
{%- set target_relation = this.incorporate(type='table') %}
-- Load existing relation
{%- set relation = fabric__get_relation_without_caching(this) %}

{% set existing_relation = none %}
{% if (relation|length == 1) %}
{% set existing_relation = get_or_create_relation(relation[0][0], relation[0][2] , relation[0][1] , relation[0][3])[1] %}
{% endif %}

{%- set backup_relation = none %}
{{log("Existing Relation type is "~ existing_relation.type)}}
{% if (existing_relation != none and existing_relation.type == "table") %}
{%- set backup_relation = make_backup_relation(target_relation, 'table') -%}
{% elif (existing_relation != none and existing_relation.type == "view") %}
{%- set backup_relation = make_backup_relation(target_relation, 'view') -%}
{% endif %}

{% if (existing_relation != none) %}
-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(backup_relation) }}
-- Rename target relation as backup relation
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{% endif %}

-- grab current tables grants config for comparision later on
{% set grant_config = config.get('grants') %}
{%- set backup_relation = make_backup_relation(target_relation, 'table') -%}
-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(backup_relation) }}
-- Rename target relation as backup relation
{%- set relation = fabric__get_relation_without_caching(target_relation) %}
{% if relation|length > 0 %}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}
-- `BEGIN` happens here:
Expand All @@ -29,10 +44,12 @@
-- `COMMIT` happens here
{{ adapter.commit() }}

-- finally, drop the foreign key references if exists
{{ drop_fk_indexes_on_table(backup_relation) }}
-- drop existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}
{% if (backup_relation != none) %}
-- finally, drop the foreign key references if exists
{{ drop_fk_indexes_on_table(backup_relation) }}
-- drop existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}
{% endif %}
-- Add constraints including FK relation.
{{ fabric__build_model_constraints(target_relation) }}
{{ run_hooks(post_hooks, inside_transaction=False) }}
Expand Down
41 changes: 26 additions & 15 deletions dbt/include/fabric/macros/materializations/models/view/view.sql
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
{% materialization view, adapter='fabric' -%}

{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='view') -%}

-- make back up relation
{%- set backup_relation_type = 'view' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
{{log("Target Relation "~target_relation)}}

{%- set relation = fabric__get_relation_without_caching(this) %}
{% set existing_relation = none %}
{% if (relation|length == 1) %}
{% set existing_relation = get_or_create_relation(relation[0][0], relation[0][2] , relation[0][1] , relation[0][3])[1] %}
{% endif %}
{{log("Existing Relation "~existing_relation)}}

{%- set backup_relation = none %}
{{log("Existing Relation type is "~ existing_relation.type)}}
{% if (existing_relation != none and existing_relation.type == "table") %}
{%- set backup_relation = make_backup_relation(target_relation, 'table') -%}
{% elif (existing_relation != none and existing_relation.type == "view") %}
{%- set backup_relation = make_backup_relation(target_relation, 'view') -%}
{% endif %}

{% if (existing_relation != none) %}
-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(backup_relation) }}
-- Rename target relation as backup relation
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{% endif %}

{% set grant_config = config.get('grants') %}
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- drop target relation if exists already in the database
{{ drop_relation_if_exists(backup_relation) }}

{%- set relation = fabric__get_relation_without_caching(target_relation) %}
{% if relation|length > 0 %}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

Expand All @@ -32,9 +42,10 @@
{% do persist_docs(target_relation, model) %}
{{ run_hooks(post_hooks, inside_transaction=True) }}
{{ adapter.commit() }}
{{ drop_relation_if_exists(backup_relation) }}
{% if (backup_relation != none) %}
{{ drop_relation_if_exists(backup_relation) }}
{% endif %}
{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization -%}
6 changes: 5 additions & 1 deletion dbt/include/fabric/macros/utils/concat.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
{% macro fabric__concat(fields) -%}
concat({{ fields|join(', ') }})
{%- if fields|length < 2 -%}
{{ fields[0] }}
{%- else -%}
concat({{ fields|join(', ') }})
{%- endif -%}
{%- endmacro %}
12 changes: 12 additions & 0 deletions dbt/include/fabric/macros/utils/get_tables_by_pattern.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{% macro fabric__get_tables_by_pattern_sql(schema_pattern, table_pattern, exclude='', database=target.database) %}

select distinct
table_schema as {{ adapter.quote('table_schema') }},
table_name as {{ adapter.quote('table_name') }},
{{ dbt_utils.get_table_types_sql() }}
from {{ database }}.INFORMATION_SCHEMA.TABLES
where table_schema like '{{ schema_pattern }}'
and table_name like '{{ table_pattern }}'
and table_name not like '{{ exclude }}'

{% endmacro %}
Loading

0 comments on commit dc6f018

Please sign in to comment.