From 5b486835cf929e2bddfbed28caa741144bf72197 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Tue, 22 Mar 2022 15:09:51 +0100 Subject: [PATCH 01/16] Configure insert_overwrite models to use parquet (#301) --- .../models_insert_overwrite/insert_overwrite_no_partitions.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql b/tests/integration/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql index fcc142bd0..ae007b45f 100644 --- a/tests/integration/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql +++ b/tests/integration/incremental_strategies/models_insert_overwrite/insert_overwrite_no_partitions.sql @@ -1,6 +1,7 @@ {{ config( materialized = 'incremental', incremental_strategy = 'insert_overwrite', + file_format = 'parquet', ) }} {% if not is_incremental() %} From 5917871eda5e11e220ba1ac7098dbc0214ade675 Mon Sep 17 00:00:00 2001 From: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com> Date: Tue, 22 Mar 2022 09:52:36 -0500 Subject: [PATCH 02/16] adding in unique_key as a list for incremental models (#291) * a heavily pulled from dbt-core implementation for bigquery and snowflake version of possible spark answer to unique keys as a list still researching spark some. * adding tests from other adapters over to spark and beginning work on changing over * adding the apache_spark name to first versions of tests to fix error * Pairing * Update config * Try casting data type * More type casting * Fix typo in file name * adding databricks_cluster versions of tests to be in line with rest of integration tests to cover each connection type. * removing duplicated tests and adding changlog addition * trying to get databricks_cluster tests to work by casting date on seed inserts * running tests Co-authored-by: Jeremy Cohen --- CHANGELOG.md | 3 + .../incremental/strategies.sql | 32 +- .../duplicated_unary_unique_key_list.sql | 17 + .../models/empty_str_unique_key.sql | 14 + .../models/empty_unique_key_list.sql | 12 + .../models/expected/one_str__overwrite.sql | 21 + .../unique_key_list__inplace_overwrite.sql | 21 + .../models/no_unique_key.sql | 13 + .../nontyped_trinary_unique_key_list.sql | 19 + .../models/not_found_unique_key.sql | 14 + .../models/not_found_unique_key_list.sql | 8 + .../models/str_unique_key.sql | 17 + .../models/trinary_unique_key_list.sql | 19 + .../models/unary_unique_key_list.sql | 17 + .../seeds/add_new_rows.sql | 9 + .../seeds/duplicate_insert.sql | 5 + .../incremental_unique_id_test/seeds/seed.csv | 7 + .../incremental_unique_id_test/seeds/seed.yml | 7 + .../test_incremental_unique_id.py | 481 ++++++++++++++++++ 19 files changed, 726 insertions(+), 10 deletions(-) create mode 100644 tests/integration/incremental_unique_id_test/models/duplicated_unary_unique_key_list.sql create mode 100644 tests/integration/incremental_unique_id_test/models/empty_str_unique_key.sql create mode 100644 tests/integration/incremental_unique_id_test/models/empty_unique_key_list.sql create mode 100644 tests/integration/incremental_unique_id_test/models/expected/one_str__overwrite.sql create mode 100644 tests/integration/incremental_unique_id_test/models/expected/unique_key_list__inplace_overwrite.sql create mode 100644 tests/integration/incremental_unique_id_test/models/no_unique_key.sql create mode 100644 tests/integration/incremental_unique_id_test/models/nontyped_trinary_unique_key_list.sql create mode 100644 
tests/integration/incremental_unique_id_test/models/not_found_unique_key.sql create mode 100644 tests/integration/incremental_unique_id_test/models/not_found_unique_key_list.sql create mode 100644 tests/integration/incremental_unique_id_test/models/str_unique_key.sql create mode 100644 tests/integration/incremental_unique_id_test/models/trinary_unique_key_list.sql create mode 100644 tests/integration/incremental_unique_id_test/models/unary_unique_key_list.sql create mode 100644 tests/integration/incremental_unique_id_test/seeds/add_new_rows.sql create mode 100644 tests/integration/incremental_unique_id_test/seeds/duplicate_insert.sql create mode 100644 tests/integration/incremental_unique_id_test/seeds/seed.csv create mode 100644 tests/integration/incremental_unique_id_test/seeds/seed.yml create mode 100644 tests/integration/incremental_unique_id_test/test_incremental_unique_id.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c2ca4dd95..9642b3a2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## dbt-spark 1.1.0 (Release TBD) +### Features +- Adds new integration test to check against new ability to allow unique_key to be a list. ([#282](https://github.com/dbt-labs/dbt-spark/issues/282)) + ### Under the hood - Update plugin author name (`fishtown-analytics` → `dbt-labs`) in ODBC user agent ([#288](https://github.com/dbt-labs/dbt-spark/pull/288)) diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql index ec5dad67a..215b5f3f9 100644 --- a/dbt/include/spark/macros/materializations/incremental/strategies.sql +++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql @@ -21,20 +21,32 @@ {% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %} {# skip dest_columns, use merge_update_columns config if provided, otherwise use "*" #} + {%- set predicates = [] if predicates is none else [] + predicates -%} {%- set update_columns = config.get("merge_update_columns") -%} + + {% if unique_key %} + {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} + {% for key in unique_key %} + {% set this_key_match %} + DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }} + {% endset %} + {% do predicates.append(this_key_match) %} + {% endfor %} + {% else %} + {% set unique_key_match %} + DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} + {% endset %} + {% do predicates.append(unique_key_match) %} + {% endif %} + {% else %} + {% do predicates.append('FALSE') %} + {% endif %} - {% set merge_condition %} - {% if unique_key %} - on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} - {% else %} - on false - {% endif %} - {% endset %} + {{ sql_header if sql_header is not none }} - merge into {{ target }} as DBT_INTERNAL_DEST + merge into {{ target }} as DBT_INTERNAL_DEST using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE - - {{ merge_condition }} + on {{ predicates | join(' and ') }} when matched then update set {% if update_columns -%}{%- for column_name in update_columns %} diff --git a/tests/integration/incremental_unique_id_test/models/duplicated_unary_unique_key_list.sql b/tests/integration/incremental_unique_id_test/models/duplicated_unary_unique_key_list.sql new file mode 100644 index 000000000..7290b6c43 --- /dev/null +++ b/tests/integration/incremental_unique_id_test/models/duplicated_unary_unique_key_list.sql @@ -0,0 +1,17 @@ +{{ + config( + 
materialized='incremental', + unique_key=['state', 'state'] + ) +}} + +select + state as state, + county as county, + city as city, + last_visit_date as last_visit_date +from {{ ref('seed') }} + +{% if is_incremental() %} + where last_visit_date > (select max(last_visit_date) from {{ this }}) +{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/empty_str_unique_key.sql b/tests/integration/incremental_unique_id_test/models/empty_str_unique_key.sql new file mode 100644 index 000000000..5260e177c --- /dev/null +++ b/tests/integration/incremental_unique_id_test/models/empty_str_unique_key.sql @@ -0,0 +1,14 @@ +{{ + config( + materialized='incremental', + unique_key='' + ) +}} + +select + * +from {{ ref('seed') }} + +{% if is_incremental() %} + where last_visit_date > (select max(last_visit_date) from {{ this }}) +{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/empty_unique_key_list.sql b/tests/integration/incremental_unique_id_test/models/empty_unique_key_list.sql new file mode 100644 index 000000000..c582d532c --- /dev/null +++ b/tests/integration/incremental_unique_id_test/models/empty_unique_key_list.sql @@ -0,0 +1,12 @@ +{{ + config( + materialized='incremental', + unique_key=[] + ) +}} + +select * from {{ ref('seed') }} + +{% if is_incremental() %} + where last_visit_date > (select max(last_visit_date) from {{ this }}) +{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/expected/one_str__overwrite.sql b/tests/integration/incremental_unique_id_test/models/expected/one_str__overwrite.sql new file mode 100644 index 000000000..c7101152b --- /dev/null +++ b/tests/integration/incremental_unique_id_test/models/expected/one_str__overwrite.sql @@ -0,0 +1,21 @@ +{{ + config( + materialized='table' + ) +}} + +select + 'CT' as state, + 'Hartford' as county, + 'Hartford' as city, + cast('2022-02-14' as date) as last_visit_date +union all +select 'MA','Suffolk','Boston',cast('2020-02-12' as date) +union all +select 'NJ','Mercer','Trenton',cast('2022-01-01' as date) +union all +select 'NY','Kings','Brooklyn',cast('2021-04-02' as date) +union all +select 'NY','New York','Manhattan',cast('2021-04-01' as date) +union all +select 'PA','Philadelphia','Philadelphia',cast('2021-05-21' as date) \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/expected/unique_key_list__inplace_overwrite.sql b/tests/integration/incremental_unique_id_test/models/expected/unique_key_list__inplace_overwrite.sql new file mode 100644 index 000000000..c7101152b --- /dev/null +++ b/tests/integration/incremental_unique_id_test/models/expected/unique_key_list__inplace_overwrite.sql @@ -0,0 +1,21 @@ +{{ + config( + materialized='table' + ) +}} + +select + 'CT' as state, + 'Hartford' as county, + 'Hartford' as city, + cast('2022-02-14' as date) as last_visit_date +union all +select 'MA','Suffolk','Boston',cast('2020-02-12' as date) +union all +select 'NJ','Mercer','Trenton',cast('2022-01-01' as date) +union all +select 'NY','Kings','Brooklyn',cast('2021-04-02' as date) +union all +select 'NY','New York','Manhattan',cast('2021-04-01' as date) +union all +select 'PA','Philadelphia','Philadelphia',cast('2021-05-21' as date) \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/no_unique_key.sql b/tests/integration/incremental_unique_id_test/models/no_unique_key.sql new file mode 100644 index 
000000000..44a63e75c --- /dev/null +++ b/tests/integration/incremental_unique_id_test/models/no_unique_key.sql @@ -0,0 +1,13 @@ +{{ + config( + materialized='incremental' + ) +}} + +select + * +from {{ ref('seed') }} + +{% if is_incremental() %} + where last_visit_date > (select max(last_visit_date) from {{ this }}) +{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/nontyped_trinary_unique_key_list.sql b/tests/integration/incremental_unique_id_test/models/nontyped_trinary_unique_key_list.sql new file mode 100644 index 000000000..52b4509f0 --- /dev/null +++ b/tests/integration/incremental_unique_id_test/models/nontyped_trinary_unique_key_list.sql @@ -0,0 +1,19 @@ +-- for comparing against auto-typed seeds + +{{ + config( + materialized='incremental', + unique_key=['state', 'county', 'city'] + ) +}} + +select + state as state, + county as county, + city as city, + last_visit_date as last_visit_date +from {{ ref('seed') }} + +{% if is_incremental() %} + where last_visit_date > (select max(last_visit_date) from {{ this }}) +{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/not_found_unique_key.sql b/tests/integration/incremental_unique_id_test/models/not_found_unique_key.sql new file mode 100644 index 000000000..d247aa341 --- /dev/null +++ b/tests/integration/incremental_unique_id_test/models/not_found_unique_key.sql @@ -0,0 +1,14 @@ +{{ + config( + materialized='incremental', + unique_key='thisisnotacolumn' + ) +}} + +select + * +from {{ ref('seed') }} + +{% if is_incremental() %} + where last_visit_date > (select max(last_visit_date) from {{ this }}) +{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/not_found_unique_key_list.sql b/tests/integration/incremental_unique_id_test/models/not_found_unique_key_list.sql new file mode 100644 index 000000000..f1462a48f --- /dev/null +++ b/tests/integration/incremental_unique_id_test/models/not_found_unique_key_list.sql @@ -0,0 +1,8 @@ +{{ + config( + materialized='incremental', + unique_key=['state', 'thisisnotacolumn'] + ) +}} + +select * from {{ ref('seed') }} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/str_unique_key.sql b/tests/integration/incremental_unique_id_test/models/str_unique_key.sql new file mode 100644 index 000000000..2f9fc2987 --- /dev/null +++ b/tests/integration/incremental_unique_id_test/models/str_unique_key.sql @@ -0,0 +1,17 @@ +{{ + config( + materialized='incremental', + unique_key='state' + ) +}} + +select + state as state, + county as county, + city as city, + last_visit_date as last_visit_date +from {{ ref('seed') }} + +{% if is_incremental() %} + where last_visit_date > (select max(last_visit_date) from {{ this }}) +{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/trinary_unique_key_list.sql b/tests/integration/incremental_unique_id_test/models/trinary_unique_key_list.sql new file mode 100644 index 000000000..0359546bf --- /dev/null +++ b/tests/integration/incremental_unique_id_test/models/trinary_unique_key_list.sql @@ -0,0 +1,19 @@ +-- types needed to compare against expected model reliably + +{{ + config( + materialized='incremental', + unique_key=['state', 'county', 'city'] + ) +}} + +select + state as state, + county as county, + city as city, + last_visit_date as last_visit_date +from {{ ref('seed') }} + +{% if is_incremental() %} + where last_visit_date 
> (select max(last_visit_date) from {{ this }}) +{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/unary_unique_key_list.sql b/tests/integration/incremental_unique_id_test/models/unary_unique_key_list.sql new file mode 100644 index 000000000..7f5875f85 --- /dev/null +++ b/tests/integration/incremental_unique_id_test/models/unary_unique_key_list.sql @@ -0,0 +1,17 @@ +{{ + config( + materialized='incremental', + unique_key=['state'] + ) +}} + +select + state as state, + county as county, + city as city, + last_visit_date as last_visit_date +from {{ ref('seed') }} + +{% if is_incremental() %} + where last_visit_date > (select max(last_visit_date) from {{ this }}) +{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/seeds/add_new_rows.sql b/tests/integration/incremental_unique_id_test/seeds/add_new_rows.sql new file mode 100644 index 000000000..e5611fe32 --- /dev/null +++ b/tests/integration/incremental_unique_id_test/seeds/add_new_rows.sql @@ -0,0 +1,9 @@ +-- insert two new rows, both of which should be in incremental model +-- with any unique columns +insert into {schema}.seed + (state, county, city, last_visit_date) +values ('WA','King','Seattle',cast('2022-02-01' as date)); + +insert into {schema}.seed + (state, county, city, last_visit_date) +values ('CA','Los Angeles','Los Angeles',cast('2022-02-01' as date)); \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/seeds/duplicate_insert.sql b/tests/integration/incremental_unique_id_test/seeds/duplicate_insert.sql new file mode 100644 index 000000000..8abe2808f --- /dev/null +++ b/tests/integration/incremental_unique_id_test/seeds/duplicate_insert.sql @@ -0,0 +1,5 @@ +-- insert new row, which should not be in incremental model +-- with primary or first three columns unique +insert into {schema}.seed + (state, county, city, last_visit_date) +values ('CT','Hartford','Hartford',cast('2022-02-14' as date)); \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/seeds/seed.csv b/tests/integration/incremental_unique_id_test/seeds/seed.csv new file mode 100644 index 000000000..b988827fb --- /dev/null +++ b/tests/integration/incremental_unique_id_test/seeds/seed.csv @@ -0,0 +1,7 @@ +state,county,city,last_visit_date +CT,Hartford,Hartford,2020-09-23 +MA,Suffolk,Boston,2020-02-12 +NJ,Mercer,Trenton,2022-01-01 +NY,Kings,Brooklyn,2021-04-02 +NY,New York,Manhattan,2021-04-01 +PA,Philadelphia,Philadelphia,2021-05-21 \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/seeds/seed.yml b/tests/integration/incremental_unique_id_test/seeds/seed.yml new file mode 100644 index 000000000..c048548a8 --- /dev/null +++ b/tests/integration/incremental_unique_id_test/seeds/seed.yml @@ -0,0 +1,7 @@ +version: 2 + +seeds: + - name: seed + config: + column_types: + last_visit_date: date diff --git a/tests/integration/incremental_unique_id_test/test_incremental_unique_id.py b/tests/integration/incremental_unique_id_test/test_incremental_unique_id.py new file mode 100644 index 000000000..6ba80bc75 --- /dev/null +++ b/tests/integration/incremental_unique_id_test/test_incremental_unique_id.py @@ -0,0 +1,481 @@ +from tests.integration.base import DBTIntegrationTest, use_profile +from dbt.contracts.results import RunStatus +from collections import namedtuple +from pathlib import Path + + +TestResults = namedtuple( + 'TestResults', + ['seed_count', 'model_count', 'seed_rows', 
'inc_test_model_count', + 'opt_model_count', 'relation'], +) + + +class TestIncrementalUniqueKey(DBTIntegrationTest): + @property + def schema(self): + return 'incremental_unique_key' + + @property + def models(self): + return 'models' + + @property + def project_config(self): + return { + "config-version": 2, + "models": { + "+file_format": "delta", + "+incremental_strategy": "merge" + } + } + + def update_incremental_model(self, incremental_model): + '''update incremental model after the seed table has been updated''' + model_result_set = self.run_dbt(['run', '--select', incremental_model]) + return len(model_result_set) + + def setup_test(self, seed, incremental_model, update_sql_file): + '''build a test case and return values for assertions''' + + # Idempotently create some number of seeds and incremental models + seed_count = len(self.run_dbt( + ['seed', '--select', seed, '--full-refresh'] + )) + model_count = len(self.run_dbt( + ['run', '--select', incremental_model, '--full-refresh'] + )) + + # Upate seed and return new row count + row_count_query = 'select * from {}.{}'.format( + self.unique_schema(), + seed + ) + self.run_sql_file(Path('seeds') / Path(update_sql_file + '.sql')) + seed_rows = len(self.run_sql(row_count_query, fetch='all')) + + inc_test_model_count = self.update_incremental_model( + incremental_model=incremental_model + ) + + return (seed_count, model_count, seed_rows, inc_test_model_count) + + def test_scenario_correctness(self, expected_fields, test_case_fields): + '''Invoke assertions to verify correct build functionality''' + # 1. test seed(s) should build afresh + self.assertEqual( + expected_fields.seed_count, test_case_fields.seed_count + ) + # 2. test model(s) should build afresh + self.assertEqual( + expected_fields.model_count, test_case_fields.model_count + ) + # 3. seeds should have intended row counts post update + self.assertEqual( + expected_fields.seed_rows, test_case_fields.seed_rows + ) + # 4. incremental test model(s) should be updated + self.assertEqual( + expected_fields.inc_test_model_count, + test_case_fields.inc_test_model_count + ) + # 5. extra incremental model(s) should be built; optional since + # comparison may be between an incremental model and seed + if (expected_fields.opt_model_count and + test_case_fields.opt_model_count): + self.assertEqual( + expected_fields.opt_model_count, + test_case_fields.opt_model_count + ) + # 6. 
result table should match intended result set (itself a relation) + self.assertTablesEqual( + expected_fields.relation, test_case_fields.relation + ) + + def stub_expected_fields( + self, relation, seed_rows, opt_model_count=None + ): + return TestResults( + seed_count=1, model_count=1, seed_rows=seed_rows, + inc_test_model_count=1, opt_model_count=opt_model_count, + relation=relation + ) + + def fail_to_build_inc_missing_unique_key_column(self, incremental_model_name): + '''should pass back error state when trying build an incremental + model whose unique key or keylist includes a column missing + from the incremental model''' + seed_count = len(self.run_dbt( + ['seed', '--select', 'seed', '--full-refresh'] + )) + # unique keys are not applied on first run, so two are needed + self.run_dbt( + ['run', '--select', incremental_model_name, '--full-refresh'], + expect_pass=True + ) + run_result = self.run_dbt( + ['run', '--select', incremental_model_name], + expect_pass=False + ).results[0] + + return run_result.status, run_result.message + + +class TestNoIncrementalUniqueKey(TestIncrementalUniqueKey): + + @use_profile("databricks_sql_endpoint") + def test__databricks_sql_endpoint_no_unique_keys(self): + '''with no unique keys, seed and model should match''' + seed='seed' + seed_rows=8 + incremental_model='no_unique_key' + update_sql_file='add_new_rows' + + expected_fields = self.stub_expected_fields( + relation=seed, seed_rows=seed_rows + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=None, relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile("databricks_cluster") + def test__databricks_cluster_no_unique_keys(self): + '''with no unique keys, seed and model should match''' + seed='seed' + seed_rows=8 + incremental_model='no_unique_key' + update_sql_file='add_new_rows' + + expected_fields = self.stub_expected_fields( + relation=seed, seed_rows=seed_rows + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=None, relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + +class TestIncrementalStrUniqueKey(TestIncrementalUniqueKey): + + @use_profile('databricks_sql_endpoint') + def test__databricks_sql_endpoint_empty_str_unique_key(self): + '''with empty string for unique key, seed and model should match''' + seed='seed' + seed_rows=8 + incremental_model='empty_str_unique_key' + update_sql_file='add_new_rows' + + expected_fields = self.stub_expected_fields( + relation=seed, seed_rows=seed_rows + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=None, relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_cluster') + def test__databricks_cluster_empty_str_unique_key(self): + '''with empty string for unique key, seed and model should match''' + seed='seed' + seed_rows=8 + incremental_model='empty_str_unique_key' + update_sql_file='add_new_rows' + + expected_fields = self.stub_expected_fields( + relation=seed, seed_rows=seed_rows + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=None, relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_sql_endpoint') + def 
test__databricks_sql_endpoint_one_unique_key(self): + '''with one unique key, model will overwrite existing row''' + seed='seed' + seed_rows=7 + incremental_model='str_unique_key' + update_sql_file='duplicate_insert' + expected_model='one_str__overwrite' + + expected_fields = self.stub_expected_fields( + relation=expected_model, seed_rows=seed_rows, opt_model_count=1 + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=self.update_incremental_model(expected_model), + relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_cluster') + def test__databricks_cluster_one_unique_key(self): + '''with one unique key, model will overwrite existing row''' + seed='seed' + seed_rows=7 + incremental_model='str_unique_key' + update_sql_file='duplicate_insert' + expected_model='one_str__overwrite' + + expected_fields = self.stub_expected_fields( + relation=expected_model, seed_rows=seed_rows, opt_model_count=1 + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=self.update_incremental_model(expected_model), + relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_sql_endpoint') + def test__databricks_sql_endpoint_bad_unique_key(self): + '''expect compilation error from unique key not being a column''' + + (status, exc) = self.fail_to_build_inc_missing_unique_key_column( + incremental_model_name='not_found_unique_key' + ) + + self.assertEqual(status, RunStatus.Error) + self.assertTrue("thisisnotacolumn" in exc) + + @use_profile('databricks_cluster') + def test__databricks_cluster_bad_unique_key(self): + '''expect compilation error from unique key not being a column''' + + (status, exc) = self.fail_to_build_inc_missing_unique_key_column( + incremental_model_name='not_found_unique_key' + ) + + self.assertEqual(status, RunStatus.Error) + self.assertTrue("thisisnotacolumn" in exc) + + +class TestIncrementalListUniqueKey(TestIncrementalUniqueKey): + + @use_profile('databricks_sql_endpoint') + def test__databricks_sql_endpoint_empty_unique_key_list(self): + '''with no unique keys, seed and model should match''' + seed='seed' + seed_rows=8 + incremental_model='empty_unique_key_list' + update_sql_file='add_new_rows' + + expected_fields = self.stub_expected_fields( + relation=seed, seed_rows=seed_rows + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=None, relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_cluster') + def test__databricks_cluster_empty_unique_key_list(self): + '''with no unique keys, seed and model should match''' + seed='seed' + seed_rows=8 + incremental_model='empty_unique_key_list' + update_sql_file='add_new_rows' + + expected_fields = self.stub_expected_fields( + relation=seed, seed_rows=seed_rows + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=None, relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_sql_endpoint') + def test__databricks_sql_endpoint_unary_unique_key_list(self): + '''with one unique key, model will overwrite existing row''' + seed='seed' + seed_rows=7 + incremental_model='unary_unique_key_list' + 
update_sql_file='duplicate_insert' + expected_model='unique_key_list__inplace_overwrite' + + expected_fields = self.stub_expected_fields( + relation=expected_model, seed_rows=seed_rows, opt_model_count=1 + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=self.update_incremental_model(expected_model), + relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_cluster') + def test__databricks_cluster_unary_unique_key_list(self): + '''with one unique key, model will overwrite existing row''' + seed='seed' + seed_rows=7 + incremental_model='unary_unique_key_list' + update_sql_file='duplicate_insert' + expected_model='unique_key_list__inplace_overwrite' + + expected_fields = self.stub_expected_fields( + relation=expected_model, seed_rows=seed_rows, opt_model_count=1 + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=self.update_incremental_model(expected_model), + relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_sql_endpoint') + def test__databricks_sql_endpoint_duplicated_unary_unique_key_list(self): + '''with two of the same unique key, model will overwrite existing row''' + seed='seed' + seed_rows=7 + incremental_model='duplicated_unary_unique_key_list' + update_sql_file='duplicate_insert' + expected_model='unique_key_list__inplace_overwrite' + + expected_fields = self.stub_expected_fields( + relation=expected_model, seed_rows=seed_rows, opt_model_count=1 + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=self.update_incremental_model(expected_model), + relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_cluster') + def test__databricks_cluster_duplicated_unary_unique_key_list(self): + '''with two of the same unique key, model will overwrite existing row''' + seed='seed' + seed_rows=7 + incremental_model='duplicated_unary_unique_key_list' + update_sql_file='duplicate_insert' + expected_model='unique_key_list__inplace_overwrite' + + expected_fields = self.stub_expected_fields( + relation=expected_model, seed_rows=seed_rows, opt_model_count=1 + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=self.update_incremental_model(expected_model), + relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_sql_endpoint') + def test__databricks_sql_endpoint_trinary_unique_key_list(self): + '''with three unique keys, model will overwrite existing row''' + seed='seed' + seed_rows=7 + incremental_model='trinary_unique_key_list' + update_sql_file='duplicate_insert' + expected_model='unique_key_list__inplace_overwrite' + + expected_fields = self.stub_expected_fields( + relation=expected_model, seed_rows=seed_rows, opt_model_count=1 + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=self.update_incremental_model(expected_model), + relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_cluster') + def test__databricks_cluster_trinary_unique_key_list(self): + '''with three unique keys, model will overwrite 
existing row''' + seed='seed' + seed_rows=7 + incremental_model='trinary_unique_key_list' + update_sql_file='duplicate_insert' + expected_model='unique_key_list__inplace_overwrite' + + expected_fields = self.stub_expected_fields( + relation=expected_model, seed_rows=seed_rows, opt_model_count=1 + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=self.update_incremental_model(expected_model), + relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_sql_endpoint') + def test__databricks_sql_endpoint_trinary_unique_key_list_no_update(self): + '''even with three unique keys, adding distinct rows to seed does not + cause seed and model to diverge''' + seed='seed' + seed_rows=8 + incremental_model='nontyped_trinary_unique_key_list' + update_sql_file='add_new_rows' + + expected_fields = self.stub_expected_fields( + relation=seed, seed_rows=seed_rows + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=None, relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_cluster') + def test__databricks_cluster_trinary_unique_key_list_no_update(self): + '''even with three unique keys, adding distinct rows to seed does not + cause seed and model to diverge''' + seed='seed' + seed_rows=8 + incremental_model='nontyped_trinary_unique_key_list' + update_sql_file='add_new_rows' + + expected_fields = self.stub_expected_fields( + relation=seed, seed_rows=seed_rows + ) + test_case_fields = TestResults( + *self.setup_test(seed, incremental_model, update_sql_file), + opt_model_count=None, relation=incremental_model + ) + + self.test_scenario_correctness(expected_fields, test_case_fields) + + @use_profile('databricks_sql_endpoint') + def test__databricks_sql_endpoint_bad_unique_key_list(self): + '''expect compilation error from unique key not being a column''' + + (status, exc) = self.fail_to_build_inc_missing_unique_key_column( + incremental_model_name='not_found_unique_key_list' + ) + + self.assertEqual(status, RunStatus.Error) + self.assertTrue("thisisnotacolumn" in exc) + + @use_profile('databricks_cluster') + def test__databricks_cluster_bad_unique_key_list(self): + '''expect compilation error from unique key not being a column''' + + (status, exc) = self.fail_to_build_inc_missing_unique_key_column( + incremental_model_name='not_found_unique_key_list' + ) + + self.assertEqual(status, RunStatus.Error) + self.assertTrue("thisisnotacolumn" in exc) + \ No newline at end of file From ee5276561d55e5eedfbe91915a9cab50dbff956a Mon Sep 17 00:00:00 2001 From: leahwicz <60146280+leahwicz@users.noreply.github.com> Date: Wed, 23 Mar 2022 15:58:40 -0500 Subject: [PATCH 03/16] Add sasl dependency (#303) --- .github/workflows/version-bump.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/version-bump.yml b/.github/workflows/version-bump.yml index 4913a6e84..7fb8bb6eb 100644 --- a/.github/workflows/version-bump.yml +++ b/.github/workflows/version-bump.yml @@ -55,6 +55,7 @@ jobs: - name: Install python dependencies run: | + sudo apt-get install libsasl2-dev python3 -m venv env source env/bin/activate pip install --upgrade pip From e9d5bfb3dc52bc85ab740d0267d9f86ed7e671a1 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 23 Mar 2022 17:47:56 -0400 Subject: [PATCH 
04/16] Bumping version to 1.1.0b1 (#304) * Bumping version to 1.1.0b1 * Update CHANGELOG.md * Updating changelog release date Co-authored-by: Github Build Bot Co-authored-by: leahwicz <60146280+leahwicz@users.noreply.github.com> --- .bumpversion.cfg | 3 ++- CHANGELOG.md | 11 +++++++++-- dbt/adapters/spark/__version__.py | 2 +- setup.py | 2 +- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 9db331eb9..9a0c41a56 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.0.0 +current_version = 1.1.0b1 parse = (?P\d+) \.(?P\d+) \.(?P\d+) @@ -25,3 +25,4 @@ first_value = 1 [bumpversion:file:setup.py] [bumpversion:file:dbt/adapters/spark/__version__.py] + diff --git a/CHANGELOG.md b/CHANGELOG.md index 9642b3a2a..1a149622a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,13 +1,20 @@ -## dbt-spark 1.1.0 (Release TBD) +## dbt-spark 1.1.0b1 (March 23, 2022) ### Features -- Adds new integration test to check against new ability to allow unique_key to be a list. ([#282](https://github.com/dbt-labs/dbt-spark/issues/282)) +- Adds new integration test to check against new ability to allow unique_key to be a list. ([#282](https://github.com/dbt-labs/dbt-spark/issues/282)), [#291](https://github.com/dbt-labs/dbt-spark/pull/291)) + +### Fixes +- Closes the connection properly ([#280](https://github.com/dbt-labs/dbt-spark/issues/280), [#285](https://github.com/dbt-labs/dbt-spark/pull/285)) ### Under the hood +- get_response -> AdapterResponse ([#265](https://github.com/dbt-labs/dbt-spark/pull/265)) +- Adding stale Actions workflow ([#275](https://github.com/dbt-labs/dbt-spark/pull/275)) - Update plugin author name (`fishtown-analytics` → `dbt-labs`) in ODBC user agent ([#288](https://github.com/dbt-labs/dbt-spark/pull/288)) +- Configure insert_overwrite models to use parquet ([#301](https://github.com/dbt-labs/dbt-spark/pull/301)) ### Contributors - [@amychen1776](https://github.com/amychen1776) ([#288](https://github.com/dbt-labs/dbt-spark/pull/288)) +- [@ueshin](https://github.com/ueshin) ([#285](https://github.com/dbt-labs/dbt-spark/pull/285)) ## dbt-spark 1.0.1rc0 (Release TBD) diff --git a/dbt/adapters/spark/__version__.py b/dbt/adapters/spark/__version__.py index 11a716ec1..56ec17a89 100644 --- a/dbt/adapters/spark/__version__.py +++ b/dbt/adapters/spark/__version__.py @@ -1 +1 @@ -version = "1.0.0" +version = "1.1.0b1" diff --git a/setup.py b/setup.py index fdf8691a6..6ee6f5f54 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ def _get_dbt_core_version(): package_name = "dbt-spark" -package_version = "1.0.0" +package_version = "1.1.0b1" dbt_core_version = _get_dbt_core_version() description = """The Apache Spark adapter plugin for dbt""" From 086becba09ac1381dab2bf31fca4763fe6c3f74e Mon Sep 17 00:00:00 2001 From: Cor Date: Sat, 26 Mar 2022 15:26:37 +0000 Subject: [PATCH 05/16] Add spark session connection (#279) * Add session module * Add session connection method * Add session extras to setup.py * Add check for session method * Add session connection wrapper * Add sessioin to connection manager * Remove unused imports * Add spark session dbtspec * Add tox spark session environment * Add missing settings to dbt spec * Install session requirements * Add tox spark session to circle ci * Add pytest spark as test requirement * Add fixutre to force use spark session * Add pytest ini * Update passenv in tox * Set catalog implementation to hive * Make separate session connection wrapper * Format 
parameters * Run spark session before thrift * Add spark to dev requirements * Fix session module * Bump Spark session python version * Change docker image for spark session * Install python3 * Update ci * Remove spark fixture * Move session connection wrapper to session module * Disable tests that require hive support * Format * Change python 3 to python 3.8 * Install non-python dependencies * Remove dev-requirements * Remove pytest ini * Update the install * Add session method to change log * Do not pin sasl version * Delete spark session test profile * Add postgres container for hive support * Enable all session tests * Enable hive support * Add delta as file format * Use equals in spark defaults * Change reference to find spark home * Copy configs in one go * List spark conf * Let session test be the same as thrift * Update spark defaults * Enable error logging on postgres * Remove ls * Add port to connection url * Do not copy spark config * Print postgres * Remove postgres logging from thrift * Remove postgres from spark session tests * Change connection url back to dbt-hive-metastore * Revert Spark defaults changes * Disable tests and explain why * Move change log to top of file * Move contributor note up in changelog Co-authored-by: Jeremy Cohen --- .circleci/config.yml | 23 +++- CHANGELOG.md | 6 + dbt/adapters/spark/connections.py | 19 +++ dbt/adapters/spark/session.py | 221 ++++++++++++++++++++++++++++++ dev_requirements.txt | 2 +- setup.py | 10 +- tests/integration/conftest.py | 12 +- tests/specs/spark-session.dbtspec | 17 +++ tox.ini | 12 ++ 9 files changed, 308 insertions(+), 14 deletions(-) create mode 100644 dbt/adapters/spark/session.py create mode 100644 tests/specs/spark-session.dbtspec diff --git a/.circleci/config.yml b/.circleci/config.yml index 4921fac98..135c22cd3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -10,6 +10,24 @@ jobs: - checkout - run: tox -e flake8,unit + integration-spark-session: + environment: + DBT_INVOCATION_ENV: circle + docker: + - image: godatadriven/pyspark:3.1 + steps: + - checkout + - run: apt-get update + - run: python3 -m pip install --upgrade pip + - run: apt-get install -y git gcc g++ unixodbc-dev libsasl2-dev + - run: python3 -m pip install tox + - run: + name: Run integration tests + command: tox -e integration-spark-session + no_output_timeout: 1h + - store_artifacts: + path: ./logs + integration-spark-thrift: environment: DBT_INVOCATION_ENV: circle @@ -90,7 +108,7 @@ jobs: no_output_timeout: 1h - store_artifacts: path: ./logs - + integration-spark-databricks-odbc-endpoint: <<: *databricks-odbc steps: @@ -107,6 +125,9 @@ workflows: test-everything: jobs: - unit + - integration-spark-session: + requires: + - unit - integration-spark-thrift: requires: - unit diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a149622a..2b2e8f977 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +### Features +- Add session connection method ([#272](https://github.com/dbt-labs/dbt-spark/issues/272), [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) + +### Contributors +- [@JCZuurmond](https://github.com/dbt-labs/dbt-spark/pull/279) ( [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) + ## dbt-spark 1.1.0b1 (March 23, 2022) ### Features diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index 608ab2b45..11163ccf0 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -55,6 +55,7 @@ class SparkConnectionMethod(StrEnum): THRIFT = 'thrift' HTTP = 'http' ODBC 
= 'odbc' + SESSION = 'session' @dataclass @@ -133,6 +134,18 @@ def __post_init__(self): "`pip install dbt-spark[PyHive]`" ) + if self.method == SparkConnectionMethod.SESSION: + try: + import pyspark # noqa: F401 + except ImportError as e: + raise dbt.exceptions.RuntimeException( + f"{self.method} connection method requires " + "additional dependencies. \n" + "Install the additional required dependencies with " + "`pip install dbt-spark[session]`\n\n" + f"ImportError({e.msg})" + ) from e + @property def type(self): return 'spark' @@ -443,6 +456,12 @@ def open(cls, connection): conn = pyodbc.connect(connection_str, autocommit=True) handle = PyodbcConnectionWrapper(conn) + elif creds.method == SparkConnectionMethod.SESSION: + from .session import ( # noqa: F401 + Connection, + SessionConnectionWrapper, + ) + handle = SessionConnectionWrapper(Connection()) else: raise dbt.exceptions.DbtProfileError( f"invalid credential method: {creds.method}" diff --git a/dbt/adapters/spark/session.py b/dbt/adapters/spark/session.py new file mode 100644 index 000000000..6010df920 --- /dev/null +++ b/dbt/adapters/spark/session.py @@ -0,0 +1,221 @@ +"""Spark session integration.""" + +from __future__ import annotations + +import datetime as dt +from types import TracebackType +from typing import Any + +from dbt.events import AdapterLogger +from dbt.utils import DECIMALS +from pyspark.sql import DataFrame, Row, SparkSession + + +logger = AdapterLogger("Spark") +NUMBERS = DECIMALS + (int, float) + + +class Cursor: + """ + Mock a pyodbc cursor. + + Source + ------ + https://github.com/mkleehammer/pyodbc/wiki/Cursor + """ + + def __init__(self) -> None: + self._df: DataFrame | None = None + self._rows: list[Row] | None = None + + def __enter__(self) -> Cursor: + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: Exception | None, + exc_tb: TracebackType | None, + ) -> bool: + self.close() + return True + + @property + def description( + self, + ) -> list[tuple[str, str, None, None, None, None, bool]]: + """ + Get the description. + + Returns + ------- + out : list[tuple[str, str, None, None, None, None, bool]] + The description. + + Source + ------ + https://github.com/mkleehammer/pyodbc/wiki/Cursor#description + """ + if self._df is None: + description = list() + else: + description = [ + ( + field.name, + field.dataType.simpleString(), + None, + None, + None, + None, + field.nullable, + ) + for field in self._df.schema.fields + ] + return description + + def close(self) -> None: + """ + Close the connection. + + Source + ------ + https://github.com/mkleehammer/pyodbc/wiki/Cursor#close + """ + self._df = None + self._rows = None + + def execute(self, sql: str, *parameters: Any) -> None: + """ + Execute a sql statement. + + Parameters + ---------- + sql : str + Execute a sql statement. + *parameters : Any + The parameters. + + Raises + ------ + NotImplementedError + If there are parameters given. We do not format sql statements. + + Source + ------ + https://github.com/mkleehammer/pyodbc/wiki/Cursor#executesql-parameters + """ + if len(parameters) > 0: + sql = sql % parameters + spark_session = SparkSession.builder.enableHiveSupport().getOrCreate() + self._df = spark_session.sql(sql) + + def fetchall(self) -> list[Row] | None: + """ + Fetch all data. + + Returns + ------- + out : list[Row] | None + The rows. 
+ + Source + ------ + https://github.com/mkleehammer/pyodbc/wiki/Cursor#fetchall + """ + if self._rows is None and self._df is not None: + self._rows = self._df.collect() + return self._rows + + def fetchone(self) -> Row | None: + """ + Fetch the first output. + + Returns + ------- + out : Row | None + The first row. + + Source + ------ + https://github.com/mkleehammer/pyodbc/wiki/Cursor#fetchone + """ + if self._rows is None and self._df is not None: + self._rows = self._df.collect() + + if self._rows is not None and len(self._rows) > 0: + row = self._rows.pop(0) + else: + row = None + + return row + + +class Connection: + """ + Mock a pyodbc connection. + + Source + ------ + https://github.com/mkleehammer/pyodbc/wiki/Connection + """ + + def cursor(self) -> Cursor: + """ + Get a cursor. + + Returns + ------- + out : Cursor + The cursor. + """ + return Cursor() + + +class SessionConnectionWrapper(object): + """Connection wrapper for the sessoin connection method.""" + + def __init__(self, handle): + self.handle = handle + self._cursor = None + + def cursor(self): + self._cursor = self.handle.cursor() + return self + + def cancel(self): + logger.debug("NotImplemented: cancel") + + def close(self): + if self._cursor: + self._cursor.close() + + def rollback(self, *args, **kwargs): + logger.debug("NotImplemented: rollback") + + def fetchall(self): + return self._cursor.fetchall() + + def execute(self, sql, bindings=None): + if sql.strip().endswith(";"): + sql = sql.strip()[:-1] + + if bindings is None: + self._cursor.execute(sql) + else: + bindings = [self._fix_binding(binding) for binding in bindings] + self._cursor.execute(sql, *bindings) + + @property + def description(self): + return self._cursor.description + + @classmethod + def _fix_binding(cls, value): + """Convert complex datatypes to primitives that can be loaded by + the Spark driver""" + if isinstance(value, NUMBERS): + return float(value) + elif isinstance(value, dt.datetime): + return f"'{value.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}'" + else: + return f"'{value}'" diff --git a/dev_requirements.txt b/dev_requirements.txt index 9b371f9c6..9b4e8a12f 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -16,5 +16,5 @@ pytest-csv # Test requirements pytest-dbt-adapter==0.6.0 -sasl==0.2.1 +sasl>=0.2.1 thrift_sasl==0.4.1 diff --git a/setup.py b/setup.py index 6ee6f5f54..2cd44491e 100644 --- a/setup.py +++ b/setup.py @@ -61,7 +61,10 @@ def _get_dbt_core_version(): 'PyHive[hive]>=0.6.0,<0.7.0', 'thrift>=0.11.0,<0.16.0', ] -all_extras = odbc_extras + pyhive_extras +session_extras = [ + "pyspark>=3.0.0,<4.0.0" +] +all_extras = odbc_extras + pyhive_extras + session_extras setup( name=package_name, @@ -83,8 +86,9 @@ def _get_dbt_core_version(): ], extras_require={ "ODBC": odbc_extras, - "PyHive": pyhive_extras, - "all": all_extras + "PyHive": pyhive_extras, + "session": session_extras, + "all": all_extras, }, zip_safe=False, classifiers=[ diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 02248bae3..b76bc6c31 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,10 +1,4 @@ def pytest_configure(config): - config.addinivalue_line( - "markers", "profile_databricks_cluster" - ) - config.addinivalue_line( - "markers", "profile_databricks_sql_endpoint" - ) - config.addinivalue_line( - "markers", "profile_apache_spark" - ) + config.addinivalue_line("markers", "profile_databricks_cluster") + config.addinivalue_line("markers", "profile_databricks_sql_endpoint") + 
config.addinivalue_line("markers", "profile_apache_spark") diff --git a/tests/specs/spark-session.dbtspec b/tests/specs/spark-session.dbtspec new file mode 100644 index 000000000..cd09aa178 --- /dev/null +++ b/tests/specs/spark-session.dbtspec @@ -0,0 +1,17 @@ +target: + type: spark + method: session + host: localhost + schema: "analytics_{{ var('_dbt_random_suffix') }}" +sequences: + test_dbt_empty: empty + # requires a metastore for persisting over dbt runs + # test_dbt_base: base + # test_dbt_ephemeral: ephemeral + # test_dbt_incremental: incremental + # snapshots require delta format + # test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp + # test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols + test_dbt_data_test: data_test + test_dbt_schema_test: schema_test + test_dbt_ephemeral_data_tests: data_test_ephemeral_models diff --git a/tox.ini b/tox.ini index 993ab801e..e896421e2 100644 --- a/tox.ini +++ b/tox.ini @@ -57,3 +57,15 @@ deps = -r{toxinidir}/requirements.txt -r{toxinidir}/dev_requirements.txt -e. + +[testenv:integration-spark-session] +basepython = python3 +commands = /bin/bash -c '{envpython} -m pytest -v tests/specs/spark-session.dbtspec' +passenv = + DBT_* + PYTEST_* + PIP_CACHE_DIR +deps = + -r{toxinidir}/requirements.txt + -r{toxinidir}/dev_requirements.txt + -e.[session] From bbff5c72693565f1e1e2b0e821186d0b73c49ae3 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Tue, 29 Mar 2022 17:16:44 -0400 Subject: [PATCH 06/16] Initial implementation of new testing framework (#299) --- CHANGELOG.md | 5 ++ dev_requirements.txt | 3 +- pytest.ini | 10 +++ tests/conftest.py | 90 +++++++++++++++++++ tests/functional/adapter/test_basic.py | 79 ++++++++++++++++ tests/integration/base.py | 2 + .../test_incremental_unique_id.py | 40 ++++----- tests/specs/spark-databricks-http.dbtspec | 32 ------- .../spark-databricks-odbc-cluster.dbtspec | 33 ------- ...spark-databricks-odbc-sql-endpoint.dbtspec | 35 -------- tests/specs/spark-thrift.dbtspec | 22 ----- tox.ini | 8 +- 12 files changed, 212 insertions(+), 147 deletions(-) create mode 100644 pytest.ini create mode 100644 tests/conftest.py create mode 100644 tests/functional/adapter/test_basic.py delete mode 100644 tests/specs/spark-databricks-http.dbtspec delete mode 100644 tests/specs/spark-databricks-odbc-cluster.dbtspec delete mode 100644 tests/specs/spark-databricks-odbc-sql-endpoint.dbtspec delete mode 100644 tests/specs/spark-thrift.dbtspec diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b2e8f977..e61a335cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ +## dbt-spark 1.1.0 (TBD) + ### Features - Add session connection method ([#272](https://github.com/dbt-labs/dbt-spark/issues/272), [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) +### Under the hood +- Use dbt.tests.adapter.basic in test suite ([#298](https://github.com/dbt-labs/dbt-spark/issues/298), [#299](https://github.com/dbt-labs/dbt-spark/pull/299)) + ### Contributors - [@JCZuurmond](https://github.com/dbt-labs/dbt-spark/pull/279) ( [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) diff --git a/dev_requirements.txt b/dev_requirements.txt index 9b4e8a12f..520d1f5b8 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,6 +1,7 @@ # install latest changes in dbt-core # TODO: how to automate switching from develop to version branches? 
-git+https://github.com/dbt-labs/dbt.git#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter freezegun==0.3.9 pytest==6.0.2 diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 000000000..b04a6ccf3 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,10 @@ +[pytest] +filterwarnings = + ignore:.*'soft_unicode' has been renamed to 'soft_str'*:DeprecationWarning + ignore:unclosed file .*:ResourceWarning +env_files = + test.env +testpaths = + tests/unit + tests/integration + tests/functional diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 000000000..603dc1391 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,90 @@ +import pytest +import os + +pytest_plugins = ["dbt.tests.fixtures.project"] + + +def pytest_addoption(parser): + parser.addoption("--profile", action="store", default="apache_spark", type=str) + + +# Using @pytest.mark.skip_adapter('apache_spark') uses the 'skip_by_adapter_type' +# autouse fixture below +def pytest_configure(config): + config.addinivalue_line( + "markers", + "skip_profile(profile): skip test for the given profile", + ) + + +@pytest.fixture(scope="session") +def dbt_profile_target(request): + profile_type = request.config.getoption("--profile") + if profile_type == "databricks_cluster": + target = databricks_cluster_target() + elif profile_type == "databricks_sql_endpoint": + target = databricks_sql_endpoint_target() + elif profile_type == "apache_spark": + target = apache_spark_target() + elif profile_type == "databricks_http_cluster": + target = databricks_http_cluster_target() + else: + raise ValueError(f"Invalid profile type '{profile_type}'") + return target + + +def apache_spark_target(): + return { + "type": "spark", + "host": "localhost", + "user": "dbt", + "method": "thrift", + "port": 10000, + "connect_retries": 5, + "connect_timeout": 60, + "retry_all": True, + } + + +def databricks_cluster_target(): + return { + "type": "spark", + "method": "odbc", + "host": os.getenv("DBT_DATABRICKS_HOST_NAME"), + "cluster": os.getenv("DBT_DATABRICKS_CLUSTER_NAME"), + "token": os.getenv("DBT_DATABRICKS_TOKEN"), + "driver": os.getenv("ODBC_DRIVER"), + "port": 443, + } + + +def databricks_sql_endpoint_target(): + return { + "type": "spark", + "method": "odbc", + "host": os.getenv("DBT_DATABRICKS_HOST_NAME"), + "endpoint": os.getenv("DBT_DATABRICKS_ENDPOINT"), + "token": os.getenv("DBT_DATABRICKS_TOKEN"), + "driver": os.getenv("ODBC_DRIVER"), + "port": 443, + } + + +def databricks_http_cluster_target(): + return { + "type": "spark", + "host": os.getenv('DBT_DATABRICKS_HOST_NAME'), + "cluster": os.getenv('DBT_DATABRICKS_CLUSTER_NAME'), + "token": os.getenv('DBT_DATABRICKS_TOKEN'), + "method": "http", + "port": 443, + "connect_retries": 5, + "connect_timeout": 60, + } + +@pytest.fixture(autouse=True) +def skip_by_profile_type(request): + profile_type = request.config.getoption("--profile") + if request.node.get_closest_marker("skip_profile"): + if request.node.get_closest_marker("skip_profile").args[0] == profile_type: + pytest.skip("skipped on '{profile_type}' profile") diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py new file mode 100644 index 000000000..c459e9462 --- /dev/null +++ b/tests/functional/adapter/test_basic.py @@ -0,0 +1,79 @@ +import pytest + +from dbt.tests.adapter.basic.test_base import BaseSimpleMaterializations +from 
dbt.tests.adapter.basic.test_singular_tests import BaseSingularTests +from dbt.tests.adapter.basic.test_singular_tests_ephemeral import ( + BaseSingularTestsEphemeral, +) +from dbt.tests.adapter.basic.test_empty import BaseEmpty +from dbt.tests.adapter.basic.test_ephemeral import BaseEphemeral +from dbt.tests.adapter.basic.test_incremental import BaseIncremental +from dbt.tests.adapter.basic.test_generic_tests import BaseGenericTests +from dbt.tests.adapter.basic.test_snapshot_check_cols import BaseSnapshotCheckCols +from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp + + +@pytest.mark.skip_profile('databricks_sql_endpoint') +class TestSimpleMaterializationsSpark(BaseSimpleMaterializations): + pass + + +class TestSingularTestsSpark(BaseSingularTests): + pass + + +# The local cluster currently tests on spark 2.x, which does not support this +# if we upgrade it to 3.x, we can enable this test +@pytest.mark.skip_profile('apache_spark') +class TestSingularTestsEphemeralSpark(BaseSingularTestsEphemeral): + pass + + +class TestEmptySpark(BaseEmpty): + pass + + +@pytest.mark.skip_profile('databricks_sql_endpoint') +class TestEphemeralSpark(BaseEphemeral): + pass + + +@pytest.mark.skip_profile('databricks_sql_endpoint') +class TestIncrementalSpark(BaseIncremental): + pass + + +class TestGenericTestsSpark(BaseGenericTests): + pass + + +# These tests were not enabled in the dbtspec files, so skipping here. +# Error encountered was: Error running query: java.lang.ClassNotFoundException: delta.DefaultSource +@pytest.mark.skip_profile('apache_spark') +class TestSnapshotCheckColsSpark(BaseSnapshotCheckCols): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "seeds": { + "+file_format": "delta", + }, + "snapshots": { + "+file_format": "delta", + } + } + + +#hese tests were not enabled in the dbtspec files, so skipping here. 
+# Error encountered was: Error running query: java.lang.ClassNotFoundException: delta.DefaultSource +@pytest.mark.skip_profile('apache_spark') +class TestSnapshotTimestampSpark(BaseSnapshotTimestamp): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "seeds": { + "+file_format": "delta", + }, + "snapshots": { + "+file_format": "delta", + } + } diff --git a/tests/integration/base.py b/tests/integration/base.py index acce6a744..e36162aaf 100644 --- a/tests/integration/base.py +++ b/tests/integration/base.py @@ -77,6 +77,8 @@ def __init__(self): class TestArgs: + __test__ = False + def __init__(self, kwargs): self.which = 'run' self.single_threaded = False diff --git a/tests/integration/incremental_unique_id_test/test_incremental_unique_id.py b/tests/integration/incremental_unique_id_test/test_incremental_unique_id.py index 6ba80bc75..96e619120 100644 --- a/tests/integration/incremental_unique_id_test/test_incremental_unique_id.py +++ b/tests/integration/incremental_unique_id_test/test_incremental_unique_id.py @@ -4,8 +4,8 @@ from pathlib import Path -TestResults = namedtuple( - 'TestResults', +ResultHolder = namedtuple( + 'ResultHolder', ['seed_count', 'model_count', 'seed_rows', 'inc_test_model_count', 'opt_model_count', 'relation'], ) @@ -95,7 +95,7 @@ def test_scenario_correctness(self, expected_fields, test_case_fields): def stub_expected_fields( self, relation, seed_rows, opt_model_count=None ): - return TestResults( + return ResultHolder( seed_count=1, model_count=1, seed_rows=seed_rows, inc_test_model_count=1, opt_model_count=opt_model_count, relation=relation @@ -134,7 +134,7 @@ def test__databricks_sql_endpoint_no_unique_keys(self): expected_fields = self.stub_expected_fields( relation=seed, seed_rows=seed_rows ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=None, relation=incremental_model ) @@ -152,7 +152,7 @@ def test__databricks_cluster_no_unique_keys(self): expected_fields = self.stub_expected_fields( relation=seed, seed_rows=seed_rows ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=None, relation=incremental_model ) @@ -173,7 +173,7 @@ def test__databricks_sql_endpoint_empty_str_unique_key(self): expected_fields = self.stub_expected_fields( relation=seed, seed_rows=seed_rows ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=None, relation=incremental_model ) @@ -191,7 +191,7 @@ def test__databricks_cluster_empty_str_unique_key(self): expected_fields = self.stub_expected_fields( relation=seed, seed_rows=seed_rows ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=None, relation=incremental_model ) @@ -210,7 +210,7 @@ def test__databricks_sql_endpoint_one_unique_key(self): expected_fields = self.stub_expected_fields( relation=expected_model, seed_rows=seed_rows, opt_model_count=1 ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=self.update_incremental_model(expected_model), relation=incremental_model @@ -230,7 +230,7 @@ def test__databricks_cluster_one_unique_key(self): expected_fields = self.stub_expected_fields( relation=expected_model, seed_rows=seed_rows, 
opt_model_count=1 ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=self.update_incremental_model(expected_model), relation=incremental_model @@ -274,7 +274,7 @@ def test__databricks_sql_endpoint_empty_unique_key_list(self): expected_fields = self.stub_expected_fields( relation=seed, seed_rows=seed_rows ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=None, relation=incremental_model ) @@ -292,7 +292,7 @@ def test__databricks_cluster_empty_unique_key_list(self): expected_fields = self.stub_expected_fields( relation=seed, seed_rows=seed_rows ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=None, relation=incremental_model ) @@ -311,7 +311,7 @@ def test__databricks_sql_endpoint_unary_unique_key_list(self): expected_fields = self.stub_expected_fields( relation=expected_model, seed_rows=seed_rows, opt_model_count=1 ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=self.update_incremental_model(expected_model), relation=incremental_model @@ -331,7 +331,7 @@ def test__databricks_cluster_unary_unique_key_list(self): expected_fields = self.stub_expected_fields( relation=expected_model, seed_rows=seed_rows, opt_model_count=1 ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=self.update_incremental_model(expected_model), relation=incremental_model @@ -351,7 +351,7 @@ def test__databricks_sql_endpoint_duplicated_unary_unique_key_list(self): expected_fields = self.stub_expected_fields( relation=expected_model, seed_rows=seed_rows, opt_model_count=1 ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=self.update_incremental_model(expected_model), relation=incremental_model @@ -371,7 +371,7 @@ def test__databricks_cluster_duplicated_unary_unique_key_list(self): expected_fields = self.stub_expected_fields( relation=expected_model, seed_rows=seed_rows, opt_model_count=1 ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=self.update_incremental_model(expected_model), relation=incremental_model @@ -391,7 +391,7 @@ def test__databricks_sql_endpoint_trinary_unique_key_list(self): expected_fields = self.stub_expected_fields( relation=expected_model, seed_rows=seed_rows, opt_model_count=1 ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=self.update_incremental_model(expected_model), relation=incremental_model @@ -411,7 +411,7 @@ def test__databricks_cluster_trinary_unique_key_list(self): expected_fields = self.stub_expected_fields( relation=expected_model, seed_rows=seed_rows, opt_model_count=1 ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=self.update_incremental_model(expected_model), relation=incremental_model @@ -431,7 +431,7 @@ def test__databricks_sql_endpoint_trinary_unique_key_list_no_update(self): expected_fields = self.stub_expected_fields( 
relation=seed, seed_rows=seed_rows ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=None, relation=incremental_model ) @@ -450,7 +450,7 @@ def test__databricks_cluster_trinary_unique_key_list_no_update(self): expected_fields = self.stub_expected_fields( relation=seed, seed_rows=seed_rows ) - test_case_fields = TestResults( + test_case_fields = ResultHolder( *self.setup_test(seed, incremental_model, update_sql_file), opt_model_count=None, relation=incremental_model ) @@ -478,4 +478,4 @@ def test__databricks_cluster_bad_unique_key_list(self): self.assertEqual(status, RunStatus.Error) self.assertTrue("thisisnotacolumn" in exc) - \ No newline at end of file + diff --git a/tests/specs/spark-databricks-http.dbtspec b/tests/specs/spark-databricks-http.dbtspec deleted file mode 100644 index 67342da39..000000000 --- a/tests/specs/spark-databricks-http.dbtspec +++ /dev/null @@ -1,32 +0,0 @@ -target: - type: spark - host: "{{ env_var('DBT_DATABRICKS_HOST_NAME') }}" - cluster: "{{ env_var('DBT_DATABRICKS_CLUSTER_NAME') }}" - token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}" - method: http - port: 443 - schema: "analytics_{{ var('_dbt_random_suffix') }}" - connect_retries: 5 - connect_timeout: 60 -projects: - - overrides: snapshot_strategy_check_cols - dbt_project_yml: &file_format_delta - # we're going to UPDATE the seed tables as part of testing, so we must make them delta format - seeds: - dbt_test_project: - file_format: delta - snapshots: - dbt_test_project: - file_format: delta - - overrides: snapshot_strategy_timestamp - dbt_project_yml: *file_format_delta -sequences: - test_dbt_empty: empty - test_dbt_base: base - test_dbt_ephemeral: ephemeral - test_dbt_incremental: incremental - test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp - test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols - test_dbt_data_test: data_test - test_dbt_ephemeral_data_tests: data_test_ephemeral_models - test_dbt_schema_test: schema_test diff --git a/tests/specs/spark-databricks-odbc-cluster.dbtspec b/tests/specs/spark-databricks-odbc-cluster.dbtspec deleted file mode 100644 index b320dc3a4..000000000 --- a/tests/specs/spark-databricks-odbc-cluster.dbtspec +++ /dev/null @@ -1,33 +0,0 @@ -target: - type: spark - host: "{{ env_var('DBT_DATABRICKS_HOST_NAME') }}" - cluster: "{{ env_var('DBT_DATABRICKS_CLUSTER_NAME') }}" - token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}" - method: odbc - driver: "{{ env_var('ODBC_DRIVER') }}" - port: 443 - schema: "analytics_{{ var('_dbt_random_suffix') }}" - connect_retries: 5 - connect_timeout: 60 -projects: - - overrides: snapshot_strategy_check_cols - dbt_project_yml: &file_format_delta - # we're going to UPDATE the seed tables as part of testing, so we must make them delta format - seeds: - dbt_test_project: - file_format: delta - snapshots: - dbt_test_project: - file_format: delta - - overrides: snapshot_strategy_timestamp - dbt_project_yml: *file_format_delta -sequences: - test_dbt_empty: empty - test_dbt_base: base - test_dbt_ephemeral: ephemeral - test_dbt_incremental: incremental - test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp - test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols - test_dbt_data_test: data_test - test_dbt_ephemeral_data_tests: data_test_ephemeral_models - test_dbt_schema_test: schema_test diff --git a/tests/specs/spark-databricks-odbc-sql-endpoint.dbtspec 
b/tests/specs/spark-databricks-odbc-sql-endpoint.dbtspec deleted file mode 100644 index 0aa7be765..000000000 --- a/tests/specs/spark-databricks-odbc-sql-endpoint.dbtspec +++ /dev/null @@ -1,35 +0,0 @@ -target: - type: spark - host: "{{ env_var('DBT_DATABRICKS_HOST_NAME') }}" - endpoint: "{{ env_var('DBT_DATABRICKS_ENDPOINT') }}" - token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}" - method: odbc - driver: "{{ env_var('ODBC_DRIVER') }}" - port: 443 - schema: "analytics_{{ var('_dbt_random_suffix') }}" - connect_retries: 5 - connect_timeout: 60 -projects: - - overrides: snapshot_strategy_check_cols - dbt_project_yml: &file_format_delta - # we're going to UPDATE the seed tables as part of testing, so we must make them delta format - seeds: - dbt_test_project: - file_format: delta - snapshots: - dbt_test_project: - file_format: delta - - overrides: snapshot_strategy_timestamp - dbt_project_yml: *file_format_delta -sequences: - test_dbt_empty: empty - # The SQL Endpoint no longer supports `set` ?? - # test_dbt_base: base - test_dbt_ephemeral: ephemeral - # The SQL Endpoint does not support `create temporary view` - # test_dbt_incremental: incremental - test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp - test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols - test_dbt_data_test: data_test - test_dbt_ephemeral_data_tests: data_test_ephemeral_models - test_dbt_schema_test: schema_test diff --git a/tests/specs/spark-thrift.dbtspec b/tests/specs/spark-thrift.dbtspec deleted file mode 100644 index 85b843f37..000000000 --- a/tests/specs/spark-thrift.dbtspec +++ /dev/null @@ -1,22 +0,0 @@ -target: - type: spark - host: localhost - user: dbt - method: thrift - port: 10000 - connect_retries: 5 - connect_timeout: 60 - schema: "analytics_{{ var('_dbt_random_suffix') }}" -sequences: - test_dbt_empty: empty - test_dbt_base: base - test_dbt_ephemeral: ephemeral - test_dbt_incremental: incremental - # snapshots require delta format - # test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp - # test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols - test_dbt_data_test: data_test - test_dbt_schema_test: schema_test - # the local cluster currently tests on spark 2.x, which does not support this - # if we upgrade it to 3.x, we can enable this test - # test_dbt_ephemeral_data_tests: data_test_ephemeral_models diff --git a/tox.ini b/tox.ini index e896421e2..8e7719600 100644 --- a/tox.ini +++ b/tox.ini @@ -20,7 +20,7 @@ deps = [testenv:integration-spark-databricks-http] basepython = python3.8 -commands = /bin/bash -c '{envpython} -m pytest -v tests/specs/spark-databricks-http.dbtspec' +commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_http_cluster tests/functional/adapter/test_basic.py' passenv = DBT_* PYTEST_ADDOPTS deps = -r{toxinidir}/requirements.txt @@ -29,7 +29,7 @@ deps = [testenv:integration-spark-databricks-odbc-cluster] basepython = python3.8 -commands = /bin/bash -c '{envpython} -m pytest -v tests/specs/spark-databricks-odbc-cluster.dbtspec' +commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_cluster tests/functional/adapter/test_basic.py' /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_cluster {posargs} -n4 tests/integration/*' passenv = DBT_* PYTEST_ADDOPTS ODBC_DRIVER deps = @@ -39,7 +39,7 @@ deps = [testenv:integration-spark-databricks-odbc-sql-endpoint] basepython = python3.8 -commands = /bin/bash -c '{envpython} -m pytest -v tests/specs/spark-databricks-odbc-sql-endpoint.dbtspec' +commands = 
/bin/bash -c '{envpython} -m pytest -v --profile databricks_sql_endpoint tests/functional/adapter/test_basic.py' /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_sql_endpoint {posargs} -n4 tests/integration/*' passenv = DBT_* PYTEST_ADDOPTS ODBC_DRIVER deps = @@ -50,7 +50,7 @@ deps = [testenv:integration-spark-thrift] basepython = python3.8 -commands = /bin/bash -c '{envpython} -m pytest -v tests/specs/spark-thrift.dbtspec' +commands = /bin/bash -c '{envpython} -m pytest -v --profile apache_spark tests/functional/adapter/test_basic.py' /bin/bash -c '{envpython} -m pytest -v -m profile_apache_spark {posargs} -n4 tests/integration/*' passenv = DBT_* PYTEST_ADDOPTS deps = From d92a4e54ab0f69c8d327f0220495d1b8b468189c Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Thu, 31 Mar 2022 17:39:28 -0400 Subject: [PATCH 07/16] Remove requirement for pytest-dbt-adapter (#314) * Remove requirement for pytest-dbt-adapter * Check multiple profile types in skip_profile * Update dev_requirements to include pytest-dotenv --- dev_requirements.txt | 16 ++++++++-------- tests/conftest.py | 16 ++++++++++++++-- tests/functional/adapter/test_basic.py | 10 +++++----- tests/specs/spark-session.dbtspec | 17 ----------------- tox.ini | 2 +- 5 files changed, 28 insertions(+), 33 deletions(-) delete mode 100644 tests/specs/spark-session.dbtspec diff --git a/dev_requirements.txt b/dev_requirements.txt index 520d1f5b8..0f84cbd5d 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -4,18 +4,18 @@ git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter freezegun==0.3.9 -pytest==6.0.2 +pytest>=6.0.2 mock>=1.3.0 -flake8>=3.5.0 -pytz==2017.2 -bumpversion==0.5.3 -tox==3.2.0 +flake8 +pytz +bumpversion +tox>=3.2.0 ipdb -pytest-xdist>=2.1.0,<3 -flaky>=3.5.3,<4 +pytest-xdist +pytest-dotenv pytest-csv +flaky # Test requirements -pytest-dbt-adapter==0.6.0 sasl>=0.2.1 thrift_sasl==0.4.1 diff --git a/tests/conftest.py b/tests/conftest.py index 603dc1391..69fbf3d51 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -28,6 +28,8 @@ def dbt_profile_target(request): target = apache_spark_target() elif profile_type == "databricks_http_cluster": target = databricks_http_cluster_target() + elif profile_type == "spark_session": + target = spark_session_target() else: raise ValueError(f"Invalid profile type '{profile_type}'") return target @@ -82,9 +84,19 @@ def databricks_http_cluster_target(): "connect_timeout": 60, } + +def spark_session_target(): + return { + "type": "spark", + "host": "localhost", + "method": "session", + } + + @pytest.fixture(autouse=True) def skip_by_profile_type(request): profile_type = request.config.getoption("--profile") if request.node.get_closest_marker("skip_profile"): - if request.node.get_closest_marker("skip_profile").args[0] == profile_type: - pytest.skip("skipped on '{profile_type}' profile") + for skip_profile_type in request.node.get_closest_marker("skip_profile").args: + if skip_profile_type == profile_type: + pytest.skip("skipped on '{profile_type}' profile") diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py index c459e9462..ff4fdd22a 100644 --- a/tests/functional/adapter/test_basic.py +++ b/tests/functional/adapter/test_basic.py @@ -13,7 +13,7 @@ from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp -@pytest.mark.skip_profile('databricks_sql_endpoint') 
+@pytest.mark.skip_profile('databricks_sql_endpoint', 'spark_session') class TestSimpleMaterializationsSpark(BaseSimpleMaterializations): pass @@ -33,12 +33,12 @@ class TestEmptySpark(BaseEmpty): pass -@pytest.mark.skip_profile('databricks_sql_endpoint') +@pytest.mark.skip_profile('databricks_sql_endpoint', 'spark_session') class TestEphemeralSpark(BaseEphemeral): pass -@pytest.mark.skip_profile('databricks_sql_endpoint') +@pytest.mark.skip_profile('databricks_sql_endpoint', 'spark_session') class TestIncrementalSpark(BaseIncremental): pass @@ -49,7 +49,7 @@ class TestGenericTestsSpark(BaseGenericTests): # These tests were not enabled in the dbtspec files, so skipping here. # Error encountered was: Error running query: java.lang.ClassNotFoundException: delta.DefaultSource -@pytest.mark.skip_profile('apache_spark') +@pytest.mark.skip_profile('apache_spark', 'spark_session') class TestSnapshotCheckColsSpark(BaseSnapshotCheckCols): @pytest.fixture(scope="class") def project_config_update(self): @@ -65,7 +65,7 @@ def project_config_update(self): #hese tests were not enabled in the dbtspec files, so skipping here. # Error encountered was: Error running query: java.lang.ClassNotFoundException: delta.DefaultSource -@pytest.mark.skip_profile('apache_spark') +@pytest.mark.skip_profile('apache_spark', 'spark_session') class TestSnapshotTimestampSpark(BaseSnapshotTimestamp): @pytest.fixture(scope="class") def project_config_update(self): diff --git a/tests/specs/spark-session.dbtspec b/tests/specs/spark-session.dbtspec deleted file mode 100644 index cd09aa178..000000000 --- a/tests/specs/spark-session.dbtspec +++ /dev/null @@ -1,17 +0,0 @@ -target: - type: spark - method: session - host: localhost - schema: "analytics_{{ var('_dbt_random_suffix') }}" -sequences: - test_dbt_empty: empty - # requires a metastore for persisting over dbt runs - # test_dbt_base: base - # test_dbt_ephemeral: ephemeral - # test_dbt_incremental: incremental - # snapshots require delta format - # test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp - # test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols - test_dbt_data_test: data_test - test_dbt_schema_test: schema_test - test_dbt_ephemeral_data_tests: data_test_ephemeral_models diff --git a/tox.ini b/tox.ini index 8e7719600..38cb1962e 100644 --- a/tox.ini +++ b/tox.ini @@ -60,7 +60,7 @@ deps = [testenv:integration-spark-session] basepython = python3 -commands = /bin/bash -c '{envpython} -m pytest -v tests/specs/spark-session.dbtspec' +commands = /bin/bash -c '{envpython} -m pytest -v --profile spark_session tests/functional/adapter/test_basic.py' passenv = DBT_* PYTEST_* From 183b9bf3819af663f008ca007d45439e540cfa95 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Mon, 4 Apr 2022 10:28:19 -0400 Subject: [PATCH 08/16] bring back local instruction (#309) --- README.md | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/README.md b/README.md index 6795d76b7..037a49895 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,56 @@ more information, consult [the docs](https://docs.getdbt.com/docs/profile-spark) - [Install dbt](https://docs.getdbt.com/docs/installation) - Read the [introduction](https://docs.getdbt.com/docs/introduction/) and [viewpoint](https://docs.getdbt.com/docs/about/viewpoint/) +## Running locally +A `docker-compose` environment starts a Spark Thrift server and a Postgres database as a Hive Metastore backend. 
+Note that this is spark 2 not spark 3 so some functionalities might not be available. + +The following command would start two docker containers +``` +docker-compose up -d +``` +It will take a bit of time for the instance to start, you can check the logs of the two containers. +If the instance doesn't start correctly, try the complete reset command listed below and then try start again. + +Create a profile like this one: + +``` +spark-testing: + target: local + outputs: + local: + type: spark + method: thrift + host: 127.0.0.1 + port: 10000 + user: dbt + schema: analytics + connect_retries: 5 + connect_timeout: 60 + retry_all: true +``` + +Connecting to the local spark instance: + +* The Spark UI should be available at [http://localhost:4040/sqlserver/](http://localhost:4040/sqlserver/) +* The endpoint for SQL-based testing is at `http://localhost:10000` and can be referenced with the Hive or Spark JDBC drivers using connection string `jdbc:hive2://localhost:10000` and default credentials `dbt`:`dbt` + +Note that the Hive metastore data is persisted under `./.hive-metastore/`, and the Spark-produced data under `./.spark-warehouse/`. To completely reset you environment run the following: + +``` +docker-compose down +rm -rf ./.hive-metastore/ +rm -rf ./.spark-warehouse/ +``` + +### Reporting bugs and contributing code + +- Want to report a bug or request a feature? Let us know on [Slack](http://slack.getdbt.com/), or open [an issue](https://github.com/fishtown-analytics/dbt-spark/issues/new). + +## Code of Conduct + +Everyone interacting in the dbt project's codebases, issue trackers, chat rooms, and mailing lists is expected to follow the [PyPA Code of Conduct](https://www.pypa.io/en/latest/code-of-conduct/). + ## Join the dbt Community - Be part of the conversation in the [dbt Community Slack](http://community.getdbt.com/) From fe77af712b27892f4996d438a46fda7453ab3c49 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 6 Apr 2022 00:02:14 -0700 Subject: [PATCH 09/16] Make internal macros use macro dispatch to be overridable in child adapters (#320) * Make internal macros use macro dispatch to be overridable in child adapters. * changelog * Address a comment. * Fix. * Fix. 
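For context on the pattern applied below: every internal macro in `dbt/include/spark/macros/adapters.sql` becomes a thin wrapper that calls `adapter.dispatch(...)`, and its original body moves into a `spark__`-prefixed default. A child adapter then only has to define a macro with its own prefix for dispatch to pick it up. A minimal sketch of the idiom follows; the `databricks__` override at the end is hypothetical and shown only to illustrate how a child adapter such as dbt-databricks would hook in, it is not part of this patch:

```
{# wrapper: resolves to <adapter>__file_format_clause if one is defined,
   otherwise falls back to the spark__ default shipped with dbt-spark #}
{% macro file_format_clause() %}
  {{ return(adapter.dispatch('file_format_clause', 'dbt')()) }}
{%- endmacro -%}

{# default implementation, behaviour unchanged #}
{% macro spark__file_format_clause() %}
  {%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%}
  {%- if file_format is not none %}
    using {{ file_format }}
  {%- endif %}
{%- endmacro -%}

{# hypothetical override in a child adapter: defining it is all that is
   needed, since dispatch prefers the running adapter's prefix #}
{% macro databricks__file_format_clause() %}
    using delta
{%- endmacro -%}
```

Because dispatch falls back through the adapter prefix chain, projects running plain dbt-spark see no behaviour change unless an override exists.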
--- CHANGELOG.md | 2 ++ dbt/include/spark/macros/adapters.sql | 36 ++++++++++++++++++++++++++- tests/unit/test_macros.py | 9 ++++++- 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e61a335cf..1c9b599d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,11 @@ ### Under the hood - Use dbt.tests.adapter.basic in test suite ([#298](https://github.com/dbt-labs/dbt-spark/issues/298), [#299](https://github.com/dbt-labs/dbt-spark/pull/299)) +- Make internal macros use macro dispatch to be overridable in child adapters ([#319](https://github.com/dbt-labs/dbt-spark/issues/319), [#320](https://github.com/dbt-labs/dbt-spark/pull/320)) ### Contributors - [@JCZuurmond](https://github.com/dbt-labs/dbt-spark/pull/279) ( [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) +- [@ueshin](https://github.com/ueshin) ([#320](https://github.com/dbt-labs/dbt-spark/pull/320)) ## dbt-spark 1.1.0b1 (March 23, 2022) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index 2542af811..e96501c45 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -1,11 +1,20 @@ {% macro file_format_clause() %} + {{ return(adapter.dispatch('file_format_clause', 'dbt')()) }} +{%- endmacro -%} + +{% macro spark__file_format_clause() %} {%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%} {%- if file_format is not none %} using {{ file_format }} {%- endif %} {%- endmacro -%} + {% macro location_clause() %} + {{ return(adapter.dispatch('location_clause', 'dbt')()) }} +{%- endmacro -%} + +{% macro spark__location_clause() %} {%- set location_root = config.get('location_root', validator=validation.any[basestring]) -%} {%- set identifier = model['alias'] -%} {%- if location_root is not none %} @@ -13,7 +22,12 @@ {%- endif %} {%- endmacro -%} + {% macro options_clause() -%} + {{ return(adapter.dispatch('options_clause', 'dbt')()) }} +{%- endmacro -%} + +{% macro spark__options_clause() -%} {%- set options = config.get('options') -%} {%- if config.get('file_format') == 'hudi' -%} {%- set unique_key = config.get('unique_key') -%} @@ -35,7 +49,12 @@ {%- endif %} {%- endmacro -%} + {% macro comment_clause() %} + {{ return(adapter.dispatch('comment_clause', 'dbt')()) }} +{%- endmacro -%} + +{% macro spark__comment_clause() %} {%- set raw_persist_docs = config.get('persist_docs', {}) -%} {%- if raw_persist_docs is mapping -%} @@ -48,7 +67,12 @@ {% endif %} {%- endmacro -%} + {% macro partition_cols(label, required=false) %} + {{ return(adapter.dispatch('partition_cols', 'dbt')(label, required)) }} +{%- endmacro -%} + +{% macro spark__partition_cols(label, required=false) %} {%- set cols = config.get('partition_by', validator=validation.any[list, basestring]) -%} {%- if cols is not none %} {%- if cols is string -%} @@ -65,6 +89,10 @@ {% macro clustered_cols(label, required=false) %} + {{ return(adapter.dispatch('clustered_cols', 'dbt')(label, required)) }} +{%- endmacro -%} + +{% macro spark__clustered_cols(label, required=false) %} {%- set cols = config.get('clustered_by', validator=validation.any[list, basestring]) -%} {%- set buckets = config.get('buckets', validator=validation.any[int]) -%} {%- if (cols is not none) and (buckets is not none) %} @@ -80,6 +108,7 @@ {%- endif %} {%- endmacro -%} + {% macro fetch_tbl_properties(relation) -%} {% call statement('list_properties', fetch_result=True) -%} SHOW TBLPROPERTIES {{ relation }} @@ -88,12 +117,17 @@ {%- endmacro %} -{#-- 
We can't use temporary tables with `create ... as ()` syntax #} {% macro create_temporary_view(relation, sql) -%} + {{ return(adapter.dispatch('create_temporary_view', 'dbt')(relation, sql)) }} +{%- endmacro -%} + +{#-- We can't use temporary tables with `create ... as ()` syntax #} +{% macro spark__create_temporary_view(relation, sql) -%} create temporary view {{ relation.include(schema=false) }} as {{ sql }} {% endmacro %} + {% macro spark__create_table_as(temporary, relation, sql) -%} {% if temporary -%} {{ create_temporary_view(relation, sql) }} diff --git a/tests/unit/test_macros.py b/tests/unit/test_macros.py index 06ce202a7..220a74db7 100644 --- a/tests/unit/test_macros.py +++ b/tests/unit/test_macros.py @@ -15,7 +15,9 @@ def setUp(self): 'validation': mock.Mock(), 'model': mock.Mock(), 'exceptions': mock.Mock(), - 'config': mock.Mock() + 'config': mock.Mock(), + 'adapter': mock.Mock(), + 'return': lambda r: r, } self.default_context['config'].get = lambda key, default=None, **kwargs: self.config.get(key, default) @@ -24,6 +26,11 @@ def __get_template(self, template_filename): def __run_macro(self, template, name, temporary, relation, sql): self.default_context['model'].alias = relation + + def dispatch(macro_name, macro_namespace=None, packages=None): + return getattr(template.module, f'spark__{macro_name}') + self.default_context['adapter'].dispatch = dispatch + value = getattr(template.module, name)(temporary, relation, sql) return re.sub(r'\s\s+', ' ', value) From a6de5d251859f3134f1afb29fa5a158f58b0ff64 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Thu, 7 Apr 2022 15:39:01 +0200 Subject: [PATCH 10/16] CI ergonomics: connection retries (#327) * Update retries in CI * Reenable all checks on sql_endpoint * tox consistency --- .circleci/config.yml | 1 + tests/conftest.py | 15 ++++++++++++--- tests/functional/adapter/test_basic.py | 6 +++--- tests/integration/base.py | 11 +++++++++-- tox.ini | 8 ++++---- 5 files changed, 29 insertions(+), 12 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 135c22cd3..34e449acf 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -79,6 +79,7 @@ jobs: integration-spark-databricks-http: environment: DBT_INVOCATION_ENV: circle + DBT_DATABRICKS_RETRY_ALL: True docker: - image: fishtownanalytics/test-container:10 steps: diff --git a/tests/conftest.py b/tests/conftest.py index 69fbf3d51..7ba95d47b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -42,8 +42,8 @@ def apache_spark_target(): "user": "dbt", "method": "thrift", "port": 10000, - "connect_retries": 5, - "connect_timeout": 60, + "connect_retries": 3, + "connect_timeout": 5, "retry_all": True, } @@ -57,6 +57,9 @@ def databricks_cluster_target(): "token": os.getenv("DBT_DATABRICKS_TOKEN"), "driver": os.getenv("ODBC_DRIVER"), "port": 443, + "connect_retries": 3, + "connect_timeout": 5, + "retry_all": True, } @@ -69,6 +72,9 @@ def databricks_sql_endpoint_target(): "token": os.getenv("DBT_DATABRICKS_TOKEN"), "driver": os.getenv("ODBC_DRIVER"), "port": 443, + "connect_retries": 3, + "connect_timeout": 5, + "retry_all": True, } @@ -80,8 +86,11 @@ def databricks_http_cluster_target(): "token": os.getenv('DBT_DATABRICKS_TOKEN'), "method": "http", "port": 443, + # more retries + longer timout to handle unavailability while cluster is restarting + # return failures quickly in dev, retry all failures in CI (up to 5 min) "connect_retries": 5, - "connect_timeout": 60, + "connect_timeout": 60, + "retry_all": bool(os.getenv('DBT_DATABRICKS_RETRY_ALL', False)), } 
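The retry values above only come into play when a connection flakes; what contributors interact with day to day is the `--profile` option plus the `skip_profile` marker that this conftest wires up. A minimal sketch of a functional test that plugs into that machinery (the model name and SQL are made up for illustration; `run_dbt` is the helper shipped with dbt-tests-adapter, and the `project` fixture comes from the `dbt.tests.fixtures.project` plugin declared above):

```
import pytest
from dbt.tests.util import run_dbt


# skipped when invoked with `pytest --profile spark_session`
@pytest.mark.skip_profile("spark_session")
class TestMySimpleModelSpark:
    @pytest.fixture(scope="class")
    def models(self):
        # file name -> SQL body, written into the temporary dbt project
        return {"my_model.sql": "select 1 as id"}

    def test_run(self, project):
        # builds the project against whichever --profile target was selected
        results = run_dbt(["run"])
        assert len(results) == 1
```

Such a test is run the same way as the basic suite, e.g. `pytest -v --profile apache_spark tests/functional/adapter/`, which is exactly what the tox commands below are updated to do.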
diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py index ff4fdd22a..db18da6bb 100644 --- a/tests/functional/adapter/test_basic.py +++ b/tests/functional/adapter/test_basic.py @@ -13,7 +13,7 @@ from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp -@pytest.mark.skip_profile('databricks_sql_endpoint', 'spark_session') +@pytest.mark.skip_profile('spark_session') class TestSimpleMaterializationsSpark(BaseSimpleMaterializations): pass @@ -33,12 +33,12 @@ class TestEmptySpark(BaseEmpty): pass -@pytest.mark.skip_profile('databricks_sql_endpoint', 'spark_session') +@pytest.mark.skip_profile('spark_session') class TestEphemeralSpark(BaseEphemeral): pass -@pytest.mark.skip_profile('databricks_sql_endpoint', 'spark_session') +@pytest.mark.skip_profile('spark_session') class TestIncrementalSpark(BaseIncremental): pass diff --git a/tests/integration/base.py b/tests/integration/base.py index e36162aaf..7e557217f 100644 --- a/tests/integration/base.py +++ b/tests/integration/base.py @@ -160,8 +160,9 @@ def apache_spark_profile(self): 'user': 'dbt', 'method': 'thrift', 'port': 10000, - 'connect_retries': 5, - 'connect_timeout': 60, + 'connect_retries': 3, + 'connect_timeout': 5, + 'retry_all': True, 'schema': self.unique_schema() }, }, @@ -184,6 +185,9 @@ def databricks_cluster_profile(self): 'token': os.getenv('DBT_DATABRICKS_TOKEN'), 'driver': os.getenv('ODBC_DRIVER'), 'port': 443, + 'connect_retries': 3, + 'connect_timeout': 5, + 'retry_all': True, 'schema': self.unique_schema() }, }, @@ -206,6 +210,9 @@ def databricks_sql_endpoint_profile(self): 'token': os.getenv('DBT_DATABRICKS_TOKEN'), 'driver': os.getenv('ODBC_DRIVER'), 'port': 443, + 'connect_retries': 3, + 'connect_timeout': 5, + 'retry_all': True, 'schema': self.unique_schema() }, }, diff --git a/tox.ini b/tox.ini index 38cb1962e..6eb503af5 100644 --- a/tox.ini +++ b/tox.ini @@ -20,7 +20,7 @@ deps = [testenv:integration-spark-databricks-http] basepython = python3.8 -commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_http_cluster tests/functional/adapter/test_basic.py' +commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_http_cluster {posargs} -n4 tests/functional/adapter/test_basic.py' passenv = DBT_* PYTEST_ADDOPTS deps = -r{toxinidir}/requirements.txt @@ -29,7 +29,7 @@ deps = [testenv:integration-spark-databricks-odbc-cluster] basepython = python3.8 -commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_cluster tests/functional/adapter/test_basic.py' +commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_cluster {posargs} -n4 tests/functional/adapter/test_basic.py' /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_cluster {posargs} -n4 tests/integration/*' passenv = DBT_* PYTEST_ADDOPTS ODBC_DRIVER deps = @@ -39,7 +39,7 @@ deps = [testenv:integration-spark-databricks-odbc-sql-endpoint] basepython = python3.8 -commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_sql_endpoint tests/functional/adapter/test_basic.py' +commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_sql_endpoint {posargs} -n4 tests/functional/adapter/test_basic.py' /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_sql_endpoint {posargs} -n4 tests/integration/*' passenv = DBT_* PYTEST_ADDOPTS ODBC_DRIVER deps = @@ -50,7 +50,7 @@ deps = [testenv:integration-spark-thrift] basepython = python3.8 -commands = /bin/bash -c '{envpython} -m pytest -v --profile apache_spark 
tests/functional/adapter/test_basic.py' +commands = /bin/bash -c '{envpython} -m pytest -v --profile apache_spark {posargs} -n4 tests/functional/adapter/test_basic.py' /bin/bash -c '{envpython} -m pytest -v -m profile_apache_spark {posargs} -n4 tests/integration/*' passenv = DBT_* PYTEST_ADDOPTS deps = From 5d2f387bdd621b7aa9af5017e8a3e5162231823c Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Thu, 7 Apr 2022 12:46:09 -0400 Subject: [PATCH 11/16] Override 'run_sql_for_tests' for Spark (#324) --- CHANGELOG.md | 1 + dbt/adapters/spark/impl.py | 24 ++++++++++++++++++++++++ tox.ini | 2 +- 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c9b599d5..8fb00e1d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ### Under the hood - Use dbt.tests.adapter.basic in test suite ([#298](https://github.com/dbt-labs/dbt-spark/issues/298), [#299](https://github.com/dbt-labs/dbt-spark/pull/299)) - Make internal macros use macro dispatch to be overridable in child adapters ([#319](https://github.com/dbt-labs/dbt-spark/issues/319), [#320](https://github.com/dbt-labs/dbt-spark/pull/320)) +- Override adapter method 'run_sql_for_tests' ([#323](https://github.com/dbt-labs/dbt-spark/issues/323), [#324](https://github.com/dbt-labs/dbt-spark/pull/324)) ### Contributors - [@JCZuurmond](https://github.com/dbt-labs/dbt-spark/pull/279) ( [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 74845422b..12810a64f 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -364,6 +364,30 @@ def get_rows_different_sql( return sql + # This is for use in the test suite + # Spark doesn't have 'commit' and 'rollback', so this override + # doesn't include those commands. 
+ def run_sql_for_tests(self, sql, fetch, conn): + cursor = conn.handle.cursor() + try: + cursor.execute(sql) + if fetch == "one": + if hasattr(cursor, 'fetchone'): + return cursor.fetchone() + else: + # AttributeError: 'PyhiveConnectionWrapper' object has no attribute 'fetchone' + return cursor.fetchall()[0] + elif fetch == "all": + return cursor.fetchall() + else: + return + except BaseException as e: + print(sql) + print(e) + raise + finally: + conn.transaction_open = False + # spark does something interesting with joins when both tables have the same # static values for the join condition and complains that the join condition is diff --git a/tox.ini b/tox.ini index 6eb503af5..a268ee5b9 100644 --- a/tox.ini +++ b/tox.ini @@ -5,7 +5,7 @@ envlist = unit, flake8, integration-spark-thrift [testenv:flake8] basepython = python3.8 -commands = /bin/bash -c '$(which flake8) --select=E,W,F --ignore=W504 dbt/' +commands = /bin/bash -c '$(which flake8) --max-line-length 99 --select=E,W,F --ignore=W504 dbt/' passenv = DBT_* PYTEST_ADDOPTS deps = -r{toxinidir}/dev_requirements.txt From c169bb33627f0c87f1f6b715e1f4f72257a85bba Mon Sep 17 00:00:00 2001 From: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com> Date: Fri, 8 Apr 2022 14:23:33 -0500 Subject: [PATCH 12/16] init push for convert of unique_id as a list tests for spark (#321) * rebase commit * remove old tests * adding changelog * removing changelog entry (per disucssion these prs don't need one and changing dev_requirements install) * Empty-Commit * retesting circleci --- .gitignore | 2 + CHANGELOG.md | 1 + .../adapter/test_incremental_unique_id.py | 13 + .../duplicated_unary_unique_key_list.sql | 17 - .../models/empty_str_unique_key.sql | 14 - .../models/empty_unique_key_list.sql | 12 - .../models/expected/one_str__overwrite.sql | 21 - .../unique_key_list__inplace_overwrite.sql | 21 - .../models/no_unique_key.sql | 13 - .../nontyped_trinary_unique_key_list.sql | 19 - .../models/not_found_unique_key.sql | 14 - .../models/not_found_unique_key_list.sql | 8 - .../models/str_unique_key.sql | 17 - .../models/trinary_unique_key_list.sql | 19 - .../models/unary_unique_key_list.sql | 17 - .../seeds/add_new_rows.sql | 9 - .../seeds/duplicate_insert.sql | 5 - .../incremental_unique_id_test/seeds/seed.csv | 7 - .../incremental_unique_id_test/seeds/seed.yml | 7 - .../test_incremental_unique_id.py | 481 ------------------ tox.ini | 10 +- 21 files changed, 21 insertions(+), 706 deletions(-) create mode 100644 tests/functional/adapter/test_incremental_unique_id.py delete mode 100644 tests/integration/incremental_unique_id_test/models/duplicated_unary_unique_key_list.sql delete mode 100644 tests/integration/incremental_unique_id_test/models/empty_str_unique_key.sql delete mode 100644 tests/integration/incremental_unique_id_test/models/empty_unique_key_list.sql delete mode 100644 tests/integration/incremental_unique_id_test/models/expected/one_str__overwrite.sql delete mode 100644 tests/integration/incremental_unique_id_test/models/expected/unique_key_list__inplace_overwrite.sql delete mode 100644 tests/integration/incremental_unique_id_test/models/no_unique_key.sql delete mode 100644 tests/integration/incremental_unique_id_test/models/nontyped_trinary_unique_key_list.sql delete mode 100644 tests/integration/incremental_unique_id_test/models/not_found_unique_key.sql delete mode 100644 tests/integration/incremental_unique_id_test/models/not_found_unique_key_list.sql delete mode 100644 
tests/integration/incremental_unique_id_test/models/str_unique_key.sql delete mode 100644 tests/integration/incremental_unique_id_test/models/trinary_unique_key_list.sql delete mode 100644 tests/integration/incremental_unique_id_test/models/unary_unique_key_list.sql delete mode 100644 tests/integration/incremental_unique_id_test/seeds/add_new_rows.sql delete mode 100644 tests/integration/incremental_unique_id_test/seeds/duplicate_insert.sql delete mode 100644 tests/integration/incremental_unique_id_test/seeds/seed.csv delete mode 100644 tests/integration/incremental_unique_id_test/seeds/seed.yml delete mode 100644 tests/integration/incremental_unique_id_test/test_incremental_unique_id.py diff --git a/.gitignore b/.gitignore index 4c05634f3..cc586f5fe 100644 --- a/.gitignore +++ b/.gitignore @@ -5,12 +5,14 @@ env/ *.pyc __pycache__ .tox/ +.env .idea/ build/ dist/ dbt-integration-tests test/integration/.user.yml .DS_Store +test.env .vscode *.log logs/ \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fb00e1d6..bb54c92f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - Make internal macros use macro dispatch to be overridable in child adapters ([#319](https://github.com/dbt-labs/dbt-spark/issues/319), [#320](https://github.com/dbt-labs/dbt-spark/pull/320)) - Override adapter method 'run_sql_for_tests' ([#323](https://github.com/dbt-labs/dbt-spark/issues/323), [#324](https://github.com/dbt-labs/dbt-spark/pull/324)) + ### Contributors - [@JCZuurmond](https://github.com/dbt-labs/dbt-spark/pull/279) ( [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) - [@ueshin](https://github.com/ueshin) ([#320](https://github.com/dbt-labs/dbt-spark/pull/320)) diff --git a/tests/functional/adapter/test_incremental_unique_id.py b/tests/functional/adapter/test_incremental_unique_id.py new file mode 100644 index 000000000..18bac3f39 --- /dev/null +++ b/tests/functional/adapter/test_incremental_unique_id.py @@ -0,0 +1,13 @@ +import pytest +from dbt.tests.adapter.incremental.test_incremental_unique_id import BaseIncrementalUniqueKey + +@pytest.mark.skip_profile('spark_session', 'apache_spark') +class TestUniqueKeySpark(BaseIncrementalUniqueKey): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": { + "+file_format": "delta", + "+incremental_strategy": "merge", + } + } \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/duplicated_unary_unique_key_list.sql b/tests/integration/incremental_unique_id_test/models/duplicated_unary_unique_key_list.sql deleted file mode 100644 index 7290b6c43..000000000 --- a/tests/integration/incremental_unique_id_test/models/duplicated_unary_unique_key_list.sql +++ /dev/null @@ -1,17 +0,0 @@ -{{ - config( - materialized='incremental', - unique_key=['state', 'state'] - ) -}} - -select - state as state, - county as county, - city as city, - last_visit_date as last_visit_date -from {{ ref('seed') }} - -{% if is_incremental() %} - where last_visit_date > (select max(last_visit_date) from {{ this }}) -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/empty_str_unique_key.sql b/tests/integration/incremental_unique_id_test/models/empty_str_unique_key.sql deleted file mode 100644 index 5260e177c..000000000 --- a/tests/integration/incremental_unique_id_test/models/empty_str_unique_key.sql +++ /dev/null @@ -1,14 +0,0 @@ -{{ - config( - materialized='incremental', - unique_key='' - ) -}} - -select - * -from {{ ref('seed') 
}} - -{% if is_incremental() %} - where last_visit_date > (select max(last_visit_date) from {{ this }}) -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/empty_unique_key_list.sql b/tests/integration/incremental_unique_id_test/models/empty_unique_key_list.sql deleted file mode 100644 index c582d532c..000000000 --- a/tests/integration/incremental_unique_id_test/models/empty_unique_key_list.sql +++ /dev/null @@ -1,12 +0,0 @@ -{{ - config( - materialized='incremental', - unique_key=[] - ) -}} - -select * from {{ ref('seed') }} - -{% if is_incremental() %} - where last_visit_date > (select max(last_visit_date) from {{ this }}) -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/expected/one_str__overwrite.sql b/tests/integration/incremental_unique_id_test/models/expected/one_str__overwrite.sql deleted file mode 100644 index c7101152b..000000000 --- a/tests/integration/incremental_unique_id_test/models/expected/one_str__overwrite.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ - config( - materialized='table' - ) -}} - -select - 'CT' as state, - 'Hartford' as county, - 'Hartford' as city, - cast('2022-02-14' as date) as last_visit_date -union all -select 'MA','Suffolk','Boston',cast('2020-02-12' as date) -union all -select 'NJ','Mercer','Trenton',cast('2022-01-01' as date) -union all -select 'NY','Kings','Brooklyn',cast('2021-04-02' as date) -union all -select 'NY','New York','Manhattan',cast('2021-04-01' as date) -union all -select 'PA','Philadelphia','Philadelphia',cast('2021-05-21' as date) \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/expected/unique_key_list__inplace_overwrite.sql b/tests/integration/incremental_unique_id_test/models/expected/unique_key_list__inplace_overwrite.sql deleted file mode 100644 index c7101152b..000000000 --- a/tests/integration/incremental_unique_id_test/models/expected/unique_key_list__inplace_overwrite.sql +++ /dev/null @@ -1,21 +0,0 @@ -{{ - config( - materialized='table' - ) -}} - -select - 'CT' as state, - 'Hartford' as county, - 'Hartford' as city, - cast('2022-02-14' as date) as last_visit_date -union all -select 'MA','Suffolk','Boston',cast('2020-02-12' as date) -union all -select 'NJ','Mercer','Trenton',cast('2022-01-01' as date) -union all -select 'NY','Kings','Brooklyn',cast('2021-04-02' as date) -union all -select 'NY','New York','Manhattan',cast('2021-04-01' as date) -union all -select 'PA','Philadelphia','Philadelphia',cast('2021-05-21' as date) \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/no_unique_key.sql b/tests/integration/incremental_unique_id_test/models/no_unique_key.sql deleted file mode 100644 index 44a63e75c..000000000 --- a/tests/integration/incremental_unique_id_test/models/no_unique_key.sql +++ /dev/null @@ -1,13 +0,0 @@ -{{ - config( - materialized='incremental' - ) -}} - -select - * -from {{ ref('seed') }} - -{% if is_incremental() %} - where last_visit_date > (select max(last_visit_date) from {{ this }}) -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/nontyped_trinary_unique_key_list.sql b/tests/integration/incremental_unique_id_test/models/nontyped_trinary_unique_key_list.sql deleted file mode 100644 index 52b4509f0..000000000 --- a/tests/integration/incremental_unique_id_test/models/nontyped_trinary_unique_key_list.sql +++ /dev/null @@ -1,19 +0,0 @@ --- for comparing against 
auto-typed seeds - -{{ - config( - materialized='incremental', - unique_key=['state', 'county', 'city'] - ) -}} - -select - state as state, - county as county, - city as city, - last_visit_date as last_visit_date -from {{ ref('seed') }} - -{% if is_incremental() %} - where last_visit_date > (select max(last_visit_date) from {{ this }}) -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/not_found_unique_key.sql b/tests/integration/incremental_unique_id_test/models/not_found_unique_key.sql deleted file mode 100644 index d247aa341..000000000 --- a/tests/integration/incremental_unique_id_test/models/not_found_unique_key.sql +++ /dev/null @@ -1,14 +0,0 @@ -{{ - config( - materialized='incremental', - unique_key='thisisnotacolumn' - ) -}} - -select - * -from {{ ref('seed') }} - -{% if is_incremental() %} - where last_visit_date > (select max(last_visit_date) from {{ this }}) -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/not_found_unique_key_list.sql b/tests/integration/incremental_unique_id_test/models/not_found_unique_key_list.sql deleted file mode 100644 index f1462a48f..000000000 --- a/tests/integration/incremental_unique_id_test/models/not_found_unique_key_list.sql +++ /dev/null @@ -1,8 +0,0 @@ -{{ - config( - materialized='incremental', - unique_key=['state', 'thisisnotacolumn'] - ) -}} - -select * from {{ ref('seed') }} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/str_unique_key.sql b/tests/integration/incremental_unique_id_test/models/str_unique_key.sql deleted file mode 100644 index 2f9fc2987..000000000 --- a/tests/integration/incremental_unique_id_test/models/str_unique_key.sql +++ /dev/null @@ -1,17 +0,0 @@ -{{ - config( - materialized='incremental', - unique_key='state' - ) -}} - -select - state as state, - county as county, - city as city, - last_visit_date as last_visit_date -from {{ ref('seed') }} - -{% if is_incremental() %} - where last_visit_date > (select max(last_visit_date) from {{ this }}) -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/trinary_unique_key_list.sql b/tests/integration/incremental_unique_id_test/models/trinary_unique_key_list.sql deleted file mode 100644 index 0359546bf..000000000 --- a/tests/integration/incremental_unique_id_test/models/trinary_unique_key_list.sql +++ /dev/null @@ -1,19 +0,0 @@ --- types needed to compare against expected model reliably - -{{ - config( - materialized='incremental', - unique_key=['state', 'county', 'city'] - ) -}} - -select - state as state, - county as county, - city as city, - last_visit_date as last_visit_date -from {{ ref('seed') }} - -{% if is_incremental() %} - where last_visit_date > (select max(last_visit_date) from {{ this }}) -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/models/unary_unique_key_list.sql b/tests/integration/incremental_unique_id_test/models/unary_unique_key_list.sql deleted file mode 100644 index 7f5875f85..000000000 --- a/tests/integration/incremental_unique_id_test/models/unary_unique_key_list.sql +++ /dev/null @@ -1,17 +0,0 @@ -{{ - config( - materialized='incremental', - unique_key=['state'] - ) -}} - -select - state as state, - county as county, - city as city, - last_visit_date as last_visit_date -from {{ ref('seed') }} - -{% if is_incremental() %} - where last_visit_date > (select max(last_visit_date) from {{ this }}) -{% 
endif %} \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/seeds/add_new_rows.sql b/tests/integration/incremental_unique_id_test/seeds/add_new_rows.sql deleted file mode 100644 index e5611fe32..000000000 --- a/tests/integration/incremental_unique_id_test/seeds/add_new_rows.sql +++ /dev/null @@ -1,9 +0,0 @@ --- insert two new rows, both of which should be in incremental model --- with any unique columns -insert into {schema}.seed - (state, county, city, last_visit_date) -values ('WA','King','Seattle',cast('2022-02-01' as date)); - -insert into {schema}.seed - (state, county, city, last_visit_date) -values ('CA','Los Angeles','Los Angeles',cast('2022-02-01' as date)); \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/seeds/duplicate_insert.sql b/tests/integration/incremental_unique_id_test/seeds/duplicate_insert.sql deleted file mode 100644 index 8abe2808f..000000000 --- a/tests/integration/incremental_unique_id_test/seeds/duplicate_insert.sql +++ /dev/null @@ -1,5 +0,0 @@ --- insert new row, which should not be in incremental model --- with primary or first three columns unique -insert into {schema}.seed - (state, county, city, last_visit_date) -values ('CT','Hartford','Hartford',cast('2022-02-14' as date)); \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/seeds/seed.csv b/tests/integration/incremental_unique_id_test/seeds/seed.csv deleted file mode 100644 index b988827fb..000000000 --- a/tests/integration/incremental_unique_id_test/seeds/seed.csv +++ /dev/null @@ -1,7 +0,0 @@ -state,county,city,last_visit_date -CT,Hartford,Hartford,2020-09-23 -MA,Suffolk,Boston,2020-02-12 -NJ,Mercer,Trenton,2022-01-01 -NY,Kings,Brooklyn,2021-04-02 -NY,New York,Manhattan,2021-04-01 -PA,Philadelphia,Philadelphia,2021-05-21 \ No newline at end of file diff --git a/tests/integration/incremental_unique_id_test/seeds/seed.yml b/tests/integration/incremental_unique_id_test/seeds/seed.yml deleted file mode 100644 index c048548a8..000000000 --- a/tests/integration/incremental_unique_id_test/seeds/seed.yml +++ /dev/null @@ -1,7 +0,0 @@ -version: 2 - -seeds: - - name: seed - config: - column_types: - last_visit_date: date diff --git a/tests/integration/incremental_unique_id_test/test_incremental_unique_id.py b/tests/integration/incremental_unique_id_test/test_incremental_unique_id.py deleted file mode 100644 index 96e619120..000000000 --- a/tests/integration/incremental_unique_id_test/test_incremental_unique_id.py +++ /dev/null @@ -1,481 +0,0 @@ -from tests.integration.base import DBTIntegrationTest, use_profile -from dbt.contracts.results import RunStatus -from collections import namedtuple -from pathlib import Path - - -ResultHolder = namedtuple( - 'ResultHolder', - ['seed_count', 'model_count', 'seed_rows', 'inc_test_model_count', - 'opt_model_count', 'relation'], -) - - -class TestIncrementalUniqueKey(DBTIntegrationTest): - @property - def schema(self): - return 'incremental_unique_key' - - @property - def models(self): - return 'models' - - @property - def project_config(self): - return { - "config-version": 2, - "models": { - "+file_format": "delta", - "+incremental_strategy": "merge" - } - } - - def update_incremental_model(self, incremental_model): - '''update incremental model after the seed table has been updated''' - model_result_set = self.run_dbt(['run', '--select', incremental_model]) - return len(model_result_set) - - def setup_test(self, seed, incremental_model, update_sql_file): - '''build a 
test case and return values for assertions''' - - # Idempotently create some number of seeds and incremental models - seed_count = len(self.run_dbt( - ['seed', '--select', seed, '--full-refresh'] - )) - model_count = len(self.run_dbt( - ['run', '--select', incremental_model, '--full-refresh'] - )) - - # Upate seed and return new row count - row_count_query = 'select * from {}.{}'.format( - self.unique_schema(), - seed - ) - self.run_sql_file(Path('seeds') / Path(update_sql_file + '.sql')) - seed_rows = len(self.run_sql(row_count_query, fetch='all')) - - inc_test_model_count = self.update_incremental_model( - incremental_model=incremental_model - ) - - return (seed_count, model_count, seed_rows, inc_test_model_count) - - def test_scenario_correctness(self, expected_fields, test_case_fields): - '''Invoke assertions to verify correct build functionality''' - # 1. test seed(s) should build afresh - self.assertEqual( - expected_fields.seed_count, test_case_fields.seed_count - ) - # 2. test model(s) should build afresh - self.assertEqual( - expected_fields.model_count, test_case_fields.model_count - ) - # 3. seeds should have intended row counts post update - self.assertEqual( - expected_fields.seed_rows, test_case_fields.seed_rows - ) - # 4. incremental test model(s) should be updated - self.assertEqual( - expected_fields.inc_test_model_count, - test_case_fields.inc_test_model_count - ) - # 5. extra incremental model(s) should be built; optional since - # comparison may be between an incremental model and seed - if (expected_fields.opt_model_count and - test_case_fields.opt_model_count): - self.assertEqual( - expected_fields.opt_model_count, - test_case_fields.opt_model_count - ) - # 6. result table should match intended result set (itself a relation) - self.assertTablesEqual( - expected_fields.relation, test_case_fields.relation - ) - - def stub_expected_fields( - self, relation, seed_rows, opt_model_count=None - ): - return ResultHolder( - seed_count=1, model_count=1, seed_rows=seed_rows, - inc_test_model_count=1, opt_model_count=opt_model_count, - relation=relation - ) - - def fail_to_build_inc_missing_unique_key_column(self, incremental_model_name): - '''should pass back error state when trying build an incremental - model whose unique key or keylist includes a column missing - from the incremental model''' - seed_count = len(self.run_dbt( - ['seed', '--select', 'seed', '--full-refresh'] - )) - # unique keys are not applied on first run, so two are needed - self.run_dbt( - ['run', '--select', incremental_model_name, '--full-refresh'], - expect_pass=True - ) - run_result = self.run_dbt( - ['run', '--select', incremental_model_name], - expect_pass=False - ).results[0] - - return run_result.status, run_result.message - - -class TestNoIncrementalUniqueKey(TestIncrementalUniqueKey): - - @use_profile("databricks_sql_endpoint") - def test__databricks_sql_endpoint_no_unique_keys(self): - '''with no unique keys, seed and model should match''' - seed='seed' - seed_rows=8 - incremental_model='no_unique_key' - update_sql_file='add_new_rows' - - expected_fields = self.stub_expected_fields( - relation=seed, seed_rows=seed_rows - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=None, relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile("databricks_cluster") - def test__databricks_cluster_no_unique_keys(self): - '''with no unique keys, seed and model should match''' - 
seed='seed' - seed_rows=8 - incremental_model='no_unique_key' - update_sql_file='add_new_rows' - - expected_fields = self.stub_expected_fields( - relation=seed, seed_rows=seed_rows - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=None, relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - -class TestIncrementalStrUniqueKey(TestIncrementalUniqueKey): - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint_empty_str_unique_key(self): - '''with empty string for unique key, seed and model should match''' - seed='seed' - seed_rows=8 - incremental_model='empty_str_unique_key' - update_sql_file='add_new_rows' - - expected_fields = self.stub_expected_fields( - relation=seed, seed_rows=seed_rows - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=None, relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_cluster') - def test__databricks_cluster_empty_str_unique_key(self): - '''with empty string for unique key, seed and model should match''' - seed='seed' - seed_rows=8 - incremental_model='empty_str_unique_key' - update_sql_file='add_new_rows' - - expected_fields = self.stub_expected_fields( - relation=seed, seed_rows=seed_rows - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=None, relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint_one_unique_key(self): - '''with one unique key, model will overwrite existing row''' - seed='seed' - seed_rows=7 - incremental_model='str_unique_key' - update_sql_file='duplicate_insert' - expected_model='one_str__overwrite' - - expected_fields = self.stub_expected_fields( - relation=expected_model, seed_rows=seed_rows, opt_model_count=1 - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=self.update_incremental_model(expected_model), - relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_cluster') - def test__databricks_cluster_one_unique_key(self): - '''with one unique key, model will overwrite existing row''' - seed='seed' - seed_rows=7 - incremental_model='str_unique_key' - update_sql_file='duplicate_insert' - expected_model='one_str__overwrite' - - expected_fields = self.stub_expected_fields( - relation=expected_model, seed_rows=seed_rows, opt_model_count=1 - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=self.update_incremental_model(expected_model), - relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint_bad_unique_key(self): - '''expect compilation error from unique key not being a column''' - - (status, exc) = self.fail_to_build_inc_missing_unique_key_column( - incremental_model_name='not_found_unique_key' - ) - - self.assertEqual(status, RunStatus.Error) - self.assertTrue("thisisnotacolumn" in exc) - - @use_profile('databricks_cluster') - def test__databricks_cluster_bad_unique_key(self): - '''expect compilation error from unique key not being a 
column''' - - (status, exc) = self.fail_to_build_inc_missing_unique_key_column( - incremental_model_name='not_found_unique_key' - ) - - self.assertEqual(status, RunStatus.Error) - self.assertTrue("thisisnotacolumn" in exc) - - -class TestIncrementalListUniqueKey(TestIncrementalUniqueKey): - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint_empty_unique_key_list(self): - '''with no unique keys, seed and model should match''' - seed='seed' - seed_rows=8 - incremental_model='empty_unique_key_list' - update_sql_file='add_new_rows' - - expected_fields = self.stub_expected_fields( - relation=seed, seed_rows=seed_rows - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=None, relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_cluster') - def test__databricks_cluster_empty_unique_key_list(self): - '''with no unique keys, seed and model should match''' - seed='seed' - seed_rows=8 - incremental_model='empty_unique_key_list' - update_sql_file='add_new_rows' - - expected_fields = self.stub_expected_fields( - relation=seed, seed_rows=seed_rows - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=None, relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint_unary_unique_key_list(self): - '''with one unique key, model will overwrite existing row''' - seed='seed' - seed_rows=7 - incremental_model='unary_unique_key_list' - update_sql_file='duplicate_insert' - expected_model='unique_key_list__inplace_overwrite' - - expected_fields = self.stub_expected_fields( - relation=expected_model, seed_rows=seed_rows, opt_model_count=1 - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=self.update_incremental_model(expected_model), - relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_cluster') - def test__databricks_cluster_unary_unique_key_list(self): - '''with one unique key, model will overwrite existing row''' - seed='seed' - seed_rows=7 - incremental_model='unary_unique_key_list' - update_sql_file='duplicate_insert' - expected_model='unique_key_list__inplace_overwrite' - - expected_fields = self.stub_expected_fields( - relation=expected_model, seed_rows=seed_rows, opt_model_count=1 - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=self.update_incremental_model(expected_model), - relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint_duplicated_unary_unique_key_list(self): - '''with two of the same unique key, model will overwrite existing row''' - seed='seed' - seed_rows=7 - incremental_model='duplicated_unary_unique_key_list' - update_sql_file='duplicate_insert' - expected_model='unique_key_list__inplace_overwrite' - - expected_fields = self.stub_expected_fields( - relation=expected_model, seed_rows=seed_rows, opt_model_count=1 - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=self.update_incremental_model(expected_model), - relation=incremental_model - 
) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_cluster') - def test__databricks_cluster_duplicated_unary_unique_key_list(self): - '''with two of the same unique key, model will overwrite existing row''' - seed='seed' - seed_rows=7 - incremental_model='duplicated_unary_unique_key_list' - update_sql_file='duplicate_insert' - expected_model='unique_key_list__inplace_overwrite' - - expected_fields = self.stub_expected_fields( - relation=expected_model, seed_rows=seed_rows, opt_model_count=1 - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=self.update_incremental_model(expected_model), - relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint_trinary_unique_key_list(self): - '''with three unique keys, model will overwrite existing row''' - seed='seed' - seed_rows=7 - incremental_model='trinary_unique_key_list' - update_sql_file='duplicate_insert' - expected_model='unique_key_list__inplace_overwrite' - - expected_fields = self.stub_expected_fields( - relation=expected_model, seed_rows=seed_rows, opt_model_count=1 - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=self.update_incremental_model(expected_model), - relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_cluster') - def test__databricks_cluster_trinary_unique_key_list(self): - '''with three unique keys, model will overwrite existing row''' - seed='seed' - seed_rows=7 - incremental_model='trinary_unique_key_list' - update_sql_file='duplicate_insert' - expected_model='unique_key_list__inplace_overwrite' - - expected_fields = self.stub_expected_fields( - relation=expected_model, seed_rows=seed_rows, opt_model_count=1 - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=self.update_incremental_model(expected_model), - relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint_trinary_unique_key_list_no_update(self): - '''even with three unique keys, adding distinct rows to seed does not - cause seed and model to diverge''' - seed='seed' - seed_rows=8 - incremental_model='nontyped_trinary_unique_key_list' - update_sql_file='add_new_rows' - - expected_fields = self.stub_expected_fields( - relation=seed, seed_rows=seed_rows - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=None, relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, test_case_fields) - - @use_profile('databricks_cluster') - def test__databricks_cluster_trinary_unique_key_list_no_update(self): - '''even with three unique keys, adding distinct rows to seed does not - cause seed and model to diverge''' - seed='seed' - seed_rows=8 - incremental_model='nontyped_trinary_unique_key_list' - update_sql_file='add_new_rows' - - expected_fields = self.stub_expected_fields( - relation=seed, seed_rows=seed_rows - ) - test_case_fields = ResultHolder( - *self.setup_test(seed, incremental_model, update_sql_file), - opt_model_count=None, relation=incremental_model - ) - - self.test_scenario_correctness(expected_fields, 
test_case_fields) - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint_bad_unique_key_list(self): - '''expect compilation error from unique key not being a column''' - - (status, exc) = self.fail_to_build_inc_missing_unique_key_column( - incremental_model_name='not_found_unique_key_list' - ) - - self.assertEqual(status, RunStatus.Error) - self.assertTrue("thisisnotacolumn" in exc) - - @use_profile('databricks_cluster') - def test__databricks_cluster_bad_unique_key_list(self): - '''expect compilation error from unique key not being a column''' - - (status, exc) = self.fail_to_build_inc_missing_unique_key_column( - incremental_model_name='not_found_unique_key_list' - ) - - self.assertEqual(status, RunStatus.Error) - self.assertTrue("thisisnotacolumn" in exc) - diff --git a/tox.ini b/tox.ini index a268ee5b9..1e0e2b8b6 100644 --- a/tox.ini +++ b/tox.ini @@ -20,7 +20,7 @@ deps = [testenv:integration-spark-databricks-http] basepython = python3.8 -commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_http_cluster {posargs} -n4 tests/functional/adapter/test_basic.py' +commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_http_cluster {posargs} -n4 tests/functional/adapter/*' passenv = DBT_* PYTEST_ADDOPTS deps = -r{toxinidir}/requirements.txt @@ -29,7 +29,7 @@ deps = [testenv:integration-spark-databricks-odbc-cluster] basepython = python3.8 -commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_cluster {posargs} -n4 tests/functional/adapter/test_basic.py' +commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_cluster {posargs} -n4 tests/functional/adapter/*' /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_cluster {posargs} -n4 tests/integration/*' passenv = DBT_* PYTEST_ADDOPTS ODBC_DRIVER deps = @@ -39,7 +39,7 @@ deps = [testenv:integration-spark-databricks-odbc-sql-endpoint] basepython = python3.8 -commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_sql_endpoint {posargs} -n4 tests/functional/adapter/test_basic.py' +commands = /bin/bash -c '{envpython} -m pytest -v --profile databricks_sql_endpoint {posargs} -n4 tests/functional/adapter/*' /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_sql_endpoint {posargs} -n4 tests/integration/*' passenv = DBT_* PYTEST_ADDOPTS ODBC_DRIVER deps = @@ -50,7 +50,7 @@ deps = [testenv:integration-spark-thrift] basepython = python3.8 -commands = /bin/bash -c '{envpython} -m pytest -v --profile apache_spark {posargs} -n4 tests/functional/adapter/test_basic.py' +commands = /bin/bash -c '{envpython} -m pytest -v --profile apache_spark {posargs} -n4 tests/functional/adapter/*' /bin/bash -c '{envpython} -m pytest -v -m profile_apache_spark {posargs} -n4 tests/integration/*' passenv = DBT_* PYTEST_ADDOPTS deps = @@ -60,7 +60,7 @@ deps = [testenv:integration-spark-session] basepython = python3 -commands = /bin/bash -c '{envpython} -m pytest -v --profile spark_session tests/functional/adapter/test_basic.py' +commands = /bin/bash -c '{envpython} -m pytest -v --profile spark_session {posargs} -n4 tests/functional/adapter/*' passenv = DBT_* PYTEST_* From bb5075ece999e678faa863bf2ce54f2986321b2f Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Fri, 8 Apr 2022 15:28:58 -0600 Subject: [PATCH 13/16] convert adapter test (#328) --- dbt/adapters/spark/impl.py | 16 ++++++++++++++-- tests/functional/adapter/test_basic.py | 4 ++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py 
index 12810a64f..268417d07 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -231,8 +231,20 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: # return relation's schema. if columns are empty from cache, # use get_columns_in_relation spark macro # which would execute 'describe extended tablename' query - rows: List[agate.Row] = super().get_columns_in_relation(relation) - columns = self.parse_describe_extended(relation, rows) + try: + rows: List[agate.Row] = super().get_columns_in_relation(relation) + columns = self.parse_describe_extended(relation, rows) + except dbt.exceptions.RuntimeException as e: + # spark would throw error when table doesn't exist, where other + # CDW would just return and empty list, normalizing the behavior here + errmsg = getattr(e, "msg", "") + if ( + f"Table or view not found: {relation}" in errmsg or + "NoSuchTableException" in errmsg + ): + pass + else: + raise e # strip hudi metadata columns. columns = [x for x in columns diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py index db18da6bb..70f3267a4 100644 --- a/tests/functional/adapter/test_basic.py +++ b/tests/functional/adapter/test_basic.py @@ -11,6 +11,7 @@ from dbt.tests.adapter.basic.test_generic_tests import BaseGenericTests from dbt.tests.adapter.basic.test_snapshot_check_cols import BaseSnapshotCheckCols from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp +from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod @pytest.mark.skip_profile('spark_session') @@ -77,3 +78,6 @@ def project_config_update(self): "+file_format": "delta", } } + +class TestBaseAdapterMethod(BaseAdapterMethod): + pass \ No newline at end of file From 4a3fa57924bd6aca098bccfff99d64546594989a Mon Sep 17 00:00:00 2001 From: leahwicz <60146280+leahwicz@users.noreply.github.com> Date: Tue, 12 Apr 2022 17:43:12 -0400 Subject: [PATCH 14/16] Updating requirements to point to release branch --- dev_requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev_requirements.txt b/dev_requirements.txt index 0f84cbd5d..60f36c6ef 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,7 +1,7 @@ # install latest changes in dbt-core # TODO: how to automate switching from develop to version branches? 
-git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core -git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter +git+https://github.com/dbt-labs/dbt-core.git@1.1.latest#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git@1.1.latest#egg=dbt-tests-adapter&subdirectory=tests/adapter freezegun==0.3.9 pytest>=6.0.2 From 6862c7f32b620e167140e5b34270888b2830049a Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Wed, 13 Apr 2022 10:56:33 -0600 Subject: [PATCH 15/16] Backport table not exist, Bumping version to 1.1.0rc1 (#333) --- .bumpversion.cfg | 2 +- .github/workflows/main.yml | 13 +++++++++++++ CHANGELOG.md | 2 +- dbt/adapters/spark/__version__.py | 2 +- dbt/adapters/spark/impl.py | 2 +- setup.py | 2 +- 6 files changed, 18 insertions(+), 5 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 9a0c41a56..ed807aa05 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.1.0b1 +current_version = 1.1.0rc1 parse = (?P\d+) \.(?P\d+) \.(?P\d+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 60a0d6f60..fbdbbbaae 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -122,6 +122,9 @@ jobs: runs-on: ubuntu-latest + outputs: + is_alpha: ${{ steps.check-is-alpha.outputs.is_alpha }} + steps: - name: Check out the repository uses: actions/checkout@v2 @@ -150,6 +153,14 @@ jobs: - name: Check wheel contents run: | check-wheel-contents dist/*.whl --ignore W007,W008 + + - name: Check if this is an alpha version + id: check-is-alpha + run: | + export is_alpha=0 + if [[ "$(ls -lh dist/)" == *"a1"* ]]; then export is_alpha=1; fi + echo "::set-output name=is_alpha::$is_alpha" + - uses: actions/upload-artifact@v2 with: name: dist @@ -158,6 +169,8 @@ jobs: test-build: name: verify packages / python ${{ matrix.python-version }} / ${{ matrix.os }} + if: needs.build.outputs.is_alpha == 0 + needs: build runs-on: ${{ matrix.os }} diff --git a/CHANGELOG.md b/CHANGELOG.md index bb54c92f3..f9a094942 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ - Use dbt.tests.adapter.basic in test suite ([#298](https://github.com/dbt-labs/dbt-spark/issues/298), [#299](https://github.com/dbt-labs/dbt-spark/pull/299)) - Make internal macros use macro dispatch to be overridable in child adapters ([#319](https://github.com/dbt-labs/dbt-spark/issues/319), [#320](https://github.com/dbt-labs/dbt-spark/pull/320)) - Override adapter method 'run_sql_for_tests' ([#323](https://github.com/dbt-labs/dbt-spark/issues/323), [#324](https://github.com/dbt-labs/dbt-spark/pull/324)) - +- when a table or view doesn't exist, 'adapter.get_columns_in_relation' will return empty list instead of fail ([#328]https://github.com/dbt-labs/dbt-spark/pull/328) ### Contributors - [@JCZuurmond](https://github.com/dbt-labs/dbt-spark/pull/279) ( [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) diff --git a/dbt/adapters/spark/__version__.py b/dbt/adapters/spark/__version__.py index 56ec17a89..d37cdcc76 100644 --- a/dbt/adapters/spark/__version__.py +++ b/dbt/adapters/spark/__version__.py @@ -1 +1 @@ -version = "1.1.0b1" +version = "1.1.0rc1" diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 268417d07..eb001fbc9 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -239,7 +239,7 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: # CDW would just return and empty list, normalizing the behavior 
here errmsg = getattr(e, "msg", "") if ( - f"Table or view not found: {relation}" in errmsg or + "Table or view not found" in errmsg or "NoSuchTableException" in errmsg ): pass diff --git a/setup.py b/setup.py index 2cd44491e..f9033b37b 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ def _get_dbt_core_version(): package_name = "dbt-spark" -package_version = "1.1.0b1" +package_version = "1.1.0rc1" dbt_core_version = _get_dbt_core_version() description = """The Apache Spark adapter plugin for dbt""" From ac50f231cae3fa61795259eef0204a613fd68f85 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 28 Apr 2022 15:53:53 -0400 Subject: [PATCH 16/16] Bumping version to 1.1.0 (#341) * Bumping version to 1.1.0 * Update CHANGELOG.md Co-authored-by: Github Build Bot Co-authored-by: leahwicz <60146280+leahwicz@users.noreply.github.com> --- .bumpversion.cfg | 3 +-- CHANGELOG.md | 2 +- dbt/adapters/spark/__version__.py | 2 +- setup.py | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index ed807aa05..88442026c 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.1.0rc1 +current_version = 1.1.0 parse = (?P\d+) \.(?P\d+) \.(?P\d+) @@ -25,4 +25,3 @@ first_value = 1 [bumpversion:file:setup.py] [bumpversion:file:dbt/adapters/spark/__version__.py] - diff --git a/CHANGELOG.md b/CHANGELOG.md index f9a094942..4574f5a34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## dbt-spark 1.1.0 (TBD) +## dbt-spark 1.1.0 (April 28, 2022) ### Features - Add session connection method ([#272](https://github.com/dbt-labs/dbt-spark/issues/272), [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) diff --git a/dbt/adapters/spark/__version__.py b/dbt/adapters/spark/__version__.py index d37cdcc76..b2b60a550 100644 --- a/dbt/adapters/spark/__version__.py +++ b/dbt/adapters/spark/__version__.py @@ -1 +1 @@ -version = "1.1.0rc1" +version = "1.1.0" diff --git a/setup.py b/setup.py index f9033b37b..094b96a7b 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ def _get_dbt_core_version(): package_name = "dbt-spark" -package_version = "1.1.0rc1" +package_version = "1.1.0" dbt_core_version = _get_dbt_core_version() description = """The Apache Spark adapter plugin for dbt"""
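
For reference, the column-lookup guard introduced in [PATCH 13/16] and loosened by the [PATCH 15/16] backport (which drops the interpolated relation name and matches on the bare "Table or view not found" text) boils down to the pattern sketched below. This is a minimal, self-contained illustration rather than the adapter code itself: RuntimeException here is a stand-in for dbt.exceptions.RuntimeException, the describe callable stands in for the superclass call plus parse_describe_extended, and the real SparkAdapter.get_columns_in_relation also consults the relation cache and strips hudi metadata columns, as the impl.py hunks above show.

from typing import Callable, List


class RuntimeException(Exception):
    # Stand-in for dbt.exceptions.RuntimeException, which exposes the error
    # text on a `msg` attribute.
    def __init__(self, msg: str) -> None:
        super().__init__(msg)
        self.msg = msg


def columns_or_empty(describe: Callable[[], List[str]]) -> List[str]:
    # Spark raises when the table or view does not exist, while other
    # warehouses simply return nothing; normalize to an empty column list in
    # that case and re-raise anything else.
    try:
        return describe()
    except RuntimeException as exc:
        errmsg = getattr(exc, "msg", "")
        if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
            return []
        raise


if __name__ == "__main__":
    def describe_missing_table() -> List[str]:
        # Hypothetical relation name, used only to exercise the guard.
        raise RuntimeException("Table or view not found: analytics.does_not_exist")

    print(columns_or_empty(describe_missing_table))  # prints []

Passing a describe callable that raises any other RuntimeException (for example, a permissions error) still surfaces the original exception, which mirrors the `else: raise e` branch the patch keeps so that only the missing-table case is silenced.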