Skip to content

Commit

Permalink
Updating dbt-fabric adapter to stay compatible with dbt-synapse adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
prdpsvs committed May 22, 2024
1 parent eb7e3f8 commit 2d34916
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 146 deletions.
16 changes: 14 additions & 2 deletions dbt/include/fabric/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@
information_schema
{%- endmacro %}

{% macro get_use_database_sql(database) %}
{{ return(adapter.dispatch('get_use_database_sql', 'dbt')(database)) }}
{% endmacro %}

{%- macro default__get_use_database_sql(database) -%}
{%- endmacro -%}


{%- macro fabric__get_use_database_sql(database) -%}
USE [{{database}}];
{%- endmacro -%}

{% macro fabric__list_schemas(database) %}
{% call statement('list_schemas', fetch_result=True, auto_begin=False) -%}
select name as [schema]
Expand All @@ -27,7 +39,7 @@

{% macro fabric__list_relations_without_caching(schema_relation) -%}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
USE [{{ schema_relation.database }}];
{{ get_use_database_sql(schema_relation.database) }}
with base as (
select
DB_NAME() as [database],
Expand All @@ -51,7 +63,7 @@

{% macro fabric__get_relation_without_caching(schema_relation) -%}
{% call statement('get_relation_without_caching', fetch_result=True) -%}
USE [{{ schema_relation.database }}];
{{ get_use_database_sql(schema_relation.database) }}
with base as (
select
DB_NAME() as [database],
Expand Down
6 changes: 3 additions & 3 deletions dbt/include/fabric/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

{% if relation.type == 'view' -%}
{% call statement('find_references', fetch_result=true) %}
USE [{{ relation.database }}];
{{ get_use_database_sql(relation.database) }}
select
sch.name as schema_name,
obj.name as view_name
Expand Down Expand Up @@ -44,13 +44,13 @@
{%- else -%}
{{ exceptions.raise_not_implemented('Invalid relation being dropped: ' ~ relation) }}
{% endif %}
USE [{{ relation.database }}];
{{ get_use_database_sql(relation.database) }}
EXEC('DROP {{ relation.type }} IF EXISTS {{ relation.include(database=False) }};');
{% endmacro %}

{% macro fabric__rename_relation(from_relation, to_relation) -%}
{% call statement('rename_relation') -%}
USE [{{ from_relation.database }}];
{{ get_use_database_sql(from_relation.database) }}
EXEC sp_rename '{{ from_relation.schema }}.{{ from_relation.identifier }}', '{{ to_relation.identifier }}'
{%- endcall %}
{% endmacro %}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/fabric/macros/adapters/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

{% macro fabric__create_schema_with_authorization(relation, schema_authorization) -%}
{% call statement('create_schema') -%}
USE [{{ relation.database }}];
{{ get_use_database_sql(relation.database) }}
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{{ relation.schema }}')
BEGIN
EXEC('CREATE SCHEMA [{{ relation.schema }}] AUTHORIZATION [{{ schema_authorization }}]')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@

{%- set full_refresh_mode = (should_full_refresh()) -%}
{% set target_relation = this.incorporate(type='table') %}
{%- set relations_list = fabric__get_relation_without_caching(target_relation) -%}
{%- set existing_relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier) -%}

{# {%- set relations_list = fabric__get_relation_without_caching(target_relation) -%}
{%- set existing_relation = none %}
{% if (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema)
and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] == target_relation.type)%}
{% set existing_relation = target_relation %}
{% elif (relations_list|length == 1) and (relations_list[0][2] == target_relation.schema)
and (relations_list[0][1] == target_relation.identifier) and (relations_list[0][3] != target_relation.type) %}
{% set existing_relation = get_or_create_relation(relations_list[0][0], relations_list[0][2] , relations_list[0][1] , relations_list[0][3])[1] %}
{% endif %}
{% endif %} #}

