diff --git a/dbt/include/synapse/macros/adapters/relation.sql b/dbt/include/synapse/macros/adapters/relation.sql index e5136e56..9dd4cf12 100644 --- a/dbt/include/synapse/macros/adapters/relation.sql +++ b/dbt/include/synapse/macros/adapters/relation.sql @@ -5,22 +5,26 @@ {% endmacro %} {% macro synapse__drop_relation_script(relation) -%} + {% if relation is not none %} {% if relation.type == 'view' or relation.type == 'materialized_view' -%} {% set object_id_type = 'V' %} {% elif relation.type == 'table'%} {% set object_id_type = 'U' %} {%- else -%} invalid target name {% endif %} - - if object_id ('{{ relation.include(database=False) }}','{{ object_id_type }}') is not null - {% if relation.type == 'view' or relation.type == 'materialized_view' -%} - begin - drop view {{ relation.include(database=False) }} - end - {% elif relation.type == 'table' %} - begin - drop {{ relation.type }} {{ relation.include(database=False) }} - end + if object_id ('{{ relation.include(database=False) }}','{{ object_id_type }}') is not null + {% if relation.type == 'view' or relation.type == 'materialized_view' -%} + begin + drop view {{ relation.include(database=False) }} + end + {% elif relation.type == 'table' %} + begin + drop {{ relation.type }} {{ relation.include(database=False) }} + end + {% endif %} + {% else %} + -- no object to drop + select 1 as nothing {% endif %} {% endmacro %} diff --git a/dbt/include/synapse/macros/adapters/replace.sql b/dbt/include/synapse/macros/adapters/replace.sql new file mode 100644 index 00000000..ec7bedf9 --- /dev/null +++ b/dbt/include/synapse/macros/adapters/replace.sql @@ -0,0 +1,50 @@ +{% macro get_replace_sql(existing_relation, target_relation, sql) %} + {{- log('Applying REPLACE to: ' ~ existing_relation) -}} + {{- adapter.dispatch('get_replace_sql', 'synapse')(existing_relation, target_relation, sql) -}} +{% endmacro %} + + +{% macro synapse__get_replace_sql(existing_relation, target_relation, sql) %} + + {# /* use a create or replace statement if possible */ #} + + {% set is_replaceable = existing_relation.type == target_relation_type and existing_relation.can_be_replaced %} + + {% if is_replaceable and existing_relation.is_view %} + {{ get_replace_view_sql(target_relation, sql) }} + + {% elif is_replaceable and existing_relation.is_table %} + {{ get_replace_table_sql(target_relation, sql) }} + + {% elif is_replaceable and existing_relation.is_materialized_view %} + {{ get_replace_materialized_view_sql(target_relation, sql) }} + + {# /* a create or replace statement is not possible, so try to stage and/or backup to be safe */ #} + + {# /* create target_relation as an intermediate relation, then swap it out with the existing one using a backup */ #} + {%- elif target_relation.can_be_renamed and existing_relation.can_be_renamed -%} + {{ get_create_intermediate_sql(target_relation, sql) }}; + {{ get_create_backup_sql(existing_relation) }}; + {{ get_rename_intermediate_sql(target_relation) }}; + {{ synapse__drop_relation(existing_relation) }} + + {# /* create target_relation as an intermediate relation, then swap it out with the existing one without using a backup */ #} + {%- elif target_relation.can_be_renamed -%} + {{ get_create_intermediate_sql(target_relation, sql) }}; + {{ synapse__drop_relation(existing_relation) }}; + {{ get_rename_intermediate_sql(target_relation) }} + + {# /* create target_relation in place by first backing up the existing relation */ #} + {%- elif existing_relation.can_be_renamed -%} + {{ get_create_backup_sql(existing_relation) }}; + {{ get_create_sql(target_relation, sql) }}; + {{ synapse__drop_relation(existing_relation) }} + + {# /* no renaming is allowed, so just drop and create */ #} + {%- else -%} + {{ synapse__drop_relation(existing_relation) }}; + {{ get_create_sql(target_relation, sql) }} + + {%- endif -%} + +{% endmacro %} diff --git a/dbt/include/synapse/macros/materializations/models/materialized_view/create_materialized_view_as.sql b/dbt/include/synapse/macros/materializations/models/materialized_view/create_materialized_view_as.sql new file mode 100644 index 00000000..7a4319a9 --- /dev/null +++ b/dbt/include/synapse/macros/materializations/models/materialized_view/create_materialized_view_as.sql @@ -0,0 +1,30 @@ +{% macro ref(model_name) %} + + {% do return(builtins.ref(model_name).include(database=false)) %} + +{% endmacro %} + + +{% macro synapse__get_replace_materialized_view_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) %} + {# Synapse does not have ALTER...RENAME function, so use synapse__rename_relation_script #} + + {%- set dist = config.get('dist', default="ROUND_ROBIN") -%} + EXEC(' + CREATE materialized view [{{intermediate_relation.schema}}].[{{intermediate_relation.identifier}}] + WITH ( DISTRIBUTION = {{dist}} ) + AS {{ sql }} + '); + + {{ synapse__rename_relation_script(existing_relation, backup_relation) }} + {{ synapse__rename_relation_script(intermediate_relation, relation) }} + +{% endmacro %} + +{% macro synapse__get_create_materialized_view_as_sql(relation, sql) %} + {%- set dist = config.get('dist', default="ROUND_ROBIN") -%} + + CREATE materialized view [{{relation.schema}}].[{{relation.identifier}}] + WITH ( DISTRIBUTION = {{dist}} ) + AS {{ sql }} + +{% endmacro %} diff --git a/dbt/include/synapse/macros/materializations/models/materialized_view/materialized_view.sql b/dbt/include/synapse/macros/materializations/models/materialized_view/materialized_view.sql index 2dff1b4b..f7251d93 100644 --- a/dbt/include/synapse/macros/materializations/models/materialized_view/materialized_view.sql +++ b/dbt/include/synapse/macros/materializations/models/materialized_view/materialized_view.sql @@ -1,29 +1,122 @@ -{% macro ref(model_name) %} +{% materialization materialized_view, adapter='synapse' %} + {% set existing_relation = load_cached_relation(this) %} + {% set target_relation = this.incorporate(type=this.MaterializedView) %} + {% set intermediate_relation = make_intermediate_relation(target_relation) %} + {% set backup_relation_type = this.MaterializedView if existing_relation is none else existing_relation.type %} + {% set backup_relation = make_backup_relation(target_relation, backup_relation_type) %} - {% do return(builtins.ref(model_name).include(database=false)) %} + {{ materialized_view_setup(backup_relation, intermediate_relation, pre_hooks) }} + + {% set build_sql = materialized_view_get_build_sql(existing_relation, target_relation, backup_relation, intermediate_relation) %} + + {% if build_sql == '' %} + {{ materialized_view_execute_no_op(target_relation) }} + {% else %} + {{ materialized_view_execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) }} + {% endif %} + + {{ materialized_view_teardown(backup_relation, intermediate_relation, post_hooks) }} + + {{ return({'relations': [target_relation]}) }} + +{% endmaterialization %} + + +{% macro materialized_view_setup(backup_relation, intermediate_relation, pre_hooks) %} + + -- backup_relation and intermediate_relation should not already exist in the database + -- it's possible these exist because of a previous run that exited unexpectedly + {% set preexisting_backup_relation = load_cached_relation(backup_relation) %} + {% set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) %} + + they return none + + -- drop the temp relations if they exist already in the database + {{ synapse__drop_relation(preexisting_backup_relation) }} + {{ synapse__drop_relation(preexisting_intermediate_relation) }} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + +{% endmacro %} + + +{% macro materialized_view_teardown(backup_relation, intermediate_relation, post_hooks) %} + + -- drop the temp relations if they exist to leave the database clean for the next run + {{ synapse__drop_relation_script(backup_relation) }} + {{ synapse__drop_relation_script(intermediate_relation) }} + + {{ run_hooks(post_hooks, inside_transaction=False) }} {% endmacro %} -{% macro synapse__get_replace_materialized_view_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) %} - {# Synapse does not have ALTER...RENAME function, so use synapse__rename_relation_script #} - {%- set dist = config.get('dist', default="ROUND_ROBIN") -%} - EXEC(' - CREATE materialized view {{ intermediate_relation.include(database=False) }} - WITH ( DISTRIBUTION = {{dist}} ) - AS {{ sql }} - '); +{% macro materialized_view_get_build_sql(existing_relation, target_relation, backup_relation, intermediate_relation) %} - {{ synapse__rename_relation_script(existing_relation, backup_relation) }} - {{ synapse__rename_relation_script(intermediate_relation, relation) }} + {% set full_refresh_mode = should_full_refresh() %} + + -- determine the scenario we're in: create, full_refresh, alter, refresh data + {% if existing_relation is none %} + {% set build_sql = get_create_materialized_view_as_sql(target_relation, sql) %} + {% elif full_refresh_mode or not existing_relation.is_materialized_view %} + {% set build_sql = get_replace_sql(existing_relation, target_relation, sql) %} + {% else %} + -- get config options + {% set on_configuration_change = config.get('on_configuration_change') %} + {% set configuration_changes = get_materialized_view_configuration_changes(existing_relation, config) %} + + {% if configuration_changes is none %} + {% set build_sql = refresh_materialized_view(target_relation) %} + + {% elif on_configuration_change == 'apply' %} + {% set build_sql = get_alter_materialized_view_as_sql(target_relation, configuration_changes, sql, existing_relation, backup_relation, intermediate_relation) %} + {% elif on_configuration_change == 'continue' %} + {% set build_sql = '' %} + {{ exceptions.warn("Configuration changes were identified and `on_configuration_change` was set to `continue` for `" ~ target_relation ~ "`") }} + {% elif on_configuration_change == 'fail' %} + {{ exceptions.raise_fail_fast_error("Configuration changes were identified and `on_configuration_change` was set to `fail` for `" ~ target_relation ~ "`") }} + + {% else %} + -- this only happens if the user provides a value other than `apply`, 'skip', 'fail' + {{ exceptions.raise_compiler_error("Unexpected configuration scenario") }} + + {% endif %} + + {% endif %} + + {% do return(build_sql) %} {% endmacro %} -{% macro synapse__get_create_materialized_view_as_sql(relation, sql) %} - {%- set dist = config.get('dist', default="ROUND_ROBIN") -%} - CREATE materialized view {{ relation.include(database=False) }} - WITH ( DISTRIBUTION = {{dist}} ) - AS {{ sql }} +{% macro materialized_view_execute_no_op(target_relation) %} + {% do store_raw_result( + name="main", + message="skip " ~ target_relation, + code="skip", + rows_affected="-1" + ) %} +{% endmacro %} + + +{% macro materialized_view_execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) %} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% set grant_config = config.get('grants') %} + + {% call statement(name="main") %} + {{ build_sql }} + {% endcall %} + + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + {{ adapter.commit() }} {% endmacro %} diff --git a/dbt/include/synapse/macros/materializations/models/table/create_table_as.sql b/dbt/include/synapse/macros/materializations/models/table/create_table_as.sql index 744e7872..ac27d7ce 100644 --- a/dbt/include/synapse/macros/materializations/models/table/create_table_as.sql +++ b/dbt/include/synapse/macros/materializations/models/table/create_table_as.sql @@ -31,7 +31,6 @@ {{ "["~column~"]" }}{{ ", " if not loop.last }} {% endfor %} {%endset%} - {{ synapse__build_model_constraints(relation) }} INSERT INTO [{{relation.schema}}].[{{relation.identifier}}] ({{listColumns}}) SELECT {{listColumns}} FROM [{{tmp_relation.schema}}].[{{tmp_relation.identifier}}] diff --git a/tests/functional/adapter/test_constraints.py b/tests/functional/adapter/test_constraints.py index fe93747b..2761a1d9 100644 --- a/tests/functional/adapter/test_constraints.py +++ b/tests/functional/adapter/test_constraints.py @@ -510,10 +510,6 @@ def expected_sql(self): select ''blue'' as color,1 as id,''2019-01-01'' as date_day;'); create table ([id] int not null,[color] varchar(100),[date_day] varchar(100)) with(distribution = round_robin,heap) - alter table add constraint - primary key nonclustered(id)not enforced; - alter table add constraint - unique nonclustered(color,date_day)not enforced; insert into ([id],[color],[date_day]) select [id],[color],[date_day] from if object_id is not null begin drop view end @@ -585,13 +581,15 @@ def test__constraints_enforcement_rollback( # Make a contract-breaking change to the model write_file(null_model_sql, "models", "my_model.sql") - + # drops the previous table before + # when there is an exception, cant rollback failing_results = run_dbt(["run", "-s", "my_model"], expect_pass=False) assert len(failing_results) == 1 # Verify the previous table still exists relation = relation_from_name(project.adapter, "my_model") - old_model_exists_sql = f"select * from {relation}" + model_backup = str(relation).replace("my_model", "my_model__dbt_backup") + old_model_exists_sql = f"select * from {model_backup}" old_model_exists = project.run_sql(old_model_exists_sql, fetch="all") assert len(old_model_exists) == 1 assert old_model_exists[0][1] == expected_color @@ -653,7 +651,7 @@ def expected_sql(self): if object_id is not null begin drop view end if object_id is not null begin drop table end exec(\'create view as select \'\'blue\'\' as "from",1 as id,\'\'2019-01-01\'\' as date_day;\'); - create table ([id] integer not null,[from] varchar(100)not null,[date_day] varchar(100)) + create table ([id] int not null,[from] varchar(100)not null,[date_day] varchar(100)) with(distribution = round_robin,heap) insert into ([id],[from],[date_day]) select [id],[from],[date_day] from @@ -692,7 +690,37 @@ class TestTableConstraintsRollbackSynapse(BaseConstraintsRollback): class TestIncrementalConstraintsRollbackSynapse(BaseIncrementalConstraintsRollback): - pass + def test__constraints_enforcement_rollback( + self, project, expected_color, expected_error_messages, null_model_sql + ): + results = run_dbt(["run", "-s", "my_model"]) + assert len(results) == 1 + + # Make a contract-breaking change to the model + write_file(null_model_sql, "models", "my_model.sql") + # drops the previous table before + # when there is an exception, cant rollback + failing_results = run_dbt(["run", "-s", "my_model"], expect_pass=False) + assert len(failing_results) == 1 + + # Verify the previous table still exists, + # for incremental we are not creating backups, because its not a create replace + relation = relation_from_name(project.adapter, "my_model") + old_model_exists_sql = f"select * from {relation}" + old_model_exists = project.run_sql(old_model_exists_sql, fetch="all") + assert len(old_model_exists) == 1 + assert old_model_exists[0][1] == expected_color + + # Confirm this model was contracted + # TODO: is this step really necessary? + manifest = get_manifest(project.project_root) + model_id = "model.test.my_model" + my_model_config = manifest.nodes[model_id].config + contract_actual_config = my_model_config.contract + assert contract_actual_config.enforced is True + + # Its result includes the expected error messages + self.assert_expected_error_messages(failing_results[0].message, expected_error_messages) class TestTableContractSqlHeaderSynapse(BaseTableContractSqlHeader): diff --git a/tests/functional/adapter/test_materialized_views.py b/tests/functional/adapter/test_materialized_views.py index 06e86220..576257cc 100644 --- a/tests/functional/adapter/test_materialized_views.py +++ b/tests/functional/adapter/test_materialized_views.py @@ -42,7 +42,8 @@ def drop_cascade(project, test_model_identifier): # SYNAPSE HAS NO "DROP SCHEMA...CASCADE" # so drop all test materializations, to allow drop my_seed - # "my_materialized_view" always created in setup(), so always need to be dropped before my_seed + # "my_materialized_view" and its backup always created in setup(), + # so always need to be dropped before my_seed for identifier in ["my_materialized_view", test_model_identifier]: project.run_sql( f""" @@ -55,6 +56,16 @@ def drop_cascade(project, test_model_identifier): begin drop table "{project.test_schema}"."{identifier}" end + + if object_id ('"{project.test_schema}"."{identifier}__dbt_backup"','V') is not null + begin + drop view "{project.test_schema}"."{identifier}__dbt_backup" + end + + if object_id ('"{project.test_schema}"."{identifier}__dbt_backup"','U') is not null + begin + drop table "{project.test_schema}"."{identifier}__dbt_backup" + end """ ) # then drop object my_seed, to allow drop schema