Skip to content

Commit

Permalink
allow materialized simple copy test
Browse files Browse the repository at this point in the history
  • Loading branch information
nszoni committed Feb 20, 2024
1 parent f58cdfb commit 1746c91
Showing 1 changed file with 78 additions and 9 deletions.
87 changes: 78 additions & 9 deletions tests/functional/adapter/test_simple_copy.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,81 @@
from pathlib import Path

import pytest
from dbt.adapters.factory import get_adapter_by_type
from dbt.tests.adapter.simple_copy.fixtures import _SEEDS__SEED_UPDATE
from dbt.tests.adapter.simple_copy.test_simple_copy import SimpleCopySetup
from dbt.tests.fixtures.project import TestProjInfo
from dbt.tests.util import check_relations_equal, rm_file, run_dbt, write_file
from dbt.tests.util import (
check_relations_equal,
get_connection,
rm_file,
run_dbt,
run_sql_with_adapter,
write_file,
)


class TestProjInfoSynapse:
__test__ = False

def __init__(
self,
project_root,
profiles_dir,
adapter_type,
test_dir,
shared_data_dir,
test_data_dir,
test_schema,
database,
test_config,
):
self.project_root = project_root
self.profiles_dir = profiles_dir
self.adapter_type = adapter_type
self.test_dir = test_dir
self.shared_data_dir = shared_data_dir
self.test_data_dir = test_data_dir
self.test_schema = test_schema
self.database = database
self.test_config = test_config
self.created_schemas = []

@property
def adapter(self):
# This returns the last created "adapter" from the adapter factory. Each
# dbt command will create a new one. This allows us to avoid patching the
# providers 'get_adapter' function.
return get_adapter_by_type(self.adapter_type)

# Run sql from a path
def run_sql_file(self, sql_path, fetch=None):
with open(sql_path, "r") as f:
statements = f.read().split(";")
for statement in statements:
self.run_sql(statement, fetch)

# Run sql from a string, using adapter saved at test startup
def run_sql(self, sql, fetch=None):
return run_sql_with_adapter(self.adapter, sql, fetch=fetch)

# Create the unique test schema. Used in test setup, so that we're
# ready for initial sql prior to a run_dbt command.
def create_test_schema(self, schema_name=None):
if schema_name is None:
schema_name = self.test_schema
with get_connection(self.adapter):
relation = self.adapter.Relation.create(database=self.database, schema=schema_name)
self.adapter.create_schema(relation)
self.created_schemas.append(schema_name)

# Drop the unique test schema, usually called in test cleanup
def drop_test_schema(self):
with get_connection(self.adapter):
for schema_name in self.created_schemas:
relation = self.adapter.Relation.create(database=self.database, schema=schema_name)
self.adapter.drop_schema(relation)
self.created_schemas = []


class SynapseTestProjInfo(TestProjInfo):
# This return a dictionary of table names to 'view' or 'table' values.
# Override class because Synapse doesnt have 'ILIKE'
def synapse_get_tables_in_schema(self):
Expand All @@ -30,7 +98,7 @@ def synapse_get_tables_in_schema(self):
@pytest.fixture
def synapse_project(project):
# Replace the original class with the new one
project.__class__ = SynapseTestProjInfo
project.__class__ = TestProjInfoSynapse

return project

Expand Down Expand Up @@ -63,20 +131,21 @@ def test_simple_copy(self, synapse_project):
["seed", "view_model", "incremental", "materialized", "get_and_ref"],
)

@pytest.mark.skip(reason="We are not supporting materialized views yet")
# in Synapse materialized views must be created with aggregation and distribution option
def test_simple_copy_with_materialized_views(self, synapse_project):
synapse_project.run_sql(
f"create table {synapse_project.test_schema}.unrelated_table (id int)"
)
sql = f"""
create materialized view {synapse_project.test_schema}.unrelated_materialized_view as (
select * from {synapse_project.test_schema}.unrelated_table
create materialized view {synapse_project.test_schema}.unrelated_materialized_view
with ( distribution = round_robin ) as (
select id from {synapse_project.test_schema}.unrelated_table group by id
)
"""
synapse_project.run_sql(sql)
sql = f"""
create view {synapse_project.test_schema}.unrelated_view as (
select * from {synapse_project.test_schema}.unrelated_materialized_view
select id from {synapse_project.test_schema}.unrelated_materialized_view
)
"""
synapse_project.run_sql(sql)
Expand Down

0 comments on commit 1746c91

Please sign in to comment.