{# {{ log("Full refresh mode" ~ full_refresh_mode)}}
{{ log("existing relation : "~existing_relation ~ " type "~ existing_relation.type ~ " is view? "~existing_relation.is_view) }}
Expand All @@ -38,7 +39,7 @@

{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(existing_relation) }}
{{ drop_relation(existing_relation) }}
{%- call statement('main') -%}
{{ get_create_table_as_sql(False, target_relation, sql)}}
{%- endcall -%}
Expand Down Expand Up @@ -72,7 +73,7 @@
{%- endcall -%}
{% endif %}

{% do drop_relation_if_exists(temp_relation) %}
{% do drop_relation(temp_relation) %}
{{ run_hooks(post_hooks, inside_transaction=True) }}

{% set target_relation = target_relation.incorporate(type='table') %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
{%- set target_relation = this.incorporate(type='table') -%}

{% call statement('main') %}
{{ drop_relation_if_exists(target_relation) }}
{{ drop_relation(target_relation) }}
{{ create_or_replace_clone(target_relation, defer_relation) }}
{% endcall %}
{{ return({'relations': [target_relation]}) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{% set tmp_relation = relation.incorporate(
path={"identifier": relation.identifier.replace("#", "") ~ '_temp_view'},
type='view')-%}
{% do run_query(drop_relation_if_exists(tmp_relation)) %}
{% do run_query(drop_relation(tmp_relation)) %}

{% set contract_config = config.get('contract') %}

Expand All @@ -27,6 +27,6 @@
EXEC('CREATE TABLE [{{relation.database}}].[{{relation.schema}}].[{{relation.identifier}}] AS (SELECT * FROM [{{tmp_relation.database}}].[{{tmp_relation.schema}}].[{{tmp_relation.identifier}}]);');
{% endif %}

{{ drop_relation_if_exists(tmp_relation) }}
{{ drop_relation(tmp_relation) }}

{% endmacro %}
13 changes: 3 additions & 10 deletions dbt/include/fabric/macros/materializations/models/table/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,9 @@

-- Load target relation
{%- set target_relation = this.incorporate(type='table') %}
-- Load existing relation
{%- set relation = fabric__get_relation_without_caching(this) %}

{% set existing_relation = none %}
{% if (relation|length == 1) %}
{% set existing_relation = get_or_create_relation(relation[0][0], relation[0][2] , relation[0][1] , relation[0][3])[1] %}
{% endif %}
{%- set existing_relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier) -%}

{%- set backup_relation = none %}
{{log("Existing Relation type is "~ existing_relation.type)}}
{% if (existing_relation != none and existing_relation.type == "table") %}
{%- set backup_relation = make_backup_relation(target_relation, 'table') -%}
{% elif (existing_relation != none and existing_relation.type == "view") %}
Expand All @@ -20,7 +13,7 @@

{% if (existing_relation != none) %}
-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(backup_relation) }}
{{ drop_relation(backup_relation) }}
-- Rename target relation as backup relation
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{% endif %}
Expand Down Expand Up @@ -48,7 +41,7 @@
-- finally, drop the foreign key references if exists
{{ drop_fk_indexes_on_table(backup_relation) }}
-- drop existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}
{{ drop_relation(backup_relation) }}
{% endif %}
-- Add constraints including FK relation.
{{ fabric__build_model_constraints(target_relation) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

{%- set temp_view_sql = sql.replace("'", "''") -%}

USE [{{ relation.database }}];
{{ get_use_database_sql(relation.database) }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
Expand Down
14 changes: 3 additions & 11 deletions dbt/include/fabric/macros/materializations/models/view/view.sql
Original file line number Diff line number Diff line change
@@ -1,17 +1,9 @@
{% materialization view, adapter='fabric' -%}

{%- set target_relation = this.incorporate(type='view') -%}
{{log("Target Relation "~target_relation)}}

{%- set relation = fabric__get_relation_without_caching(this) %}
{% set existing_relation = none %}
{% if (relation|length == 1) %}
{% set existing_relation = get_or_create_relation(relation[0][0], relation[0][2] , relation[0][1] , relation[0][3])[1] %}
{% endif %}
{# {{log("Existing Relation "~existing_relation)}} #}
{%- set existing_relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier) -%}

{%- set backup_relation = none %}
{# {{log("Existing Relation type is "~ existing_relation.type)}} #}
{% if (existing_relation != none and existing_relation.type == "table") %}
{%- set backup_relation = make_backup_relation(target_relation, 'table') -%}
{% elif (existing_relation != none and existing_relation.type == "view") %}
Expand All @@ -20,7 +12,7 @@

{% if (existing_relation != none) %}
-- drop the temp relations if they exist already in the database
{{ drop_relation_if_exists(backup_relation) }}
{{ drop_relation(backup_relation) }}
-- Rename target relation as backup relation
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{% endif %}
Expand All @@ -43,7 +35,7 @@
{{ run_hooks(post_hooks, inside_transaction=True) }}
{{ adapter.commit() }}
{% if (backup_relation != none) %}
{{ drop_relation_if_exists(backup_relation) }}
{{ drop_relation(backup_relation) }}
{% endif %}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ return({'relations': [target_relation]}) }}
Expand Down
115 changes: 10 additions & 105 deletions tests/functional/adapter/test_caching.py
Original file line number Diff line number Diff line change
@@ -1,117 +1,22 @@
import pytest
from dbt.tests.util import run_dbt

model_sql = """
{{
config(
materialized='table'
)
}}
select 1 as id
"""

another_schema_model_sql = """
{{
config(
materialized='table',
schema='another_schema'
)
}}
select 1 as id
"""


class BaseCachingTest:
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"config-version": 2,
"quoting": {
"identifier": False,
"schema": False,
},
}

def run_and_inspect_cache(self, project, run_args=None):
run_dbt(run_args)

# the cache was empty at the start of the run.
# the model materialization returned an unquoted relation and added to the cache.
adapter = project.adapter
assert len(adapter.cache.relations) == 1
relation = list(adapter.cache.relations).pop()
assert relation.schema == project.test_schema
assert relation.schema == project.test_schema.lower()

# on the second run, dbt will find a relation in the database during cache population.
# this relation will be quoted, because list_relations_without_caching (by default) uses
# quote_policy = {"database": True, "schema": True, "identifier": True}
# when adding relations to the cache.
run_dbt(run_args)
adapter = project.adapter
assert len(adapter.cache.relations) == 1
second_relation = list(adapter.cache.relations).pop()

# perform a case-insensitive + quote-insensitive comparison
for key in ["database", "schema", "identifier"]:
assert getattr(relation, key).lower() == getattr(second_relation, key).lower()

def test_cache(self, project):
self.run_and_inspect_cache(project, run_args=["run"])


class BaseCachingLowercaseModel(BaseCachingTest):
@pytest.fixture(scope="class")
def models(self):
return {
"model.sql": model_sql,
}


class BaseCachingUppercaseModel(BaseCachingTest):
@pytest.fixture(scope="class")
def models(self):
return {
"MODEL.sql": model_sql,
}


class BaseCachingSelectedSchemaOnly(BaseCachingTest):
@pytest.fixture(scope="class")
def models(self):
return {
"model.sql": model_sql,
"another_schema_model.sql": another_schema_model_sql,
}

def test_cache(self, project):
# this should only cache the schema containing the selected model
run_args = ["--cache-selected-only", "run", "--select", "model"]
self.run_and_inspect_cache(project, run_args)


class TestNoPopulateCache(BaseCachingTest):
@pytest.fixture(scope="class")
def models(self):
return {
"model.sql": model_sql,
}

def test_cache(self, project):
# --no-populate-cache still allows the cache to populate all relations
# under a schema, so the behavior here remains the same as other tests
run_args = ["--no-populate-cache", "run"]
self.run_and_inspect_cache(project, run_args)
from dbt.tests.adapter.caching.test_caching import (
BaseCachingLowercaseModel,
BaseCachingSelectedSchemaOnly,
BaseCachingUppercaseModel,
BaseNoPopulateCache,
)


class TestCachingLowerCaseModel(BaseCachingLowercaseModel):
pass


@pytest.mark.skip(reason="Fabric DW does not support Case Insensivity.")
class TestCachingUppercaseModel(BaseCachingUppercaseModel):
pass


class TestCachingSelectedSchemaOnly(BaseCachingSelectedSchemaOnly):
pass


class TestNoPopulateCache(BaseNoPopulateCache):
pass
18 changes: 14 additions & 4 deletions tests/functional/adapter/test_constraints.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,11 @@ def models(self):
@pytest.fixture(scope="class")
def expected_sql(self):
return """
EXEC('create view <model_identifier> as -- depends_on: <foreign_key_model_identifier> select ''blue'' as color, 1 as id, ''2019-01-01'' as date_day;'); CREATE TABLE <model_identifier> ( id int not null, color varchar(100), date_day varchar(100) ) INSERT INTO <model_identifier> ( [id], [color], [date_day] ) SELECT [id], [color], [date_day] FROM <model_identifier> EXEC('DROP view IF EXISTS <model_identifier>
EXEC('create view <model_identifier> as -- depends_on: <foreign_key_model_identifier> select ''blue'' as color, 1 as id, ''2019-01-01'' as date_day;'); CREATE TABLE <model_identifier> ( id int not null, color varchar(100), date_day varchar(100) ) INSERT INTO <model_identifier> ( [id], [color], [date_day] ) SELECT [id], [color], [date_day] FROM <model_identifier>
"""

# EXEC('DROP view IF EXISTS <model_identifier>

def test__constraints_ddl(self, project, expected_sql):
unformatted_constraint_schema_yml = read_file("models", "constraints_schema.yml")
write_file(
Expand All @@ -501,7 +503,9 @@ def test__constraints_ddl(self, project, expected_sql):
generated_sql_generic, "foreign_key_model", "<foreign_key_model_identifier>"
)
generated_sql_wodb = generated_sql_generic.replace("USE [" + project.database + "];", "")
assert _normalize_whitespace(expected_sql) == _normalize_whitespace(generated_sql_wodb)
assert _normalize_whitespace(expected_sql.strip()) == _normalize_whitespace(
generated_sql_wodb.strip()
)


class TestTableConstraintsRuntimeDdlEnforcement(BaseConstraintsRuntimeDdlEnforcement):
Expand Down Expand Up @@ -542,9 +546,11 @@ def models(self):
@pytest.fixture(scope="class")
def expected_sql(self):
return """
EXEC('create view <model_identifier> as -- depends_on: <foreign_key_model_identifier> select ''blue'' as color, 1 as id, ''2019-01-01'' as date_day;'); CREATE TABLE <model_identifier> ( id int not null, color varchar(100), date_day varchar(100) ) INSERT INTO <model_identifier> ( [id], [color], [date_day] ) SELECT [id], [color], [date_day] FROM <model_identifier> EXEC('DROP view IF EXISTS <model_identifier>
EXEC('create view <model_identifier> as -- depends_on: <foreign_key_model_identifier> select ''blue'' as color, 1 as id, ''2019-01-01'' as date_day;'); CREATE TABLE <model_identifier> ( id int not null, color varchar(100), date_day varchar(100) ) INSERT INTO <model_identifier> ( [id], [color], [date_day] ) SELECT [id], [color], [date_day] FROM <model_identifier>
"""

# EXEC('DROP view IF EXISTS <model_identifier>

def test__model_constraints_ddl(self, project, expected_sql):
unformatted_constraint_schema_yml = read_file("models", "constraints_schema.yml")
write_file(
Expand All @@ -562,7 +568,11 @@ def test__model_constraints_ddl(self, project, expected_sql):
generated_sql_generic, "foreign_key_model", "<foreign_key_model_identifier>"
)
generated_sql_wodb = generated_sql_generic.replace("USE [" + project.database + "];", "")
assert _normalize_whitespace(expected_sql) == _normalize_whitespace(generated_sql_wodb)
print("Expected:", expected_sql.strip())
print("Generated:", generated_sql_wodb.strip())
assert _normalize_whitespace(expected_sql.strip()) == _normalize_whitespace(
generated_sql_wodb.strip()
)


class TestModelConstraintsRuntimeEnforcement(BaseModelConstraintsRuntimeEnforcement):
Expand Down
Loading

0 comments on commit 2d34916

Please sign in to comment.