diff --git a/dbt/include/fabric/macros/adapters/relation.sql b/dbt/include/fabric/macros/adapters/relation.sql index 09c2533d..cb21dd62 100644 --- a/dbt/include/fabric/macros/adapters/relation.sql +++ b/dbt/include/fabric/macros/adapters/relation.sql @@ -38,10 +38,10 @@ type="view", path={"schema": reference[0], "identifier": reference[1]})) }} {% endfor %} - {% elif relation.type == 'table'%} - {%- else -%} - {{ exceptions.raise_not_implemented('Invalid relation being dropped: ' ~ relation) }} - {% endif %} + {% elif relation.type == 'table'%} + {%- else -%} + {{ exceptions.raise_not_implemented('Invalid relation being dropped: ' ~ relation) }} + {% endif %} {{ use_database_hint() }} EXEC('DROP {{ relation.type }} IF EXISTS {{ relation.include(database=False) }};'); diff --git a/dbt/include/fabric/macros/materializations/models/table/clone.sql b/dbt/include/fabric/macros/materializations/models/table/clone.sql new file mode 100644 index 00000000..c7e5b609 --- /dev/null +++ b/dbt/include/fabric/macros/materializations/models/table/clone.sql @@ -0,0 +1,72 @@ +{% macro fabric__can_clone_table() %} + {{ return(True) }} +{% endmacro %} + +{% macro fabric__create_or_replace_clone(this_relation, defer_relation) %} + CREATE TABLE {{this_relation}} + AS CLONE OF {{defer_relation}} +{% endmacro %} + +{%- materialization clone, adapter='fabric' -%} + + {%- set relations = {'relations': []} -%} + + {%- if not defer_relation -%} + -- nothing to do + {{ log("No relation found in state manifest for " ~ model.unique_id, info=True) }} + {{ return(relations) }} + {%- endif -%} + + {%- set existing_relation = load_cached_relation(this) -%} + {{ log("existing relation is "~existing_relation, info=True) }} + {{ log("defer relation is "~defer_relation, info=True) }} + + {%- if existing_relation and not flags.FULL_REFRESH -%} + -- noop! + {{ log("Relation " ~ existing_relation ~ " already exists", info=True) }} + {{ return(relations) }} + {%- endif -%} + + {%- set other_existing_relation = load_cached_relation(defer_relation) -%} + {{ log("other existing relation is "~other_existing_relation, info=True) }} + -- If this is a database that can do zero-copy cloning of tables, and the other relation is a table, then this will be a table + -- Otherwise, this will be a view + + {% set can_clone_table = can_clone_table() %} + + {%- if other_existing_relation and other_existing_relation.type == 'table' and can_clone_table -%} + + {%- set target_relation = this.incorporate(type='table') -%} + {% if existing_relation is not none and not existing_relation.is_table %} + {{ log("Dropping relation " ~ existing_relation ~ " because it is of type " ~ existing_relation.type) }} + {{ fabric__drop_relation_script(existing_relation) }} + {% endif %} + + -- as a general rule, data platforms that can clone tables can also do atomic 'create or replace' + {% call statement('main') %} + {{ create_or_replace_clone(target_relation, defer_relation) }} + {% endcall %} + + {# {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + {% do persist_docs(target_relation, model) %} #} + + {{ return({'relations': [target_relation]}) }} + + {%- else -%} + + {%- set target_relation = this.incorporate(type='view') -%} + + -- reuse the view materialization + -- TODO: support actual dispatch for materialization macros + -- Tracking ticket: https://github.com/dbt-labs/dbt-core/issues/7799 + {% set search_name = "materialization_view_" ~ adapter.type() %} + {% if not search_name in context %} + {% set search_name = "materialization_view_default" %} + {% endif %} + {% set materialization_macro = context[search_name] %} + {% set relations = materialization_macro() %} + {{ return(relations) }} + {% endif %} + +{%- endmaterialization -%} diff --git a/tests/functional/adapter/test_dbt_clone.py b/tests/functional/adapter/test_dbt_clone.py new file mode 100644 index 00000000..87e0c5ac --- /dev/null +++ b/tests/functional/adapter/test_dbt_clone.py @@ -0,0 +1,236 @@ +import os +import shutil +from collections import Counter +from copy import deepcopy + +import pytest +from dbt.exceptions import DbtRuntimeError +from dbt.tests.adapter.dbt_clone.fixtures import ( + custom_can_clone_tables_false_macros_sql, + ephemeral_model_sql, + exposures_yml, + get_schema_name_sql, + infinite_macros_sql, + macros_sql, + schema_yml, + seed_csv, + snapshot_sql, + table_model_sql, + view_model_sql, +) +from dbt.tests.util import run_dbt + + +class BaseClone: + @pytest.fixture(scope="class") + def models(self): + return { + "table_model.sql": table_model_sql, + "view_model.sql": view_model_sql, + "ephemeral_model.sql": ephemeral_model_sql, + "schema.yml": schema_yml, + "exposures.yml": exposures_yml, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "macros.sql": macros_sql, + "infinite_macros.sql": infinite_macros_sql, + "get_schema_name.sql": get_schema_name_sql, + } + + @pytest.fixture(scope="class") + def seeds(self): + return { + "seed.csv": seed_csv, + } + + @pytest.fixture(scope="class") + def snapshots(self): + return { + "snapshot.sql": snapshot_sql, + } + + @pytest.fixture(scope="class") + def other_schema(self, unique_schema): + return unique_schema + "_other" + + @property + def project_config_update(self): + return { + "seeds": { + "test": { + "quote_columns": False, + } + } + } + + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target, unique_schema, other_schema): + outputs = {"default": dbt_profile_target, "otherschema": deepcopy(dbt_profile_target)} + outputs["default"]["schema"] = unique_schema + outputs["otherschema"]["schema"] = other_schema + return {"test": {"outputs": outputs, "target": "default"}} + + def copy_state(self, project_root): + state_path = os.path.join(project_root, "state") + if not os.path.exists(state_path): + os.makedirs(state_path) + shutil.copyfile( + f"{project_root}/target/manifest.json", f"{project_root}/state/manifest.json" + ) + + def run_and_save_state(self, project_root, with_snapshot=False): + results = run_dbt(["seed"]) + assert len(results) == 1 + assert not any(r.node.deferred for r in results) + results = run_dbt(["run"]) + assert len(results) == 2 + assert not any(r.node.deferred for r in results) + results = run_dbt(["test"]) + assert len(results) == 2 + + if with_snapshot: + results = run_dbt(["snapshot"]) + assert len(results) == 1 + assert not any(r.node.deferred for r in results) + + # copy files + self.copy_state(project_root) + + +# -- Below we define base classes for tests you import the one based on if your adapter uses dbt clone or not -- +class BaseClonePossible(BaseClone): + def test_can_clone_true(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state(project.project_root, with_snapshot=True) + + clone_args = [ + "clone", + "--state", + "state", + "--target", + "otherschema", + ] + + results = run_dbt(clone_args) + assert len(results) == 4 + + schema_relations = project.adapter.list_relations( + database=project.database, schema=other_schema + ) + types = [r.type for r in schema_relations] + count_types = Counter(types) + assert count_types == Counter({"table": 3, "view": 1}) + + # objects already exist, so this is a no-op + results = run_dbt(clone_args) + assert len(results) == 4 + assert all("no-op" in r.message.lower() for r in results) + + # recreate all objects + results = run_dbt([*clone_args, "--full-refresh"]) + assert len(results) == 4 + + # select only models this time + results = run_dbt([*clone_args, "--resource-type", "model"]) + assert len(results) == 2 + assert all("no-op" in r.message.lower() for r in results) + + def test_clone_no_state(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state(project.project_root, with_snapshot=True) + + clone_args = [ + "clone", + "--target", + "otherschema", + ] + + with pytest.raises( + DbtRuntimeError, + match="--state or --defer-state are required for deferral, but neither was provided", + ): + run_dbt(clone_args) + + +class BaseCloneNotPossible(BaseClone): + @pytest.fixture(scope="class") + def macros(self): + return { + "macros.sql": macros_sql, + "my_can_clone_tables.sql": custom_can_clone_tables_false_macros_sql, + "infinite_macros.sql": infinite_macros_sql, + "get_schema_name.sql": get_schema_name_sql, + } + + def test_can_clone_false(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state(project.project_root, with_snapshot=True) + + clone_args = [ + "clone", + "--state", + "state", + "--target", + "otherschema", + ] + + results = run_dbt(clone_args) + assert len(results) == 4 + + schema_relations = project.adapter.list_relations( + database=project.database, schema=other_schema + ) + assert all(r.type == "view" for r in schema_relations) + + # objects already exist, so this is a no-op + results = run_dbt(clone_args) + assert len(results) == 4 + assert all("no-op" in r.message.lower() for r in results) + + # recreate all objects + results = run_dbt([*clone_args, "--full-refresh"]) + assert len(results) == 4 + + # select only models this time + results = run_dbt([*clone_args, "--resource-type", "model"]) + assert len(results) == 2 + assert all("no-op" in r.message.lower() for r in results) + + +class TestPostgresCloneNotPossible(BaseCloneNotPossible): + @pytest.fixture(autouse=True) + def clean_up(self, project): + yield + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=f"{project.test_schema}_seeds" + ) + project.adapter.drop_schema(relation) + + relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.adapter.drop_schema(relation) + + pass + + +class TestPostgresClonePossible(BaseClonePossible): + @pytest.fixture(autouse=True) + def clean_up(self, project): + yield + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=f"{project.test_schema}_seeds" + ) + project.adapter.drop_schema(relation) + + relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.adapter.drop_schema(relation) + + pass diff --git a/tests/functional/adapter/test_query_comment.py b/tests/functional/adapter/test_query_comment.py index 8bf02bfe..b8390990 100644 --- a/tests/functional/adapter/test_query_comment.py +++ b/tests/functional/adapter/test_query_comment.py @@ -1,32 +1,158 @@ -from dbt.tests.adapter.query_comment.test_query_comment import ( - BaseEmptyQueryComments, - BaseMacroArgsQueryComments, - BaseMacroInvalidQueryComments, - BaseMacroQueryComments, - BaseNullQueryComments, - BaseQueryComments, -) +import json +import pytest +from dbt.exceptions import DbtRuntimeError +from dbt.tests.util import run_dbt_and_capture +from dbt.version import __version__ as dbt_version -class TestQueryCommentsFabric(BaseQueryComments): +MACROS__MACRO_SQL = """ +{%- macro query_header_no_args() -%} +{%- set x = "are pretty cool" -%} +{{ "dbt macros" }} +{{ x }} +{%- endmacro -%} + + +{%- macro query_header_args(message) -%} + {%- set comment_dict = dict( + app='dbt++', + macro_version='0.1.0', + dbt_version=dbt_version, + message='blah: '~ message) -%} + {{ return(comment_dict) }} +{%- endmacro -%} + + +{%- macro ordered_to_json(dct) -%} +{{ tojson(dct, sort_keys=True) }} +{%- endmacro %} + + +{% macro invalid_query_header() -%} +{{ "Here is an invalid character for you: */" }} +{% endmacro %} + +""" + +MODELS__X_SQL = """ +{% do run_query('select 2 as inner_id') %} +select 1 as outer_id +""" + + +class BaseDefaultQueryComments: + @pytest.fixture(scope="class") + def models(self): + return { + "x.sql": MODELS__X_SQL, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "macro.sql": MACROS__MACRO_SQL, + } + + def run_get_json(self, expect_pass=True): + res, raw_logs = run_dbt_and_capture( + ["--debug", "--log-format=json", "run"], expect_pass=expect_pass + ) + + # empty lists evaluate as False + assert len(res) > 0 + return raw_logs + + +# Base setup to be inherited # +class BaseQueryComments(BaseDefaultQueryComments): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"query-comment": "dbt\nrules!\n"} + + def test_matches_comment(self, project) -> bool: + logs = self.run_get_json() + assert r"/* dbt\nrules! */\n" in logs + + +class BaseMacroQueryComments(BaseDefaultQueryComments): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"query-comment": "{{ query_header_no_args() }}"} + + def test_matches_comment(self, project) -> bool: + logs = self.run_get_json() + assert r"/* dbt macros\nare pretty cool */\n" in logs + + +class BaseMacroArgsQueryComments(BaseDefaultQueryComments): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"query-comment": "{{ return(ordered_to_json(query_header_args(target.name))) }}"} + + def test_matches_comment(self, project) -> bool: + logs = self.run_get_json() + expected_dct = { + "app": "dbt++", + "dbt_version": dbt_version, + "macro_version": "0.1.0", + "message": f"blah: {project.adapter.config.target_name}", + } + expected = r"/* {} */\n".format(json.dumps(expected_dct, sort_keys=True)).replace( + '"', r"\"" + ) + assert expected in logs + + +class BaseMacroInvalidQueryComments(BaseDefaultQueryComments): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"query-comment": "{{ invalid_query_header() }}"} + + def test_run_assert_comments(self, project): + with pytest.raises(DbtRuntimeError): + self.run_get_json(expect_pass=False) + + +class BaseNullQueryComments(BaseDefaultQueryComments): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"query-comment": None} + + def test_matches_comment(self, project) -> bool: + logs = self.run_get_json() + assert "/*" not in logs or "*/" not in logs + + +class BaseEmptyQueryComments(BaseDefaultQueryComments): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"query-comment": ""} + + def test_matches_comment(self, project) -> bool: + logs = self.run_get_json() + assert "/*" not in logs or "*/" not in logs + + +# Tests # +class TestQueryComments(BaseQueryComments): pass -class TestMacroQueryCommentsFabric(BaseMacroQueryComments): +class TestMacroQueryComments(BaseMacroQueryComments): pass -class TestMacroArgsQueryCommentsFabric(BaseMacroArgsQueryComments): +class TestMacroArgsQueryComments(BaseMacroArgsQueryComments): pass -class TestMacroInvalidQueryCommentsFabric(BaseMacroInvalidQueryComments): +class TestMacroInvalidQueryComments(BaseMacroInvalidQueryComments): pass -class TestNullQueryCommentsFabric(BaseNullQueryComments): +class TestNullQueryComments(BaseNullQueryComments): pass -class TestEmptyQueryCommentsFabric(BaseEmptyQueryComments): +class TestEmptyQueryComments(BaseEmptyQueryComments): pass