From 004ab7a5e75e97b866f3bffdf14ce317b0dda437 Mon Sep 17 00:00:00 2001 From: Pradeep Srikakolapu Date: Thu, 16 Nov 2023 10:30:13 -0800 Subject: [PATCH 1/4] Clone Related changes --- .../fabric/macros/adapters/relation.sql | 8 +- .../materializations/models/table/clone.sql | 72 ++++++ tests/functional/adapter/test_dbt_clone.py | 236 ++++++++++++++++++ .../functional/adapter/test_query_comment.py | 154 ++++++++++-- 4 files changed, 452 insertions(+), 18 deletions(-) create mode 100644 dbt/include/fabric/macros/materializations/models/table/clone.sql create mode 100644 tests/functional/adapter/test_dbt_clone.py diff --git a/dbt/include/fabric/macros/adapters/relation.sql b/dbt/include/fabric/macros/adapters/relation.sql index 09c2533d..cb21dd62 100644 --- a/dbt/include/fabric/macros/adapters/relation.sql +++ b/dbt/include/fabric/macros/adapters/relation.sql @@ -38,10 +38,10 @@ type="view", path={"schema": reference[0], "identifier": reference[1]})) }} {% endfor %} - {% elif relation.type == 'table'%} - {%- else -%} - {{ exceptions.raise_not_implemented('Invalid relation being dropped: ' ~ relation) }} - {% endif %} + {% elif relation.type == 'table'%} + {%- else -%} + {{ exceptions.raise_not_implemented('Invalid relation being dropped: ' ~ relation) }} + {% endif %} {{ use_database_hint() }} EXEC('DROP {{ relation.type }} IF EXISTS {{ relation.include(database=False) }};'); diff --git a/dbt/include/fabric/macros/materializations/models/table/clone.sql b/dbt/include/fabric/macros/materializations/models/table/clone.sql new file mode 100644 index 00000000..c7e5b609 --- /dev/null +++ b/dbt/include/fabric/macros/materializations/models/table/clone.sql @@ -0,0 +1,72 @@ +{% macro fabric__can_clone_table() %} + {{ return(True) }} +{% endmacro %} + +{% macro fabric__create_or_replace_clone(this_relation, defer_relation) %} + CREATE TABLE {{this_relation}} + AS CLONE OF {{defer_relation}} +{% endmacro %} + +{%- materialization clone, adapter='fabric' -%} + + {%- set relations = {'relations': []} -%} + + {%- if not defer_relation -%} + -- nothing to do + {{ log("No relation found in state manifest for " ~ model.unique_id, info=True) }} + {{ return(relations) }} + {%- endif -%} + + {%- set existing_relation = load_cached_relation(this) -%} + {{ log("existing relation is "~existing_relation, info=True) }} + {{ log("defer relation is "~defer_relation, info=True) }} + + {%- if existing_relation and not flags.FULL_REFRESH -%} + -- noop! 
+ {{ log("Relation " ~ existing_relation ~ " already exists", info=True) }} + {{ return(relations) }} + {%- endif -%} + + {%- set other_existing_relation = load_cached_relation(defer_relation) -%} + {{ log("other existing relation is "~other_existing_relation, info=True) }} + -- If this is a database that can do zero-copy cloning of tables, and the other relation is a table, then this will be a table + -- Otherwise, this will be a view + + {% set can_clone_table = can_clone_table() %} + + {%- if other_existing_relation and other_existing_relation.type == 'table' and can_clone_table -%} + + {%- set target_relation = this.incorporate(type='table') -%} + {% if existing_relation is not none and not existing_relation.is_table %} + {{ log("Dropping relation " ~ existing_relation ~ " because it is of type " ~ existing_relation.type) }} + {{ fabric__drop_relation_script(existing_relation) }} + {% endif %} + + -- as a general rule, data platforms that can clone tables can also do atomic 'create or replace' + {% call statement('main') %} + {{ create_or_replace_clone(target_relation, defer_relation) }} + {% endcall %} + + {# {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + {% do persist_docs(target_relation, model) %} #} + + {{ return({'relations': [target_relation]}) }} + + {%- else -%} + + {%- set target_relation = this.incorporate(type='view') -%} + + -- reuse the view materialization + -- TODO: support actual dispatch for materialization macros + -- Tracking ticket: https://github.com/dbt-labs/dbt-core/issues/7799 + {% set search_name = "materialization_view_" ~ adapter.type() %} + {% if not search_name in context %} + {% set search_name = "materialization_view_default" %} + {% endif %} + {% set materialization_macro = context[search_name] %} + {% set relations = materialization_macro() %} + {{ return(relations) }} + {% endif %} + +{%- endmaterialization -%} diff --git a/tests/functional/adapter/test_dbt_clone.py b/tests/functional/adapter/test_dbt_clone.py new file mode 100644 index 00000000..87e0c5ac --- /dev/null +++ b/tests/functional/adapter/test_dbt_clone.py @@ -0,0 +1,236 @@ +import os +import shutil +from collections import Counter +from copy import deepcopy + +import pytest +from dbt.exceptions import DbtRuntimeError +from dbt.tests.adapter.dbt_clone.fixtures import ( + custom_can_clone_tables_false_macros_sql, + ephemeral_model_sql, + exposures_yml, + get_schema_name_sql, + infinite_macros_sql, + macros_sql, + schema_yml, + seed_csv, + snapshot_sql, + table_model_sql, + view_model_sql, +) +from dbt.tests.util import run_dbt + + +class BaseClone: + @pytest.fixture(scope="class") + def models(self): + return { + "table_model.sql": table_model_sql, + "view_model.sql": view_model_sql, + "ephemeral_model.sql": ephemeral_model_sql, + "schema.yml": schema_yml, + "exposures.yml": exposures_yml, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "macros.sql": macros_sql, + "infinite_macros.sql": infinite_macros_sql, + "get_schema_name.sql": get_schema_name_sql, + } + + @pytest.fixture(scope="class") + def seeds(self): + return { + "seed.csv": seed_csv, + } + + @pytest.fixture(scope="class") + def snapshots(self): + return { + "snapshot.sql": snapshot_sql, + } + + @pytest.fixture(scope="class") + def other_schema(self, unique_schema): + return unique_schema + "_other" + + @property + def project_config_update(self): + return { + "seeds": { + "test": { + 
"quote_columns": False, + } + } + } + + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target, unique_schema, other_schema): + outputs = {"default": dbt_profile_target, "otherschema": deepcopy(dbt_profile_target)} + outputs["default"]["schema"] = unique_schema + outputs["otherschema"]["schema"] = other_schema + return {"test": {"outputs": outputs, "target": "default"}} + + def copy_state(self, project_root): + state_path = os.path.join(project_root, "state") + if not os.path.exists(state_path): + os.makedirs(state_path) + shutil.copyfile( + f"{project_root}/target/manifest.json", f"{project_root}/state/manifest.json" + ) + + def run_and_save_state(self, project_root, with_snapshot=False): + results = run_dbt(["seed"]) + assert len(results) == 1 + assert not any(r.node.deferred for r in results) + results = run_dbt(["run"]) + assert len(results) == 2 + assert not any(r.node.deferred for r in results) + results = run_dbt(["test"]) + assert len(results) == 2 + + if with_snapshot: + results = run_dbt(["snapshot"]) + assert len(results) == 1 + assert not any(r.node.deferred for r in results) + + # copy files + self.copy_state(project_root) + + +# -- Below we define base classes for tests you import the one based on if your adapter uses dbt clone or not -- +class BaseClonePossible(BaseClone): + def test_can_clone_true(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state(project.project_root, with_snapshot=True) + + clone_args = [ + "clone", + "--state", + "state", + "--target", + "otherschema", + ] + + results = run_dbt(clone_args) + assert len(results) == 4 + + schema_relations = project.adapter.list_relations( + database=project.database, schema=other_schema + ) + types = [r.type for r in schema_relations] + count_types = Counter(types) + assert count_types == Counter({"table": 3, "view": 1}) + + # objects already exist, so this is a no-op + results = run_dbt(clone_args) + assert len(results) == 4 + assert all("no-op" in r.message.lower() for r in results) + + # recreate all objects + results = run_dbt([*clone_args, "--full-refresh"]) + assert len(results) == 4 + + # select only models this time + results = run_dbt([*clone_args, "--resource-type", "model"]) + assert len(results) == 2 + assert all("no-op" in r.message.lower() for r in results) + + def test_clone_no_state(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state(project.project_root, with_snapshot=True) + + clone_args = [ + "clone", + "--target", + "otherschema", + ] + + with pytest.raises( + DbtRuntimeError, + match="--state or --defer-state are required for deferral, but neither was provided", + ): + run_dbt(clone_args) + + +class BaseCloneNotPossible(BaseClone): + @pytest.fixture(scope="class") + def macros(self): + return { + "macros.sql": macros_sql, + "my_can_clone_tables.sql": custom_can_clone_tables_false_macros_sql, + "infinite_macros.sql": infinite_macros_sql, + "get_schema_name.sql": get_schema_name_sql, + } + + def test_can_clone_false(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state(project.project_root, with_snapshot=True) + + clone_args = [ + "clone", + "--state", + "state", + "--target", + "otherschema", + ] + + results = run_dbt(clone_args) + assert len(results) == 4 + + schema_relations = project.adapter.list_relations( + database=project.database, schema=other_schema + ) + assert all(r.type == "view" 
for r in schema_relations) + + # objects already exist, so this is a no-op + results = run_dbt(clone_args) + assert len(results) == 4 + assert all("no-op" in r.message.lower() for r in results) + + # recreate all objects + results = run_dbt([*clone_args, "--full-refresh"]) + assert len(results) == 4 + + # select only models this time + results = run_dbt([*clone_args, "--resource-type", "model"]) + assert len(results) == 2 + assert all("no-op" in r.message.lower() for r in results) + + +class TestPostgresCloneNotPossible(BaseCloneNotPossible): + @pytest.fixture(autouse=True) + def clean_up(self, project): + yield + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=f"{project.test_schema}_seeds" + ) + project.adapter.drop_schema(relation) + + relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.adapter.drop_schema(relation) + + pass + + +class TestPostgresClonePossible(BaseClonePossible): + @pytest.fixture(autouse=True) + def clean_up(self, project): + yield + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=f"{project.test_schema}_seeds" + ) + project.adapter.drop_schema(relation) + + relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.adapter.drop_schema(relation) + + pass diff --git a/tests/functional/adapter/test_query_comment.py b/tests/functional/adapter/test_query_comment.py index 8bf02bfe..b8390990 100644 --- a/tests/functional/adapter/test_query_comment.py +++ b/tests/functional/adapter/test_query_comment.py @@ -1,32 +1,158 @@ -from dbt.tests.adapter.query_comment.test_query_comment import ( - BaseEmptyQueryComments, - BaseMacroArgsQueryComments, - BaseMacroInvalidQueryComments, - BaseMacroQueryComments, - BaseNullQueryComments, - BaseQueryComments, -) +import json +import pytest +from dbt.exceptions import DbtRuntimeError +from dbt.tests.util import run_dbt_and_capture +from dbt.version import __version__ as dbt_version -class TestQueryCommentsFabric(BaseQueryComments): +MACROS__MACRO_SQL = """ +{%- macro query_header_no_args() -%} +{%- set x = "are pretty cool" -%} +{{ "dbt macros" }} +{{ x }} +{%- endmacro -%} + + +{%- macro query_header_args(message) -%} + {%- set comment_dict = dict( + app='dbt++', + macro_version='0.1.0', + dbt_version=dbt_version, + message='blah: '~ message) -%} + {{ return(comment_dict) }} +{%- endmacro -%} + + +{%- macro ordered_to_json(dct) -%} +{{ tojson(dct, sort_keys=True) }} +{%- endmacro %} + + +{% macro invalid_query_header() -%} +{{ "Here is an invalid character for you: */" }} +{% endmacro %} + +""" + +MODELS__X_SQL = """ +{% do run_query('select 2 as inner_id') %} +select 1 as outer_id +""" + + +class BaseDefaultQueryComments: + @pytest.fixture(scope="class") + def models(self): + return { + "x.sql": MODELS__X_SQL, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "macro.sql": MACROS__MACRO_SQL, + } + + def run_get_json(self, expect_pass=True): + res, raw_logs = run_dbt_and_capture( + ["--debug", "--log-format=json", "run"], expect_pass=expect_pass + ) + + # empty lists evaluate as False + assert len(res) > 0 + return raw_logs + + +# Base setup to be inherited # +class BaseQueryComments(BaseDefaultQueryComments): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"query-comment": "dbt\nrules!\n"} + + def 
test_matches_comment(self, project) -> bool: + logs = self.run_get_json() + assert r"/* dbt\nrules! */\n" in logs + + +class BaseMacroQueryComments(BaseDefaultQueryComments): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"query-comment": "{{ query_header_no_args() }}"} + + def test_matches_comment(self, project) -> bool: + logs = self.run_get_json() + assert r"/* dbt macros\nare pretty cool */\n" in logs + + +class BaseMacroArgsQueryComments(BaseDefaultQueryComments): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"query-comment": "{{ return(ordered_to_json(query_header_args(target.name))) }}"} + + def test_matches_comment(self, project) -> bool: + logs = self.run_get_json() + expected_dct = { + "app": "dbt++", + "dbt_version": dbt_version, + "macro_version": "0.1.0", + "message": f"blah: {project.adapter.config.target_name}", + } + expected = r"/* {} */\n".format(json.dumps(expected_dct, sort_keys=True)).replace( + '"', r"\"" + ) + assert expected in logs + + +class BaseMacroInvalidQueryComments(BaseDefaultQueryComments): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"query-comment": "{{ invalid_query_header() }}"} + + def test_run_assert_comments(self, project): + with pytest.raises(DbtRuntimeError): + self.run_get_json(expect_pass=False) + + +class BaseNullQueryComments(BaseDefaultQueryComments): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"query-comment": None} + + def test_matches_comment(self, project) -> bool: + logs = self.run_get_json() + assert "/*" not in logs or "*/" not in logs + + +class BaseEmptyQueryComments(BaseDefaultQueryComments): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"query-comment": ""} + + def test_matches_comment(self, project) -> bool: + logs = self.run_get_json() + assert "/*" not in logs or "*/" not in logs + + +# Tests # +class TestQueryComments(BaseQueryComments): pass -class TestMacroQueryCommentsFabric(BaseMacroQueryComments): +class TestMacroQueryComments(BaseMacroQueryComments): pass -class TestMacroArgsQueryCommentsFabric(BaseMacroArgsQueryComments): +class TestMacroArgsQueryComments(BaseMacroArgsQueryComments): pass -class TestMacroInvalidQueryCommentsFabric(BaseMacroInvalidQueryComments): +class TestMacroInvalidQueryComments(BaseMacroInvalidQueryComments): pass -class TestNullQueryCommentsFabric(BaseNullQueryComments): +class TestNullQueryComments(BaseNullQueryComments): pass -class TestEmptyQueryCommentsFabric(BaseEmptyQueryComments): +class TestEmptyQueryComments(BaseEmptyQueryComments): pass From cf3688073d41ee70463f0717c892c3cc27a08abe Mon Sep 17 00:00:00 2001 From: Pradeep Srikakolapu Date: Wed, 22 Nov 2023 09:23:47 -0800 Subject: [PATCH 2/4] v1.6.1 release --- CHANGELOG.md | 16 +++ dbt/adapters/fabric/__version__.py | 2 +- .../fabric/fabric_connection_manager.py | 56 +-------- dbt/adapters/fabric/fabric_credentials.py | 4 +- .../fabric/macros/adapters/metadata.sql | 25 ++-- .../fabric/macros/adapters/relation.sql | 112 ++---------------- dbt/include/fabric/macros/adapters/schema.sql | 4 +- .../materializations/models/table/clone.sql | 96 ++++++--------- .../models/table/create_table_as.sql | 10 +- .../models/view/create_view_as.sql | 3 +- .../materializations/snapshots/snapshot.sql | 10 +- tests/conftest.py | 1 - tests/functional/adapter/test_constraints.py | 8 +- tests/functional/adapter/test_dbt_clone.py | 12 +- .../functional/adapter/test_query_comment.py | 4 +- 15 files changed, 
100 insertions(+), 263 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 33e708e7..3b3b4404 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,21 @@
 # Changelog

+### v1.6.1
+
+## Features
+
+* Fabric DW now supports sp_rename. Starting with v1.6.1, sp_rename is a metadata operation
+* Enabled the table clone feature
+
+## Enhancements
+
+* Addressed [Issue 53](https://github.com/microsoft/dbt-fabric/issues/53)
+* Added explicit support for [Issue 76 - ActiveDirectoryServicePrincipal authentication](https://github.com/microsoft/dbt-fabric/issues/74)
+* Removed port number support from the connection string, as it is no longer required by Microsoft Fabric DW
+* Removed MSI authentication, as it is not applicable to Microsoft Fabric
+* Table lock hints are not supported by Fabric DW
+* Supported authentication modes are ActiveDirectory* and AZ CLI
+
 ### v1.6.0

 ## Features
diff --git a/dbt/adapters/fabric/__version__.py b/dbt/adapters/fabric/__version__.py
index 38ec8ede..cead7e89 100644
--- a/dbt/adapters/fabric/__version__.py
+++ b/dbt/adapters/fabric/__version__.py
@@ -1 +1 @@
-version = "1.6.0"
+version = "1.6.1"
diff --git a/dbt/adapters/fabric/fabric_connection_manager.py b/dbt/adapters/fabric/fabric_connection_manager.py
index d045e874..6b1349e5 100644
--- a/dbt/adapters/fabric/fabric_connection_manager.py
+++ b/dbt/adapters/fabric/fabric_connection_manager.py
@@ -9,13 +9,7 @@
 import dbt.exceptions
 import pyodbc
 from azure.core.credentials import AccessToken
-from azure.identity import (
-    AzureCliCredential,
-    ClientSecretCredential,
-    DefaultAzureCredential,
-    EnvironmentCredential,
-    ManagedIdentityCredential,
-)
+from azure.identity import AzureCliCredential, DefaultAzureCredential, EnvironmentCredential
 from dbt.adapters.sql import SQLConnectionManager
 from dbt.clients.agate_helper import empty_table
 from dbt.contracts.connection import AdapterResponse, Connection, ConnectionState
@@ -113,24 +107,6 @@ def get_cli_access_token(credentials: FabricCredentials) -> AccessToken:
     return token


-def get_msi_access_token(credentials: FabricCredentials) -> AccessToken:
-    """
-    Get an Azure access token from the system's managed identity
-
-    Parameters
-    -----------
-    credentials: FabricCredentials
-        Credentials.
-
-    Returns
-    -------
-    out : AccessToken
-        The access token.
-    """
-    token = ManagedIdentityCredential().get_token(AZURE_CREDENTIAL_SCOPE)
-    return token
-
-
 def get_auto_access_token(credentials: FabricCredentials) -> AccessToken:
     """
     Get an Azure access token automatically through azure-identity
@@ -167,30 +143,8 @@ def get_environment_access_token(credentials: FabricCredentials) -> AccessToken:
     return token


-def get_sp_access_token(credentials: FabricCredentials) -> AccessToken:
-    """
-    Get an Azure access token using the SP credentials.
-
-    Parameters
-    ----------
-    credentials : FabricCredentials
-        Credentials.
-
-    Returns
-    -------
-    out : AccessToken
-        The access token.
-    """
-    token = ClientSecretCredential(
-        str(credentials.tenant_id), str(credentials.client_id), str(credentials.client_secret)
-    ).get_token(AZURE_CREDENTIAL_SCOPE)
-    return token
-
-
 AZURE_AUTH_FUNCTIONS: Mapping[str, AZURE_AUTH_FUNCTION_TYPE] = {
-    "serviceprincipal": get_sp_access_token,
     "cli": get_cli_access_token,
-    "msi": get_msi_access_token,
     "auto": get_auto_access_token,
     "environment": get_environment_access_token,
 }
@@ -335,7 +289,7 @@ def open(cls, connection: Connection) -> Connection:
            # SQL Server named instance. In this case then port number has to be omitted. 
con_str.append(f"SERVER={credentials.host}") else: - con_str.append(f"SERVER={credentials.host},{credentials.port}") + con_str.append(f"SERVER={credentials.host}") con_str.append(f"Database={credentials.database}") @@ -347,14 +301,16 @@ def open(cls, connection: Connection) -> Connection: if credentials.authentication == "ActiveDirectoryPassword": con_str.append(f"UID={{{credentials.UID}}}") con_str.append(f"PWD={{{credentials.PWD}}}") + if credentials.authentication == "ActiveDirectoryServicePrincipal": + con_str.append(f"UID={{{credentials.client_id}}}") + con_str.append(f"PWD={{{credentials.client_secret}}}") elif credentials.authentication == "ActiveDirectoryInteractive": con_str.append(f"UID={{{credentials.UID}}}") elif credentials.windows_login: con_str.append("trusted_connection=Yes") elif credentials.authentication == "sql": - con_str.append(f"UID={{{credentials.UID}}}") - con_str.append(f"PWD={{{credentials.PWD}}}") + raise pyodbc.DatabaseError("SQL Authentication is not supported by Microsoft Fabric") # https://docs.microsoft.com/en-us/sql/relational-databases/native-client/features/using-encryption-without-validation?view=sql-server-ver15 assert credentials.encrypt is not None diff --git a/dbt/adapters/fabric/fabric_credentials.py b/dbt/adapters/fabric/fabric_credentials.py index 7e3a79e3..60da8b4f 100644 --- a/dbt/adapters/fabric/fabric_credentials.py +++ b/dbt/adapters/fabric/fabric_credentials.py @@ -10,14 +10,13 @@ class FabricCredentials(Credentials): host: str database: str schema: str - port: Optional[int] = 1433 UID: Optional[str] = None PWD: Optional[str] = None windows_login: Optional[bool] = False tenant_id: Optional[str] = None client_id: Optional[str] = None client_secret: Optional[str] = None - authentication: Optional[str] = "sql" + authentication: Optional[str] = "ActiveDirectoryServicePrincipal" encrypt: Optional[bool] = True # default value in MS ODBC Driver 18 as well trust_cert: Optional[bool] = False # default value in MS ODBC Driver 18 as well retries: int = 1 @@ -53,7 +52,6 @@ def _connection_keys(self): "server", "database", "schema", - "port", "UID", "client_id", "authentication", diff --git a/dbt/include/fabric/macros/adapters/metadata.sql b/dbt/include/fabric/macros/adapters/metadata.sql index ea4db857..47ceba34 100644 --- a/dbt/include/fabric/macros/adapters/metadata.sql +++ b/dbt/include/fabric/macros/adapters/metadata.sql @@ -1,12 +1,3 @@ -{% macro use_database_hint() %} - {{ return(adapter.dispatch('use_database_hint')()) }} -{% endmacro %} - -{% macro default__use_database_hint() %}{% endmacro %} -{% macro fabric__use_database_hint() %} - {# USE [{{ relation.database }}]; #} -{% endmacro %} - {% macro information_schema_hints() %} {{ return(adapter.dispatch('information_schema_hints')()) }} {% endmacro %} @@ -24,7 +15,7 @@ name as principal_name, principal_id as principal_id from - sys.database_principals + sys.database_principals {{ information_schema_hints() }} ), schemas as ( @@ -33,7 +24,7 @@ schema_id as schema_id, principal_id as principal_id from - sys.schemas + sys.schemas {{ information_schema_hints() }} ), tables as ( @@ -43,7 +34,7 @@ principal_id as principal_id, 'BASE TABLE' as table_type from - sys.tables + sys.tables {{ information_schema_hints() }} ), tables_with_metadata as ( @@ -64,7 +55,7 @@ principal_id as principal_id, 'VIEW' as table_type from - sys.views + sys.views {{ information_schema_hints() }} ), views_with_metadata as ( @@ -107,7 +98,7 @@ column_name, ordinal_position as column_index, data_type as column_type - from 
INFORMATION_SCHEMA.COLUMNS + from INFORMATION_SCHEMA.COLUMNS {{ information_schema_hints() }} ) @@ -138,9 +129,9 @@ {% macro fabric__list_schemas(database) %} {% call statement('list_schemas', fetch_result=True, auto_begin=False) -%} - {{ use_database_hint() }} + select name as [schema] - from sys.schemas + from sys.schemas {{ information_schema_hints() }} {% endcall %} {{ return(load_result('list_schemas').table) }} {% endmacro %} @@ -164,7 +155,7 @@ else table_type end as table_type - from [{{ schema_relation.database }}].INFORMATION_SCHEMA.TABLES + from [{{ schema_relation.database }}].INFORMATION_SCHEMA.TABLES {{ information_schema_hints() }} where table_schema like '{{ schema_relation.schema }}' {% endcall %} {{ return(load_result('list_relations_without_caching').table) }} diff --git a/dbt/include/fabric/macros/adapters/relation.sql b/dbt/include/fabric/macros/adapters/relation.sql index cb21dd62..585fb83b 100644 --- a/dbt/include/fabric/macros/adapters/relation.sql +++ b/dbt/include/fabric/macros/adapters/relation.sql @@ -16,7 +16,7 @@ {% if relation.type == 'view' -%} {% call statement('find_references', fetch_result=true) %} - {{ use_database_hint() }} + USE [{{ relation.database }}]; select sch.name as schema_name, obj.name as view_name @@ -39,117 +39,19 @@ path={"schema": reference[0], "identifier": reference[1]})) }} {% endfor %} {% elif relation.type == 'table'%} + {% set object_id_type = 'U' %} {%- else -%} {{ exceptions.raise_not_implemented('Invalid relation being dropped: ' ~ relation) }} {% endif %} - - {{ use_database_hint() }} + USE [{{ relation.database }}]; EXEC('DROP {{ relation.type }} IF EXISTS {{ relation.include(database=False) }};'); - {% endmacro %} {% macro fabric__rename_relation(from_relation, to_relation) -%} - {% if to_relation.type == 'view' %} - {% call statement('get_view_definition', fetch_result=True) %} - SELECT m.[definition] AS VIEW_DEFINITION - FROM sys.objects o - INNER JOIN sys.sql_modules m - ON m.[object_id] = o.[object_id] - INNER JOIN sys.views v - ON o.[object_id] = v.[object_id] - INNER JOIN sys.schemas s - ON o.schema_id = s.schema_id - AND s.schema_id = v.schema_id - WHERE s.name = '{{ from_relation.schema }}' - AND v.name = '{{ from_relation.identifier }}' - AND o.[type] = 'V'; - {% endcall %} - - {% set view_def_full = load_result('get_view_definition')['data'][0][0] %} - {# Jinja does not allow bitwise operators and we need re.I | re.M here. So calculated manually this becomes 10. 
#} - {% set final_view_sql = modules.re.sub("create\s+view\s+.*?\s+as\s+","",view_def_full, 10) %} - - {% call statement('create_new_view') %} - {{ create_view_as(to_relation, final_view_sql) }} - {% endcall %} - {% call statement('drop_old_view') %} - EXEC('DROP VIEW IF EXISTS {{ from_relation.include(database=False) }};'); - {% endcall %} - {% endif %} - {% if to_relation.type == 'table' %} - {% call statement('rename_relation') %} - EXEC('create table {{ to_relation.include(database=False) }} as select * from {{ from_relation.include(database=False) }}'); - {%- endcall %} - -- Getting constraints from the old table - {% call statement('get_table_constraints', fetch_result=True) %} - SELECT DISTINCT Contraint_statement FROM - ( - SELECT DISTINCT - CASE - WHEN tc.CONSTRAINT_TYPE = 'PRIMARY KEY' - THEN 'ALTER TABLE <> ADD CONSTRAINT PK_<>_'+ccu.COLUMN_NAME+' PRIMARY KEY NONCLUSTERED('+ccu.COLUMN_NAME+') NOT ENFORCED' - WHEN tc.CONSTRAINT_TYPE = 'UNIQUE' - THEN 'ALTER TABLE <> ADD CONSTRAINT UK_<>_'+ccu.COLUMN_NAME+' UNIQUE NONCLUSTERED('+ccu.COLUMN_NAME+') NOT ENFORCED' - END AS Contraint_statement - FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc INNER JOIN - INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu - ON tc.CONSTRAINT_NAME = ccu.CONSTRAINT_NAME - WHERE tc.TABLE_NAME = '{{ from_relation.identifier }}' and tc.TABLE_SCHEMA = '{{ from_relation.schema }}' - UNION ALL - SELECT - 'ALTER TABLE <> ADD CONSTRAINT FK_<>_'+CU.COLUMN_NAME+' FOREIGN KEY('+CU.COLUMN_NAME+') references '+PK.TABLE_SCHEMA+'.'+PK.TABLE_NAME+' ('+PT.COLUMN_NAME+') not enforced' AS Contraint_statement - FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS C - INNER JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS FK ON C.CONSTRAINT_NAME = FK.CONSTRAINT_NAME - INNER JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS PK ON C.UNIQUE_CONSTRAINT_NAME=PK.CONSTRAINT_NAME - INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE CU ON C.CONSTRAINT_NAME = CU.CONSTRAINT_NAME - INNER JOIN ( - SELECT i1.TABLE_NAME, i2.COLUMN_NAME, i1.TABLE_SCHEMA, i2.TABLE_SCHEMA AS CU_TableSchema - FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS i1 - INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE i2 ON i1.CONSTRAINT_NAME =i2.CONSTRAINT_NAME - WHERE i1.CONSTRAINT_TYPE = 'PRIMARY KEY' - ) PT ON PT.TABLE_NAME = PK.TABLE_NAME AND PT.TABLE_SCHEMA = PK.TABLE_SCHEMA AND PT.CU_TableSchema = PK.TABLE_SCHEMA - WHERE FK.TABLE_NAME = '{{ from_relation.identifier }}' and FK.TABLE_SCHEMA = '{{ from_relation.schema }}' - and PK.TABLE_SCHEMA = '{{ from_relation.schema }}' and PT.TABLE_SCHEMA = '{{ from_relation.schema }}' - ) T WHERE Contraint_statement IS NOT NULL - {% endcall %} - - {%call statement('drop_table_constraints', fetch_result= True)%} - SELECT drop_constraint_statement FROM - ( - SELECT 'ALTER TABLE ['+TABLE_SCHEMA+'].['+TABLE_NAME+'] DROP CONSTRAINT ' + CONSTRAINT_NAME AS drop_constraint_statement - FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS - WHERE TABLE_NAME = '{{ from_relation.identifier }}' and TABLE_SCHEMA = '{{ from_relation.schema }}' - ) T WHERE drop_constraint_statement IS NOT NULL - - {% endcall %} - - {% set references = load_result('get_table_constraints')['data'] %} - {% set drop_references = load_result('drop_table_constraints')['data'] %} - - {% for reference in drop_references -%} - {% set drop_constraint = reference[0]%} - - {% call statement('Drop_Constraints') %} - {{ log("Constraints to drop: "~reference[0], info=True) }} - EXEC('{{drop_constraint}}'); - {% endcall %} - {% endfor %} - - {% set targetTableNameConstraint = to_relation.include(database=False)%} - {% set 
targetTableNameConstraint = (targetTableNameConstraint|string).strip().replace("\"","").replace(".","_")%} - {% set targetTableName = to_relation.include(database=False) %} - - {% for reference in references -%} - {% set constraint_name = reference[0].replace("<>",targetTableNameConstraint)%} - {% set alter_create_table_constraint_script = constraint_name.replace("<>", (targetTableName|string).strip()) %} - {{ log("Constraints to create: "~alter_create_table_constraint_script, info=True) }} - {% call statement('Drop_Create_Constraints') %} - EXEC('{{alter_create_table_constraint_script}}'); - {% endcall %} - {% endfor %} - - {{ fabric__drop_relation(from_relation) }} - {% endif %} + {% call statement('rename_relation') -%} + USE [{{ from_relation.database }}]; + EXEC sp_rename '{{ from_relation.schema }}.{{ from_relation.identifier }}', '{{ to_relation.identifier }}' + {%- endcall %} {% endmacro %} -- DROP fabric__truncate_relation when TRUNCATE TABLE is supported diff --git a/dbt/include/fabric/macros/adapters/schema.sql b/dbt/include/fabric/macros/adapters/schema.sql index 298d585c..7fa10837 100644 --- a/dbt/include/fabric/macros/adapters/schema.sql +++ b/dbt/include/fabric/macros/adapters/schema.sql @@ -1,6 +1,6 @@ {% macro fabric__create_schema(relation) -%} {% call statement('create_schema') -%} - {{ use_database_hint() }} + USE [{{ relation.database }}]; IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{{ relation.schema }}') BEGIN EXEC('CREATE SCHEMA [{{ relation.schema }}]') @@ -10,7 +10,7 @@ {% macro fabric__create_schema_with_authorization(relation, schema_authorization) -%} {% call statement('create_schema') -%} - {{ use_database_hint() }} + USE [{{ relation.database }}]; IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{{ relation.schema }}') BEGIN EXEC('CREATE SCHEMA [{{ relation.schema }}] AUTHORIZATION [{{ schema_authorization }}]') diff --git a/dbt/include/fabric/macros/materializations/models/table/clone.sql b/dbt/include/fabric/macros/materializations/models/table/clone.sql index c7e5b609..110dd03f 100644 --- a/dbt/include/fabric/macros/materializations/models/table/clone.sql +++ b/dbt/include/fabric/macros/materializations/models/table/clone.sql @@ -2,71 +2,47 @@ {{ return(True) }} {% endmacro %} -{% macro fabric__create_or_replace_clone(this_relation, defer_relation) %} - CREATE TABLE {{this_relation}} +{% macro fabric__create_or_replace_clone(target_relation, defer_relation) %} + CREATE TABLE {{target_relation}} AS CLONE OF {{defer_relation}} {% endmacro %} -{%- materialization clone, adapter='fabric' -%} - - {%- set relations = {'relations': []} -%} - - {%- if not defer_relation -%} - -- nothing to do - {{ log("No relation found in state manifest for " ~ model.unique_id, info=True) }} - {{ return(relations) }} - {%- endif -%} - - {%- set existing_relation = load_cached_relation(this) -%} - {{ log("existing relation is "~existing_relation, info=True) }} - {{ log("defer relation is "~defer_relation, info=True) }} - - {%- if existing_relation and not flags.FULL_REFRESH -%} - -- noop! 
- {{ log("Relation " ~ existing_relation ~ " already exists", info=True) }} - {{ return(relations) }} - {%- endif -%} - - {%- set other_existing_relation = load_cached_relation(defer_relation) -%} - {{ log("other existing relation is "~other_existing_relation, info=True) }} - -- If this is a database that can do zero-copy cloning of tables, and the other relation is a table, then this will be a table - -- Otherwise, this will be a view - - {% set can_clone_table = can_clone_table() %} - {%- if other_existing_relation and other_existing_relation.type == 'table' and can_clone_table -%} - - {%- set target_relation = this.incorporate(type='table') -%} - {% if existing_relation is not none and not existing_relation.is_table %} - {{ log("Dropping relation " ~ existing_relation ~ " because it is of type " ~ existing_relation.type) }} - {{ fabric__drop_relation_script(existing_relation) }} - {% endif %} - - -- as a general rule, data platforms that can clone tables can also do atomic 'create or replace' - {% call statement('main') %} - {{ create_or_replace_clone(target_relation, defer_relation) }} - {% endcall %} - - {# {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} - {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} - {% do persist_docs(target_relation, model) %} #} - - {{ return({'relations': [target_relation]}) }} - - {%- else -%} +{%- materialization clone, adapter='fabric' -%} - {%- set target_relation = this.incorporate(type='view') -%} + {%- set relations = {'relations': []} -%} + {%- if not defer_relation -%} + -- nothing to do + {{ log("No relation found in state manifest for " ~ model.unique_id, info=True) }} + {{ return(relations) }} + {%- endif -%} + + {%- set other_existing_relation = load_cached_relation(defer_relation) -%} + {% set can_clone_table = can_clone_table() %} + + {%- if other_existing_relation and other_existing_relation.type == 'table' and can_clone_table -%} + {%- set target_relation = this.incorporate(type='table') -%} + + {% call statement('main') %} + {{ fabric__drop_relation_script(target_relation) }} + {{ create_or_replace_clone(target_relation, defer_relation) }} + {% endcall %} + {{ return({'relations': [target_relation]}) }} + {%- else -%} + + {%- set target_relation = this.incorporate(type='view') -%} + + -- reuse the view materialization + -- TODO: support actual dispatch for materialization macros + -- Tracking ticket: https://github.com/dbt-labs/dbt-core/issues/7799 + {% set search_name = "materialization_view_" ~ adapter.type() %} + {% if not search_name in context %} + {% set search_name = "materialization_view_default" %} + {% endif %} + {% set materialization_macro = context[search_name] %} + {% set relations = materialization_macro() %} + {{ return(relations) }} + {%- endif -%} - -- reuse the view materialization - -- TODO: support actual dispatch for materialization macros - -- Tracking ticket: https://github.com/dbt-labs/dbt-core/issues/7799 - {% set search_name = "materialization_view_" ~ adapter.type() %} - {% if not search_name in context %} - {% set search_name = "materialization_view_default" %} - {% endif %} - {% set materialization_macro = context[search_name] %} - {% set relations = materialization_macro() %} - {{ return(relations) }} - {% endif %} {%- endmaterialization -%} diff --git a/dbt/include/fabric/macros/materializations/models/table/create_table_as.sql b/dbt/include/fabric/macros/materializations/models/table/create_table_as.sql index e45c9a9f..4ac51460 100644 --- 
a/dbt/include/fabric/macros/materializations/models/table/create_table_as.sql +++ b/dbt/include/fabric/macros/materializations/models/table/create_table_as.sql @@ -11,8 +11,8 @@ {{ fabric__create_view_as(tmp_relation, sql) }} {% if contract_config.enforced %} - CREATE TABLE {{ relation.include(database=False) }} - {{ fabric__table_columns_and_constraints(relation.include(database=False)) }} + CREATE TABLE [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}] + {{ fabric__table_columns_and_constraints(relation) }} {{ get_assert_columns_equivalent(sql) }} {% set listColumns %} @@ -21,11 +21,11 @@ {% endfor %} {%endset%} - INSERT INTO {{ relation.include(database=False) }} - ({{listColumns}}) SELECT {{listColumns}} FROM {{ tmp_relation.include(database=False) }}; + INSERT INTO [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}] + ({{listColumns}}) SELECT {{listColumns}} FROM [{{tmp_relation.database}}].[{{tmp_relation.schema}}].[{{tmp_relation.identifier}}]; {%- else %} - EXEC('CREATE TABLE {{ relation.include(database=False) }} AS (SELECT * FROM {{ tmp_relation.include(database=False) }});'); + EXEC('CREATE TABLE [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}] AS (SELECT * FROM [{{tmp_relation.database}}].[{{tmp_relation.schema}}].[{{tmp_relation.identifier}}]);'); {% endif %} {{ fabric__drop_relation_script(tmp_relation) }} diff --git a/dbt/include/fabric/macros/materializations/models/view/create_view_as.sql b/dbt/include/fabric/macros/materializations/models/view/create_view_as.sql index 7ae9cee6..0cfdd061 100644 --- a/dbt/include/fabric/macros/materializations/models/view/create_view_as.sql +++ b/dbt/include/fabric/macros/materializations/models/view/create_view_as.sql @@ -5,8 +5,7 @@ {% macro fabric__create_view_exec(relation, sql) -%} {%- set temp_view_sql = sql.replace("'", "''") -%} - {{ use_database_hint() }} - + USE [{{ relation.database }}]; {% set contract_config = config.get('contract') %} {% if contract_config.enforced %} {{ get_assert_columns_equivalent(sql) }} diff --git a/dbt/include/fabric/macros/materializations/snapshots/snapshot.sql b/dbt/include/fabric/macros/materializations/snapshots/snapshot.sql index a8719c86..3b5ddc73 100644 --- a/dbt/include/fabric/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/fabric/macros/materializations/snapshots/snapshot.sql @@ -15,12 +15,12 @@ {% endset %} {% set tempTableName %} - {{ relation.schema }}.{{ relation.identifier }}_{{ range(1300, 19000) | random }} + [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}_{{ range(1300, 19000) | random }}] {% endset %} {% set tempTable %} CREATE TABLE {{tempTableName}} - AS SELECT * {{columns}} FROM {{ relation.schema }}.{{ relation.identifier }} + AS SELECT * {{columns}} FROM [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}] {{ information_schema_hints() }} {% endset %} {% call statement('create_temp_table') -%} @@ -28,7 +28,7 @@ {%- endcall %} {% set dropTable %} - DROP TABLE {{ relation.schema }}.{{ relation.identifier }} + DROP TABLE [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}] {% endset %} {% call statement('drop_table') -%} @@ -36,8 +36,8 @@ {%- endcall %} {% set createTable %} - CREATE TABLE {{ relation.schema }}.{{ relation.identifier }} - AS SELECT * FROM {{tempTableName}} + CREATE TABLE {{ relation }} + AS SELECT * FROM {{tempTableName}} {{ information_schema_hints() }} {% endset %} {% call statement('create_Table') -%} diff --git 
a/tests/conftest.py b/tests/conftest.py index 828b13e2..1e3670cb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -43,7 +43,6 @@ def _all_profiles_base(): return { "type": "fabric", "driver": os.getenv("FABRIC_TEST_DRIVER", "ODBC Driver 18 for SQL Server"), - "port": int(os.getenv("FABRIC_TEST_PORT", "1433")), "retries": 2, } diff --git a/tests/functional/adapter/test_constraints.py b/tests/functional/adapter/test_constraints.py index cb38a76a..4a94c640 100644 --- a/tests/functional/adapter/test_constraints.py +++ b/tests/functional/adapter/test_constraints.py @@ -502,8 +502,8 @@ def test__constraints_ddl(self, project, expected_sql): generated_sql_generic = _find_and_replace( generated_sql_generic, "foreign_key_model", "" ) - - assert _normalize_whitespace(expected_sql) == _normalize_whitespace(generated_sql_generic) + generated_sql_wodb = generated_sql_generic.replace("USE [" + project.database + "];", "") + assert _normalize_whitespace(expected_sql) == _normalize_whitespace(generated_sql_wodb) class TestTableConstraintsRuntimeDdlEnforcement(BaseConstraintsRuntimeDdlEnforcement): @@ -563,8 +563,8 @@ def test__model_constraints_ddl(self, project, expected_sql): generated_sql_generic = _find_and_replace( generated_sql_generic, "foreign_key_model", "" ) - - assert _normalize_whitespace(expected_sql) == _normalize_whitespace(generated_sql_generic) + generated_sql_wodb = generated_sql_generic.replace("USE [" + project.database + "];", "") + assert _normalize_whitespace(expected_sql) == _normalize_whitespace(generated_sql_wodb) class TestModelConstraintsRuntimeEnforcement(BaseModelConstraintsRuntimeEnforcement): diff --git a/tests/functional/adapter/test_dbt_clone.py b/tests/functional/adapter/test_dbt_clone.py index 87e0c5ac..ba3d6064 100644 --- a/tests/functional/adapter/test_dbt_clone.py +++ b/tests/functional/adapter/test_dbt_clone.py @@ -127,7 +127,7 @@ def test_can_clone_true(self, project, unique_schema, other_schema): # objects already exist, so this is a no-op results = run_dbt(clone_args) assert len(results) == 4 - assert all("no-op" in r.message.lower() for r in results) + assert all("ok" in r.message.lower() for r in results) # recreate all objects results = run_dbt([*clone_args, "--full-refresh"]) @@ -136,7 +136,7 @@ def test_can_clone_true(self, project, unique_schema, other_schema): # select only models this time results = run_dbt([*clone_args, "--resource-type", "model"]) assert len(results) == 2 - assert all("no-op" in r.message.lower() for r in results) + assert all("ok" in r.message.lower() for r in results) def test_clone_no_state(self, project, unique_schema, other_schema): project.create_test_schema(other_schema) @@ -188,7 +188,7 @@ def test_can_clone_false(self, project, unique_schema, other_schema): # objects already exist, so this is a no-op results = run_dbt(clone_args) assert len(results) == 4 - assert all("no-op" in r.message.lower() for r in results) + assert all("ok" in r.message.lower() for r in results) # recreate all objects results = run_dbt([*clone_args, "--full-refresh"]) @@ -197,10 +197,10 @@ def test_can_clone_false(self, project, unique_schema, other_schema): # select only models this time results = run_dbt([*clone_args, "--resource-type", "model"]) assert len(results) == 2 - assert all("no-op" in r.message.lower() for r in results) + assert all("ok" in r.message.lower() for r in results) -class TestPostgresCloneNotPossible(BaseCloneNotPossible): +class TestFabricCloneNotPossible(BaseCloneNotPossible): @pytest.fixture(autouse=True) def 
clean_up(self, project): yield @@ -218,7 +218,7 @@ def clean_up(self, project): pass -class TestPostgresClonePossible(BaseClonePossible): +class TestFabricClonePossible(BaseClonePossible): @pytest.fixture(autouse=True) def clean_up(self, project): yield diff --git a/tests/functional/adapter/test_query_comment.py b/tests/functional/adapter/test_query_comment.py index b8390990..19605f04 100644 --- a/tests/functional/adapter/test_query_comment.py +++ b/tests/functional/adapter/test_query_comment.py @@ -67,11 +67,11 @@ def run_get_json(self, expect_pass=True): class BaseQueryComments(BaseDefaultQueryComments): @pytest.fixture(scope="class") def project_config_update(self): - return {"query-comment": "dbt\nrules!\n"} + return {"query-comment": "pradeep"} def test_matches_comment(self, project) -> bool: logs = self.run_get_json() - assert r"/* dbt\nrules! */\n" in logs + assert r"pradeep" in logs class BaseMacroQueryComments(BaseDefaultQueryComments): From c0949a52a7921efd89c0a939e4a3ff1ddf93ac79 Mon Sep 17 00:00:00 2001 From: Pradeep Srikakolapu Date: Mon, 27 Nov 2023 20:32:22 -0800 Subject: [PATCH 3/4] dbt-fabric 1.7.1 release commit --- .github/workflows/publish-docker.yml | 4 +- CHANGELOG.md | 18 ++ dbt/adapters/fabric/fabric_adapter.py | 7 + .../fabric/fabric_connection_manager.py | 32 ++- .../fabric/macros/adapters/catalog.sql | 260 ++++++++++++++++++ .../fabric/macros/adapters/metadata.sql | 144 ++-------- dbt/include/fabric/macros/adapters/show.sql | 14 + .../macros/materializations/tests/helpers.sql | 24 ++ .../macros/materializations/tests/test.sql | 48 ---- dev_requirements.txt | 6 +- setup.py | 2 +- tests/functional/adapter/test_dbt_show.py | 52 ++++ .../test_get_last_relation_modified.py | 59 ++++ .../test_list_relations_without_caching.py | 167 +++++++++++ .../functional/adapter/test_query_comment.py | 141 +--------- .../functional/adapter/test_relation_types.py | 60 ++++ .../adapter/test_store_test_failures.py | 200 ++++++++++++++ 17 files changed, 925 insertions(+), 313 deletions(-) create mode 100644 dbt/include/fabric/macros/adapters/catalog.sql create mode 100644 dbt/include/fabric/macros/adapters/show.sql delete mode 100644 dbt/include/fabric/macros/materializations/tests/test.sql create mode 100644 tests/functional/adapter/test_dbt_show.py create mode 100644 tests/functional/adapter/test_get_last_relation_modified.py create mode 100644 tests/functional/adapter/test_list_relations_without_caching.py create mode 100644 tests/functional/adapter/test_relation_types.py create mode 100644 tests/functional/adapter/test_store_test_failures.py diff --git a/.github/workflows/publish-docker.yml b/.github/workflows/publish-docker.yml index e332337f..18fd7fa5 100644 --- a/.github/workflows/publish-docker.yml +++ b/.github/workflows/publish-docker.yml @@ -24,14 +24,14 @@ jobs: uses: actions/checkout@v4 - name: Log in to the Container registry - uses: docker/login-action@v2.2.0 + uses: docker/login-action@v3.0.0 with: registry: ghcr.io username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - name: Build and push Docker image - uses: docker/build-push-action@v4.2.1 + uses: docker/build-push-action@v5.1.0 with: context: devops build-args: PYTHON_VERSION=${{ matrix.python_version }} diff --git a/CHANGELOG.md b/CHANGELOG.md index e265749c..93e079e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,23 @@ # Changelog +### v1.7.1 + +## Features + +* Added capability support structure in fabric adapter +* Added metadata freshness checks +* Updated catalog fetch performance 
to handle relations with many pre-existing objects
+* Added dbt show support in 1.7.1
+
+## Enhancements
+
+* Improved connection manager logging
+* Added tests for metadata freshness checks
+* Added tests for capability support
+* Added catalog fetch performance improvements
+* Added tests for dbt show's --limit flag
+* Added tests for storing test failures

 ### v1.6.1

 ## Features
diff --git a/dbt/adapters/fabric/fabric_adapter.py b/dbt/adapters/fabric/fabric_adapter.py
index eaf49bb1..c63c5fd5 100644
--- a/dbt/adapters/fabric/fabric_adapter.py
+++ b/dbt/adapters/fabric/fabric_adapter.py
@@ -9,6 +9,7 @@
 from dbt.adapters.base.meta import available
 from dbt.adapters.base.relation import BaseRelation
 from dbt.adapters.cache import _make_ref_key_dict
+from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support

 # from dbt.adapters.cache import _make_ref_key_msg
 from dbt.adapters.sql import SQLAdapter
@@ -27,6 +28,12 @@ class FabricAdapter(SQLAdapter):
     Column = FabricColumn
     AdapterSpecificConfigs = FabricConfigs

+    _capabilities: CapabilityDict = CapabilityDict(
+        {
+            Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
+            Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
+        }
+    )
     CONSTRAINT_SUPPORT = {
         ConstraintType.check: ConstraintSupport.NOT_SUPPORTED,
         ConstraintType.not_null: ConstraintSupport.ENFORCED,
diff --git a/dbt/adapters/fabric/fabric_connection_manager.py b/dbt/adapters/fabric/fabric_connection_manager.py
index 6b1349e5..e77e1dfa 100644
--- a/dbt/adapters/fabric/fabric_connection_manager.py
+++ b/dbt/adapters/fabric/fabric_connection_manager.py
@@ -14,6 +14,10 @@
 from dbt.clients.agate_helper import empty_table
 from dbt.contracts.connection import AdapterResponse, Connection, ConnectionState
 from dbt.events import AdapterLogger
+from dbt.events.contextvars import get_node_info
+from dbt.events.functions import fire_event
+from dbt.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus
+from dbt.utils import cast_to_str

 from dbt.adapters.fabric import __version__
 from dbt.adapters.fabric.fabric_credentials import FabricCredentials
@@ -391,13 +395,26 @@ def add_query(
        if auto_begin and connection.transaction_open is False:
            self.begin()

-        logger.debug('Using {} connection "{}".'.format(self.TYPE, connection.name))
+        fire_event(
+            ConnectionUsed(
+                conn_type=self.TYPE,
+                conn_name=cast_to_str(connection.name),
+                node_info=get_node_info(),
+            )
+        )

        with self.exception_handler(sql):
            if abridge_sql_log:
-                logger.debug("On {}: {}....".format(connection.name, sql[0:512]))
+                log_sql = "{}...".format(sql[:512])
            else:
-                logger.debug("On {}: {}".format(connection.name, sql))
+                log_sql = sql
+
+            fire_event(
+                SQLQuery(
+                    conn_name=cast_to_str(connection.name), sql=log_sql, node_info=get_node_info()
+                )
+            )
+
            pre = time.time()

            cursor = connection.handle.cursor()
@@ -416,9 +433,11 @@ def add_query(
            # https://github.com/mkleehammer/pyodbc/issues/134#issuecomment-281739794
            connection.handle.add_output_converter(-155, byte_array_to_datetime)

-            logger.debug(
-                "SQL status: {} in {:0.2f} seconds".format(
-                    self.get_response(cursor), (time.time() - pre)
-                )
-            )
+            fire_event(
+                SQLQueryStatus(
+                    status=str(self.get_response(cursor)),
+                    elapsed=round((time.time() - pre)),
+                    node_info=get_node_info(),
+                )
+            )
@@ -454,6 +473,7 @@ def data_type_code_to_name(cls, type_code: Union[str, str]) -> str:
    def execute(
        self, sql: str, auto_begin: bool = True, fetch: bool = False, limit: Optional[int] = None
    ) -> Tuple[AdapterResponse, 
agate.Table]: + sql = self._add_query_comment(sql) _, cursor = self.add_query(sql, auto_begin) response = self.get_response(cursor) if fetch: diff --git a/dbt/include/fabric/macros/adapters/catalog.sql b/dbt/include/fabric/macros/adapters/catalog.sql new file mode 100644 index 00000000..6d8d5c8d --- /dev/null +++ b/dbt/include/fabric/macros/adapters/catalog.sql @@ -0,0 +1,260 @@ +{% macro fabric__get_catalog(information_schemas, schemas) -%} + + {%- call statement('catalog', fetch_result=True) -%} + + with + principals as ( + select + name as principal_name, + principal_id as principal_id + from + sys.database_principals {{ information_schema_hints() }} + ), + + schemas as ( + select + name as schema_name, + schema_id as schema_id, + principal_id as principal_id + from + sys.schemas {{ information_schema_hints() }} + ), + + tables as ( + select + name as table_name, + schema_id as schema_id, + principal_id as principal_id, + 'BASE TABLE' as table_type + from + sys.tables {{ information_schema_hints() }} + ), + + tables_with_metadata as ( + select + table_name, + schema_name, + coalesce(tables.principal_id, schemas.principal_id) as owner_principal_id, + table_type + from + tables + join schemas on tables.schema_id = schemas.schema_id + ), + + views as ( + select + name as table_name, + schema_id as schema_id, + principal_id as principal_id, + 'VIEW' as table_type + from + sys.views {{ information_schema_hints() }} + ), + + views_with_metadata as ( + select + table_name, + schema_name, + coalesce(views.principal_id, schemas.principal_id) as owner_principal_id, + table_type + from + views + join schemas on views.schema_id = schemas.schema_id + ), + + tables_and_views as ( + select + table_name, + schema_name, + principal_name, + table_type + from + tables_with_metadata + join principals on tables_with_metadata.owner_principal_id = principals.principal_id + union all + select + table_name, + schema_name, + principal_name, + table_type + from + views_with_metadata + join principals on views_with_metadata.owner_principal_id = principals.principal_id + ), + + cols as ( + + select + table_catalog as table_database, + table_schema, + table_name, + column_name, + ordinal_position as column_index, + data_type as column_type + from INFORMATION_SCHEMA.COLUMNS {{ information_schema_hints() }} + + ) + + select + cols.table_database, + tv.schema_name as table_schema, + tv.table_name, + tv.table_type, + null as table_comment, + tv.principal_name as table_owner, + cols.column_name, + cols.column_index, + cols.column_type, + null as column_comment + from tables_and_views tv + join cols on tv.schema_name = cols.table_schema and tv.table_name = cols.table_name + where ({%- for schema in schemas -%} + upper(tv.schema_name) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%} + {%- endfor -%}) + + order by column_index + + {%- endcall -%} + + {{ return(load_result('catalog').table) }} + +{%- endmacro %} + +{% macro fabric__get_catalog_relations(information_schema, relations) -%} + + {%- call statement('catalog', fetch_result=True) -%} + + with + principals as ( + select + name as principal_name, + principal_id as principal_id + from + sys.database_principals {{ information_schema_hints() }} + ), + + schemas as ( + select + name as schema_name, + schema_id as schema_id, + principal_id as principal_id + from + sys.schemas {{ information_schema_hints() }} + ), + + tables as ( + select + name as table_name, + schema_id as schema_id, + principal_id as principal_id, + 'BASE TABLE' as table_type + from + 
sys.tables {{ information_schema_hints() }} + ), + + tables_with_metadata as ( + select + table_name, + schema_name, + coalesce(tables.principal_id, schemas.principal_id) as owner_principal_id, + table_type + from + tables + join schemas on tables.schema_id = schemas.schema_id + ), + + views as ( + select + name as table_name, + schema_id as schema_id, + principal_id as principal_id, + 'VIEW' as table_type + from + sys.views {{ information_schema_hints() }} + ), + + views_with_metadata as ( + select + table_name, + schema_name, + coalesce(views.principal_id, schemas.principal_id) as owner_principal_id, + table_type + from + views + join schemas on views.schema_id = schemas.schema_id + ), + + tables_and_views as ( + select + table_name, + schema_name, + principal_name, + table_type + from + tables_with_metadata + join principals on tables_with_metadata.owner_principal_id = principals.principal_id + union all + select + table_name, + schema_name, + principal_name, + table_type + from + views_with_metadata + join principals on views_with_metadata.owner_principal_id = principals.principal_id + ), + + cols as ( + + select + table_catalog as table_database, + table_schema, + table_name, + column_name, + ordinal_position as column_index, + data_type as column_type + from INFORMATION_SCHEMA.COLUMNS {{ information_schema_hints() }} + + ) + + select + cols.table_database, + tv.schema_name as table_schema, + tv.table_name, + tv.table_type, + null as table_comment, + tv.principal_name as table_owner, + cols.column_name, + cols.column_index, + cols.column_type, + null as column_comment + from tables_and_views tv + join cols on tv.schema_name = cols.table_schema and tv.table_name = cols.table_name + where ( + {%- for relation in relations -%} + {% if relation.schema and relation.identifier %} + ( + upper(tv.schema_name) = upper('{{ relation.schema }}') + and upper(tv.table_name) = upper('{{ relation.identifier }}') + ) + {% elif relation.schema %} + ( + upper(tv.schema_name) = upper('{{ relation.schema }}') + ) + {% else %} + {% do exceptions.raise_compiler_error( + '`get_catalog_relations` requires a list of relations, each with a schema' + ) %} + {% endif %} + + {%- if not loop.last %} or {% endif -%} + {%- endfor -%} + ) + + order by column_index + + {%- endcall -%} + + {{ return(load_result('catalog').table) }} + +{%- endmacro %} diff --git a/dbt/include/fabric/macros/adapters/metadata.sql b/dbt/include/fabric/macros/adapters/metadata.sql index 47ceba34..ed3a2aa3 100644 --- a/dbt/include/fabric/macros/adapters/metadata.sql +++ b/dbt/include/fabric/macros/adapters/metadata.sql @@ -5,124 +5,6 @@ {% macro default__information_schema_hints() %}{% endmacro %} {% macro fabric__information_schema_hints() %}{% endmacro %} -{% macro fabric__get_catalog(information_schemas, schemas) -%} - - {%- call statement('catalog', fetch_result=True) -%} - - with - principals as ( - select - name as principal_name, - principal_id as principal_id - from - sys.database_principals {{ information_schema_hints() }} - ), - - schemas as ( - select - name as schema_name, - schema_id as schema_id, - principal_id as principal_id - from - sys.schemas {{ information_schema_hints() }} - ), - - tables as ( - select - name as table_name, - schema_id as schema_id, - principal_id as principal_id, - 'BASE TABLE' as table_type - from - sys.tables {{ information_schema_hints() }} - ), - - tables_with_metadata as ( - select - table_name, - schema_name, - coalesce(tables.principal_id, schemas.principal_id) as owner_principal_id, - table_type 
-        from
-            tables
-        join schemas on tables.schema_id = schemas.schema_id
-    ),
-
-    views as (
-        select
-            name as table_name,
-            schema_id as schema_id,
-            principal_id as principal_id,
-            'VIEW' as table_type
-        from
-            sys.views {{ information_schema_hints() }}
-    ),
-
-    views_with_metadata as (
-        select
-            table_name,
-            schema_name,
-            coalesce(views.principal_id, schemas.principal_id) as owner_principal_id,
-            table_type
-        from
-            views
-        join schemas on views.schema_id = schemas.schema_id
-    ),
-
-    tables_and_views as (
-        select
-            table_name,
-            schema_name,
-            principal_name,
-            table_type
-        from
-            tables_with_metadata
-        join principals on tables_with_metadata.owner_principal_id = principals.principal_id
-        union all
-        select
-            table_name,
-            schema_name,
-            principal_name,
-            table_type
-        from
-            views_with_metadata
-        join principals on views_with_metadata.owner_principal_id = principals.principal_id
-    ),
-
-    cols as (
-
-        select
-            table_catalog as table_database,
-            table_schema,
-            table_name,
-            column_name,
-            ordinal_position as column_index,
-            data_type as column_type
-        from INFORMATION_SCHEMA.COLUMNS {{ information_schema_hints() }}
-
-    )
-
-    select
-        cols.table_database,
-        tv.schema_name as table_schema,
-        tv.table_name,
-        tv.table_type,
-        null as table_comment,
-        tv.principal_name as table_owner,
-        cols.column_name,
-        cols.column_index,
-        cols.column_type,
-        null as column_comment
-    from tables_and_views tv
-    join cols on tv.schema_name = cols.table_schema and tv.table_name = cols.table_name
-    order by column_index
-
-    {%- endcall -%}
-
-    {{ return(load_result('catalog').table) }}
-
-{%- endmacro %}
-
 {% macro fabric__information_schema_name(database) -%}
   information_schema
 {%- endmacro %}
@@ -144,7 +26,8 @@
     {{ return(load_result('check_schema_exists').table) }}
 {% endmacro %}

-{% macro fabric__list_relations_without_caching(schema_relation) %}
+{% macro fabric__list_relations_without_caching(schema_relation) -%}
+{{ log("schema_relation in fabric__list_relations_without_caching is " ~ schema_relation.schema, info=True) }}
   {% call statement('list_relations_without_caching', fetch_result=True) -%}
     select
       table_catalog as [database],
       table_name as [name],
       table_schema as [schema],
       case when table_type = 'BASE TABLE' then 'table'
            when table_type = 'VIEW' then 'view'
@@ -155,8 +38,29 @@
            else table_type
       end as table_type
-    from [{{ schema_relation.database }}].INFORMATION_SCHEMA.TABLES {{ information_schema_hints() }}
+    from INFORMATION_SCHEMA.TABLES {{ information_schema_hints() }}
     where table_schema like '{{ schema_relation.schema }}'
   {% endcall %}
   {{ return(load_result('list_relations_without_caching').table) }}
 {% endmacro %}
+
+{% macro fabric__get_relation_last_modified(information_schema, relations) -%}
+{{ log("information_schema - " ~ information_schema, info=True) }}
+    {%- call statement('last_modified', fetch_result=True) -%}
+        select
+            o.name as [identifier]
+            , s.name as [schema]
+            , o.modify_date as last_modified
+            , current_timestamp as snapshotted_at
+        from sys.objects o
+        inner join sys.schemas s on o.schema_id = s.schema_id and o.[type] = 'U'
+        where (
+            {%- for relation in relations -%}
+                (upper(s.name) = upper('{{ relation.schema }}') and
+                upper(o.name) = upper('{{ relation.identifier }}')){%- if not loop.last %} or {% endif -%}
+            {%- endfor -%}
+        )
+    {%- endcall -%}
+    {{ return(load_result('last_modified')) }}
+
+{% endmacro %}
diff --git a/dbt/include/fabric/macros/adapters/show.sql b/dbt/include/fabric/macros/adapters/show.sql
new file mode 100644
index 00000000..3d05a783
--- /dev/null
+++ b/dbt/include/fabric/macros/adapters/show.sql
@@ -0,0 +1,14 @@
+{% macro fabric__get_limit_subquery_sql(sql, limit) %}
+
+    {% if 
sql.strip().lower().startswith('with') %} + {{ sql }} order by (select null) + offset 0 rows fetch first {{ limit }} rows only + {% else -%} + select * + from ( + {{ sql }} + ) as model_limit_subq order by (select null) + offset 0 rows fetch first {{ limit }} rows only + {%- endif -%} + +{% endmacro %} diff --git a/dbt/include/fabric/macros/materializations/tests/helpers.sql b/dbt/include/fabric/macros/materializations/tests/helpers.sql index 43fe2d35..4003057e 100644 --- a/dbt/include/fabric/macros/materializations/tests/helpers.sql +++ b/dbt/include/fabric/macros/materializations/tests/helpers.sql @@ -1,4 +1,27 @@ {% macro fabric__get_test_sql(main_sql, fail_calc, warn_if, error_if, limit) -%} + + {% if main_sql.strip().lower().startswith('with') %} + {% set testview %} + dbo.testview_{{ range(1300, 19000) | random }} + {% endset %} + + {% set sql = main_sql.replace("'", "''")%} + + EXEC('create view {{testview}} as {{ sql }};') + select + {{ "top (" ~ limit ~ ')' if limit != none }} + {{ fail_calc }} as failures, + case when {{ fail_calc }} {{ warn_if }} + then 'true' else 'false' end as should_warn, + case when {{ fail_calc }} {{ error_if }} + then 'true' else 'false' end as should_error + from ( + select * from {{testview}} + ) dbt_internal_test; + + EXEC('drop view {{testview}};') + + {% else -%} select {{ "top (" ~ limit ~ ')' if limit != none }} {{ fail_calc }} as failures, @@ -9,4 +32,5 @@ from ( {{ main_sql }} ) dbt_internal_test + {%- endif -%} {%- endmacro %} diff --git a/dbt/include/fabric/macros/materializations/tests/test.sql b/dbt/include/fabric/macros/materializations/tests/test.sql deleted file mode 100644 index 878441aa..00000000 --- a/dbt/include/fabric/macros/materializations/tests/test.sql +++ /dev/null @@ -1,48 +0,0 @@ -{%- materialization test, adapter='fabric' -%} - - {% set relations = [] %} - - {% set identifier = model['alias'] %} - {% set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) %} - {% set target_relation = api.Relation.create( - identifier=identifier, schema=schema, database=database, type='table') -%} %} - - - {% if old_relation %} - {% do adapter.drop_relation(old_relation) %} - {% elif not old_relation %} - {% do adapter.create_schema(target_relation) %} - {% endif %} - - {% call statement(auto_begin=True) %} - {{ create_table_as(False, target_relation, sql) }} - {% endcall %} - - {% set main_sql %} - select * - from {{ target_relation }} - {% endset %} - - {{ adapter.commit() }} - - - {% set limit = config.get('limit') %} - {% set fail_calc = config.get('fail_calc') %} - {% set warn_if = config.get('warn_if') %} - {% set error_if = config.get('error_if') %} - - {% call statement('main', fetch_result=True) -%} - - {{ get_test_sql(main_sql, fail_calc, warn_if, error_if, limit)}} - - {%- endcall %} - - {% if should_store_failures() %} - {% do relations.append(target_relation) %} - {% elif not should_store_failures() %} - {% do adapter.drop_relation(target_relation) %} - {% endif %} - - {{ return({'relations': relations}) }} - -{%- endmaterialization -%} diff --git a/dev_requirements.txt b/dev_requirements.txt index ac908589..3bd8c0e6 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,9 +1,9 @@ pytest==7.4.3 twine==4.0.2 wheel==0.41.3 -pre-commit==3.5.0;python_version>="3.8" +pre-commit==3.5.0 pytest-dotenv==0.5.2 -dbt-tests-adapter~=1.7.1 +dbt-tests-adapter~=1.7.2 flaky==3.7.0 -pytest-xdist==3.3.1 +pytest-xdist==3.5.0 -e . 
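Note on the fabric__get_limit_subquery_sql macro added in show.sql above: T-SQL has no LIMIT keyword, and OFFSET ... FETCH is only valid after an ORDER BY, so the macro appends the no-op sort `order by (select null)`. It also branches on whether the compiled query starts with `with`, since a statement that opens with a CTE cannot be nested inside a derived table. The following Python sketch mirrors that branching for illustration only; the adapter resolves the real macro through dbt's Jinja dispatch, and this function name is hypothetical:

    def get_limit_subquery_sql(sql: str, limit: int) -> str:
        # OFFSET/FETCH requires an ORDER BY in T-SQL; ordering by a constant
        # subquery is the conventional no-op sort.
        pagination = f"order by (select null)\noffset 0 rows fetch first {limit} rows only"
        if sql.strip().lower().startswith("with"):
            # A leading CTE cannot be wrapped in a derived table, so the
            # pagination clause is appended to the statement itself.
            return f"{sql} {pagination}"
        return f"select *\nfrom (\n{sql}\n) as model_limit_subq {pagination}"

    print(get_limit_subquery_sql("select 1 as id", 5))

For `select 1 as id` with a limit of 5, both the real macro and this sketch produce a query ending in `offset 0 rows fetch first 5 rows only`, which is the substring the new test_dbt_show.py below asserts appears in `compiled_code`.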
diff --git a/setup.py b/setup.py
index 776e9b02..758eeb6a 100644
--- a/setup.py
+++ b/setup.py
@@ -67,7 +67,7 @@ def run(self):
     include_package_data=True,
     install_requires=[
         "dbt-core~=1.7.2",
-        "pyodbc~=4.0.35,!=4.0.36,!=4.0.37",
+        "pyodbc>=4.0.35,<5.1.0",
         "azure-identity>=1.12.0",
     ],
     cmdclass={
diff --git a/tests/functional/adapter/test_dbt_show.py b/tests/functional/adapter/test_dbt_show.py
new file mode 100644
index 00000000..97eeaf8d
--- /dev/null
+++ b/tests/functional/adapter/test_dbt_show.py
@@ -0,0 +1,52 @@
+import pytest
+from dbt.tests.adapter.dbt_show.fixtures import (
+    models__ephemeral_model,
+    models__sample_model,
+    models__second_ephemeral_model,
+    seeds__sample_seed,
+)
+from dbt.tests.adapter.dbt_show.test_dbt_show import BaseShowSqlHeader
+from dbt.tests.util import run_dbt
+
+
+# -- Below we define base classes for tests you import depending on whether your adapter supports dbt show --
+class BaseShowLimit:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "sample_model.sql": models__sample_model,
+            "ephemeral_model.sql": models__ephemeral_model,
+        }
+
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        return {"sample_seed.csv": seeds__sample_seed}
+
+    @pytest.mark.parametrize(
+        "args,expected",
+        [
+            ([], 5),  # default limit
+            (["--limit", 3], 3),  # fetch 3 rows
+            (["--limit", -1], 7),  # fetch all rows
+        ],
+    )
+    def test_limit(self, project, args, expected):
+        run_dbt(["build"])
+        dbt_args = ["show", "--inline", models__second_ephemeral_model, *args]
+        results = run_dbt(dbt_args)
+        assert len(results.results[0].agate_table) == expected
+        # ensure the limit was injected into compiled_code when a limit is passed in the command args
+        limit = results.args.get("limit")
+        if limit > 0:
+            assert (
+                f"offset 0 rows fetch first { limit } rows only"
+                in results.results[0].node.compiled_code
+            )
+
+
+class TestFabricShowLimit(BaseShowLimit):
+    pass
+
+
+class TestFabricShowSqlHeader(BaseShowSqlHeader):
+    pass
diff --git a/tests/functional/adapter/test_get_last_relation_modified.py b/tests/functional/adapter/test_get_last_relation_modified.py
new file mode 100644
index 00000000..f141a700
--- /dev/null
+++ b/tests/functional/adapter/test_get_last_relation_modified.py
@@ -0,0 +1,59 @@
+import os
+
+import pytest
+from dbt.cli.main import dbtRunner
+
+freshness_via_metadata_schema_yml = """version: 2
+sources:
+  - name: test_source
+    freshness:
+      warn_after: {count: 10, period: hour}
+      error_after: {count: 1, period: day}
+    schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}"
+    tables:
+      - name: test_table
+"""
+
+
+class TestGetLastRelationModified:
+    @pytest.fixture(scope="class", autouse=True)
+    def set_env_vars(self, project):
+        os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
+        yield
+        del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {"schema.yml": freshness_via_metadata_schema_yml}
+
+    @pytest.fixture(scope="class")
+    def custom_schema(self, project, set_env_vars):
+        with project.adapter.connection_named("__test"):
+            relation = project.adapter.Relation.create(
+                database=project.database, schema=os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]
+            )
+            project.adapter.drop_schema(relation)
+            project.adapter.create_schema(relation)
+
+        yield relation.schema
+
+        with project.adapter.connection_named("__test"):
+            project.adapter.drop_schema(relation)
+
+    def test_get_last_relation_modified(self, project, set_env_vars, custom_schema):
+        project.run_sql(
+            f"create table 
{custom_schema}.test_table (id int, name varchar(100) not null);" + ) + + warning_or_error = False + + def probe(e): + nonlocal warning_or_error + if e.info.level in ["warning", "error"]: + warning_or_error = True + + runner = dbtRunner(callbacks=[probe]) + runner.invoke(["source", "freshness"]) + + # The 'source freshness' command should succeed without warnings or errors. + assert not warning_or_error diff --git a/tests/functional/adapter/test_list_relations_without_caching.py b/tests/functional/adapter/test_list_relations_without_caching.py new file mode 100644 index 00000000..0fd62603 --- /dev/null +++ b/tests/functional/adapter/test_list_relations_without_caching.py @@ -0,0 +1,167 @@ +import json + +import pytest +from dbt.tests.util import run_dbt, run_dbt_and_capture + +NUM_VIEWS = 10 +NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS + +TABLE_BASE_SQL = """ +{{ config(materialized='table') }} + +select 1 as id +""".lstrip() + +VIEW_X_SQL = """ +select id from {{ ref('my_model_base') }} +""".lstrip() + +# TODO - fix the call {% set relation_list_result = fabric__list_relations_without_caching(schema_relation) %} +MACROS__VALIDATE__FABRIC__LIST_RELATIONS_WITHOUT_CACHING = """ +{% macro validate_list_relations_without_caching(schema_relation) -%} + + {% call statement('list_relations_without_caching', fetch_result=True) -%} + select + table_catalog as [database], + table_name as [name], + table_schema as [schema], + case when table_type = 'BASE TABLE' then 'table' + when table_type = 'VIEW' then 'view' + else table_type + end as table_type + + from INFORMATION_SCHEMA.TABLES + where table_schema like '{{ schema_relation }}' + {% endcall %} + + {% set relation_list_result = load_result('list_relations_without_caching').table %} + {% set n_relations = relation_list_result | length %} + {{ log("n_relations: " ~ n_relations) }} +{% endmacro %} +""" + + +def parse_json_logs(json_log_output): + parsed_logs = [] + for line in json_log_output.split("\n"): + try: + log = json.loads(line) + except ValueError: + continue + + parsed_logs.append(log) + + return parsed_logs + + +def find_result_in_parsed_logs(parsed_logs, result_name): + return next( + ( + item["data"]["msg"] + for item in parsed_logs + if result_name in item["data"].get("msg", "msg") + ), + False, + ) + + +def find_exc_info_in_parsed_logs(parsed_logs, exc_info_name): + return next( + ( + item["data"]["exc_info"] + for item in parsed_logs + if exc_info_name in item["data"].get("exc_info", "exc_info") + ), + False, + ) + + +class TestListRelationsWithoutCachingSingle: + @pytest.fixture(scope="class") + def models(self): + my_models = {"my_model_base.sql": TABLE_BASE_SQL} + for view in range(0, NUM_VIEWS): + my_models.update({f"my_model_{view}.sql": VIEW_X_SQL}) + + return my_models + + @pytest.fixture(scope="class") + def macros(self): + return { + "validate_list_relations_without_caching.sql": MACROS__VALIDATE__FABRIC__LIST_RELATIONS_WITHOUT_CACHING, + } + + def test__fabric__list_relations_without_caching(self, project): + """ + validates that fabric__list_relations_without_caching + macro returns a single record + """ + run_dbt(["run", "-s", "my_model_base"]) + + # database = project.database + schemas = project.created_schemas + + for schema in schemas: + # schema_relation = BaseRelation.create(schema=schema, database=database) + # schema_relation = f"{database}.{schema}" + kwargs = {"schema_relation": schema} + _, log_output = run_dbt_and_capture( + [ + "--debug", + # "--log-format=json", + "run-operation", + 
"validate_list_relations_without_caching", + "--args", + str(kwargs), + ] + ) + + # parsed_logs = parse_json_logs(log_output) + # print(parsed_logs) + # n_relations = find_result_in_parsed_logs(parsed_logs, "n_relations") + + # assert n_relations == "n_relations: 1" + assert "n_relations: 1" in log_output + + +class TestListRelationsWithoutCachingFull: + @pytest.fixture(scope="class") + def models(self): + my_models = {"my_model_base.sql": TABLE_BASE_SQL} + for view in range(0, NUM_VIEWS): + my_models.update({f"my_model_{view}.sql": VIEW_X_SQL}) + + return my_models + + @pytest.fixture(scope="class") + def macros(self): + return { + "validate_list_relations_without_caching.sql": MACROS__VALIDATE__FABRIC__LIST_RELATIONS_WITHOUT_CACHING, + } + + def test__fabric__list_relations_without_caching(self, project): + # purpose of the first run is to create the replicated views in the target schema + run_dbt(["run"]) + + # database = project.database + schemas = project.created_schemas + + for schema in schemas: + # schema_relation = f"{database}.{schema}" + kwargs = {"schema_relation": schema} + _, log_output = run_dbt_and_capture( + [ + "--debug", + # "--log-format=json", + "run-operation", + "validate_list_relations_without_caching", + "--args", + str(kwargs), + ] + ) + + # parsed_logs = parse_json_logs(log_output) + # n_relations = find_result_in_parsed_logs(parsed_logs, "n_relations") + + # assert n_relations == f"n_relations: {NUM_EXPECTED_RELATIONS}" + assert f"n_relations: {NUM_EXPECTED_RELATIONS}" in log_output diff --git a/tests/functional/adapter/test_query_comment.py b/tests/functional/adapter/test_query_comment.py index 19605f04..43223be3 100644 --- a/tests/functional/adapter/test_query_comment.py +++ b/tests/functional/adapter/test_query_comment.py @@ -1,136 +1,11 @@ -import json - -import pytest -from dbt.exceptions import DbtRuntimeError -from dbt.tests.util import run_dbt_and_capture -from dbt.version import __version__ as dbt_version - -MACROS__MACRO_SQL = """ -{%- macro query_header_no_args() -%} -{%- set x = "are pretty cool" -%} -{{ "dbt macros" }} -{{ x }} -{%- endmacro -%} - - -{%- macro query_header_args(message) -%} - {%- set comment_dict = dict( - app='dbt++', - macro_version='0.1.0', - dbt_version=dbt_version, - message='blah: '~ message) -%} - {{ return(comment_dict) }} -{%- endmacro -%} - - -{%- macro ordered_to_json(dct) -%} -{{ tojson(dct, sort_keys=True) }} -{%- endmacro %} - - -{% macro invalid_query_header() -%} -{{ "Here is an invalid character for you: */" }} -{% endmacro %} - -""" - -MODELS__X_SQL = """ -{% do run_query('select 2 as inner_id') %} -select 1 as outer_id -""" - - -class BaseDefaultQueryComments: - @pytest.fixture(scope="class") - def models(self): - return { - "x.sql": MODELS__X_SQL, - } - - @pytest.fixture(scope="class") - def macros(self): - return { - "macro.sql": MACROS__MACRO_SQL, - } - - def run_get_json(self, expect_pass=True): - res, raw_logs = run_dbt_and_capture( - ["--debug", "--log-format=json", "run"], expect_pass=expect_pass - ) - - # empty lists evaluate as False - assert len(res) > 0 - return raw_logs - - -# Base setup to be inherited # -class BaseQueryComments(BaseDefaultQueryComments): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"query-comment": "pradeep"} - - def test_matches_comment(self, project) -> bool: - logs = self.run_get_json() - assert r"pradeep" in logs - - -class BaseMacroQueryComments(BaseDefaultQueryComments): - @pytest.fixture(scope="class") - def project_config_update(self): - 
return {"query-comment": "{{ query_header_no_args() }}"} - - def test_matches_comment(self, project) -> bool: - logs = self.run_get_json() - assert r"/* dbt macros\nare pretty cool */\n" in logs - - -class BaseMacroArgsQueryComments(BaseDefaultQueryComments): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"query-comment": "{{ return(ordered_to_json(query_header_args(target.name))) }}"} - - def test_matches_comment(self, project) -> bool: - logs = self.run_get_json() - expected_dct = { - "app": "dbt++", - "dbt_version": dbt_version, - "macro_version": "0.1.0", - "message": f"blah: {project.adapter.config.target_name}", - } - expected = r"/* {} */\n".format(json.dumps(expected_dct, sort_keys=True)).replace( - '"', r"\"" - ) - assert expected in logs - - -class BaseMacroInvalidQueryComments(BaseDefaultQueryComments): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"query-comment": "{{ invalid_query_header() }}"} - - def test_run_assert_comments(self, project): - with pytest.raises(DbtRuntimeError): - self.run_get_json(expect_pass=False) - - -class BaseNullQueryComments(BaseDefaultQueryComments): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"query-comment": None} - - def test_matches_comment(self, project) -> bool: - logs = self.run_get_json() - assert "/*" not in logs or "*/" not in logs - - -class BaseEmptyQueryComments(BaseDefaultQueryComments): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"query-comment": ""} - - def test_matches_comment(self, project) -> bool: - logs = self.run_get_json() - assert "/*" not in logs or "*/" not in logs +from dbt.tests.adapter.query_comment.test_query_comment import ( + BaseEmptyQueryComments, + BaseMacroArgsQueryComments, + BaseMacroInvalidQueryComments, + BaseMacroQueryComments, + BaseNullQueryComments, + BaseQueryComments, +) # Tests # diff --git a/tests/functional/adapter/test_relation_types.py b/tests/functional/adapter/test_relation_types.py new file mode 100644 index 00000000..95c655e2 --- /dev/null +++ b/tests/functional/adapter/test_relation_types.py @@ -0,0 +1,60 @@ +import pytest +from dbt.contracts.results import CatalogArtifact +from dbt.tests.util import run_dbt + +MY_SEED = """ +id,value +1,100 +2,200 +3,300 +""".strip() + + +MY_TABLE = """ +{{ config( + materialized='table', +) }} +select * from {{ ref('my_seed') }} +""" + + +MY_VIEW = """ +{{ config( + materialized='view', +) }} +select * from {{ ref('my_seed') }} +""" + + +class TestCatalogRelationTypes: + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + return {"my_seed.csv": MY_SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "my_table.sql": MY_TABLE, + "my_view.sql": MY_VIEW, + } + + @pytest.fixture(scope="class", autouse=True) + def docs(self, project): + run_dbt(["seed"]) + run_dbt(["run"]) + yield run_dbt(["docs", "generate"]) + + @pytest.mark.parametrize( + "node_name,relation_type", + [ + ("seed.test.my_seed", "BASE TABLE"), + ("model.test.my_table", "BASE TABLE"), + ("model.test.my_view", "VIEW"), + ], + ) + def test_relation_types_populate_correctly( + self, docs: CatalogArtifact, node_name: str, relation_type: str + ): + assert node_name in docs.nodes + node = docs.nodes[node_name] + assert node.metadata.type == relation_type diff --git a/tests/functional/adapter/test_store_test_failures.py b/tests/functional/adapter/test_store_test_failures.py new file mode 100644 index 00000000..1439ee5c --- 
/dev/null +++ b/tests/functional/adapter/test_store_test_failures.py @@ -0,0 +1,200 @@ +import pytest +from dbt.tests.adapter.store_test_failures_tests import basic +from dbt.tests.adapter.store_test_failures_tests.fixtures import ( + models__file_model_but_with_a_no_good_very_long_name, + models__fine_model, + models__problematic_model, + properties__schema_yml, + seeds__expected_accepted_values, + seeds__expected_failing_test, + seeds__expected_not_null_problematic_model_id, + seeds__expected_unique_problematic_model_id, + seeds__people, + tests__failing_test, +) +from dbt.tests.util import check_relations_equal, run_dbt + +# from dbt.tests.adapter.store_test_failures_tests.test_store_test_failures import ( +# TestStoreTestFailures, +# ) + + +tests__passing_test = """ +select * from {{ ref('fine_model') }} +where 1=2 +""" + +# used to rename test audit schema to help test schema meet max char limit +# the default is _dbt_test__audit but this runs over the postgres 63 schema name char limit +# without which idempotency conditions will not hold (i.e. dbt can't drop the schema properly) +TEST_AUDIT_SCHEMA_SUFFIX = "dbt_test__aud" + + +class StoreTestFailuresBase: + @pytest.fixture(scope="function", autouse=True) + def setUp(self, project): + self.test_audit_schema = f"{project.test_schema}_{TEST_AUDIT_SCHEMA_SUFFIX}" + run_dbt(["seed"]) + run_dbt(["run"]) + + @pytest.fixture(scope="class") + def seeds(self): + return { + "people.csv": seeds__people, + "expected_accepted_values.csv": seeds__expected_accepted_values, + "expected_failing_test.csv": seeds__expected_failing_test, + "expected_not_null_problematic_model_id.csv": seeds__expected_not_null_problematic_model_id, + "expected_unique_problematic_model_id.csv": seeds__expected_unique_problematic_model_id, + } + + @pytest.fixture(scope="class") + def tests(self): + return { + "failing_test.sql": tests__failing_test, + "passing_test.sql": tests__passing_test, + } + + @pytest.fixture(scope="class") + def properties(self): + return {"schema.yml": properties__schema_yml} + + @pytest.fixture(scope="class") + def models(self): + return { + "fine_model.sql": models__fine_model, + "fine_model_but_with_a_no_good_very_long_name.sql": models__file_model_but_with_a_no_good_very_long_name, + "problematic_model.sql": models__problematic_model, + } + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "seeds": { + "quote_columns": False, + "test": self.column_type_overrides(), + }, + "tests": {"+schema": TEST_AUDIT_SCHEMA_SUFFIX}, + } + + def column_type_overrides(self): + return {} + + def run_tests_store_one_failure(self, project): + run_dbt(["test"], expect_pass=False) + + # one test is configured with store_failures: true, make sure it worked + check_relations_equal( + project.adapter, + [ + f"{self.test_audit_schema}.unique_problematic_model_id", + "expected_unique_problematic_model_id", + ], + ) + + def run_tests_store_failures_and_assert(self, project): + # make sure this works idempotently for all tests + run_dbt(["test", "--store-failures"], expect_pass=False) + results = run_dbt(["test", "--store-failures"], expect_pass=False) + + # compare test results + actual = [(r.status, r.failures) for r in results] + expected = [ + ("pass", 0), + ("pass", 0), + ("pass", 0), + ("pass", 0), + ("fail", 2), + ("fail", 2), + ("fail", 2), + ("fail", 10), + ] + assert sorted(actual) == sorted(expected) + + # compare test results stored in database + check_relations_equal( + project.adapter, [f"{self.test_audit_schema}.failing_test", 
"expected_failing_test"] + ) + check_relations_equal( + project.adapter, + [ + f"{self.test_audit_schema}.not_null_problematic_model_id", + "expected_not_null_problematic_model_id", + ], + ) + check_relations_equal( + project.adapter, + [ + f"{self.test_audit_schema}.unique_problematic_model_id", + "expected_unique_problematic_model_id", + ], + ) + check_relations_equal( + project.adapter, + [ + f"{self.test_audit_schema}.accepted_values_problemat" + "ic_mo_c533ab4ca65c1a9dbf14f79ded49b628", + "expected_accepted_values", + ], + ) + + +class TestStoreTestFailures(StoreTestFailuresBase): + @pytest.fixture(scope="function") + def clean_up(self, project): + yield + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=self.test_audit_schema + ) + project.adapter.drop_schema(relation) + + relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.adapter.drop_schema(relation) + + def column_type_overrides(self): + return { + "expected_unique_problematic_model_id": { + "+column_types": { + "n_records": "bigint", + }, + }, + "expected_accepted_values": { + "+column_types": { + "n_records": "bigint", + }, + }, + } + + def test__store_and_assert(self, project, clean_up): + self.run_tests_store_one_failure(project) + self.run_tests_store_failures_and_assert(project) + + +class TestFabricStoreTestFailures(TestStoreTestFailures): + pass + + +class TestStoreTestFailuresAsInteractions(basic.StoreTestFailuresAsInteractions): + pass + + +class TestStoreTestFailuresAsProjectLevelOff(basic.StoreTestFailuresAsProjectLevelOff): + pass + + +class TestStoreTestFailuresAsProjectLevelView(basic.StoreTestFailuresAsProjectLevelView): + pass + + +class TestStoreTestFailuresAsGeneric(basic.StoreTestFailuresAsGeneric): + pass + + +class TestStoreTestFailuresAsProjectLevelEphemeral(basic.StoreTestFailuresAsProjectLevelEphemeral): + pass + + +class TestStoreTestFailuresAsExceptions(basic.StoreTestFailuresAsExceptions): + pass From b5afb8d58cf1578bef8a744b06ddb04605b633b2 Mon Sep 17 00:00:00 2001 From: Pradeep Srikakolapu Date: Tue, 28 Nov 2023 08:12:09 -0800 Subject: [PATCH 4/4] updated service principal authentication mode alias and bumped wheel package --- dbt/adapters/fabric/fabric_credentials.py | 3 +++ dev_requirements.txt | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/fabric/fabric_credentials.py b/dbt/adapters/fabric/fabric_credentials.py index 60da8b4f..860b0929 100644 --- a/dbt/adapters/fabric/fabric_credentials.py +++ b/dbt/adapters/fabric/fabric_credentials.py @@ -48,6 +48,9 @@ def _connection_keys(self): if self.windows_login is True: self.authentication = "Windows Login" + if self.authentication.lower().strip() == "serviceprincipal": + self.authentication = "ActiveDirectoryServicePrincipal" + return ( "server", "database", diff --git a/dev_requirements.txt b/dev_requirements.txt index 3bd8c0e6..22ee0532 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,6 +1,6 @@ pytest==7.4.3 twine==4.0.2 -wheel==0.41.3 +wheel==0.42 pre-commit==3.5.0 pytest-dotenv==0.5.2 dbt-tests-adapter~=1.7.2