From c67c5a349d82fa2e57d946c10fcb0a76eff994c2 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Wed, 9 Mar 2022 17:27:10 -0500 Subject: [PATCH 01/18] option 2: show tables + show views --- dbt/adapters/spark/impl.py | 58 +++++++++++++++++---------- dbt/include/spark/macros/adapters.sql | 16 ++++++-- 2 files changed, 49 insertions(+), 25 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 74845422b..2ea774a05 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -23,7 +23,8 @@ GET_COLUMNS_IN_RELATION_MACRO_NAME = 'get_columns_in_relation' LIST_SCHEMAS_MACRO_NAME = 'list_schemas' -LIST_RELATIONS_MACRO_NAME = 'list_relations_without_caching' +LIST_TABLES_MACRO_NAME = 'spark__list_tables_without_caching' +LIST_VIEWS_MACRO_NAME = 'spark__list_views_without_caching' DROP_RELATION_MACRO_NAME = 'drop_relation' FETCH_TBL_PROPERTIES_MACRO_NAME = 'fetch_tbl_properties' @@ -126,10 +127,14 @@ def add_schema_to_cache(self, schema) -> str: def list_relations_without_caching( self, schema_relation: SparkRelation ) -> List[SparkRelation]: - kwargs = {'schema_relation': schema_relation} + kwargs = {'relation': schema_relation} try: - results = self.execute_macro( - LIST_RELATIONS_MACRO_NAME, + tables = self.execute_macro( + LIST_TABLES_MACRO_NAME, + kwargs=kwargs + ) + views = self.execute_macro( + LIST_VIEWS_MACRO_NAME, kwargs=kwargs ) except dbt.exceptions.RuntimeException as e: @@ -140,29 +145,40 @@ def list_relations_without_caching( description = "Error while retrieving information about" logger.debug(f"{description} {schema_relation}: {e.msg}") return [] - + + relations = [] - for row in results: - if len(row) != 4: - raise dbt.exceptions.RuntimeException( - f'Invalid value from "show table extended ...", ' - f'got {len(row)} values, expected 4' - ) - _schema, name, _, information = row - rel_type = RelationType.View \ - if 'Type: VIEW' in information else RelationType.Table - is_delta = 'Provider: delta' in information - is_hudi = 'Provider: hudi' in information + for tbl in tables: + rel_type = ('view' if tbl['tableName'] in views.columns["viewName"].values() else 'table') relation = self.Relation.create( - schema=_schema, - identifier=name, + schema=tbl['database'], + identifier=tbl['tableName'], type=rel_type, - information=information, - is_delta=is_delta, - is_hudi=is_hudi, ) relations.append(relation) +# relations = [] +# for row in results: +# if len(row) != 4: +# raise dbt.exceptions.RuntimeException( +# f'Invalid value from "show table extended ...", ' +# f'got {len(row)} values, expected 4' +# ) +# _schema, name, _, information = row +# rel_type = RelationType.View \ +# if 'Type: VIEW' in information else RelationType.Table +# is_delta = 'Provider: delta' in information +# is_hudi = 'Provider: hudi' in information +# relation = self.Relation.create( +# schema=_schema, +# identifier=name, +# type=rel_type, +# information=information, +# is_delta=is_delta, +# is_hudi=is_hudi, +# ) +# relations.append(relation) + return relations def get_relation( diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index 2542af811..bb2e7a187 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -141,12 +141,20 @@ {% do return(load_result('get_columns_in_relation').table) %} {% endmacro %} -{% macro spark__list_relations_without_caching(relation) %} - {% call statement('list_relations_without_caching', fetch_result=True) -%} - show table extended in {{ relation }} like '*' 
+{% macro spark__list_tables_without_caching(relation) %} + {% call statement('list_tables_without_caching', fetch_result=True) -%} + show tables in {{ relation.schema }} like '*' {% endcall %} - {% do return(load_result('list_relations_without_caching').table) %} + {% do return(load_result('list_tables_without_caching').table) %} +{% endmacro %} + +{% macro spark__list_views_without_caching(relation) %} + {% call statement('list_views_without_caching', fetch_result=True) -%} + show views in {{ relation.schema }} like '*' + {% endcall %} + + {% do return(load_result('list_views_without_caching').table) %} {% endmacro %} {% macro spark__list_schemas(database) -%} From ecc184740147d278eb98faa4d65386856d9f8905 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Thu, 10 Mar 2022 23:47:01 -0500 Subject: [PATCH 02/18] simplify --- dbt/include/spark/macros/adapters.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index bb2e7a187..e2de76b36 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -143,7 +143,7 @@ {% macro spark__list_tables_without_caching(relation) %} {% call statement('list_tables_without_caching', fetch_result=True) -%} - show tables in {{ relation.schema }} like '*' + show tables in {{ relation.schema }} {% endcall %} {% do return(load_result('list_tables_without_caching').table) %} @@ -151,7 +151,7 @@ {% macro spark__list_views_without_caching(relation) %} {% call statement('list_views_without_caching', fetch_result=True) -%} - show views in {{ relation.schema }} like '*' + show views in {{ relation.schema }} {% endcall %} {% do return(load_result('list_views_without_caching').table) %} From 4a3fa57924bd6aca098bccfff99d64546594989a Mon Sep 17 00:00:00 2001 From: leahwicz <60146280+leahwicz@users.noreply.github.com> Date: Tue, 12 Apr 2022 17:43:12 -0400 Subject: [PATCH 03/18] Updating requirements to point to release branch --- dev_requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev_requirements.txt b/dev_requirements.txt index 0f84cbd5d..60f36c6ef 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,7 +1,7 @@ # install latest changes in dbt-core # TODO: how to automate switching from develop to version branches? 
-git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core -git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter +git+https://github.com/dbt-labs/dbt-core.git@1.1.latest#egg=dbt-core&subdirectory=core +git+https://github.com/dbt-labs/dbt-core.git@1.1.latest#egg=dbt-tests-adapter&subdirectory=tests/adapter freezegun==0.3.9 pytest>=6.0.2 From 6862c7f32b620e167140e5b34270888b2830049a Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Wed, 13 Apr 2022 10:56:33 -0600 Subject: [PATCH 04/18] Backport table not exist, Bumping version to 1.1.0rc1 (#333) --- .bumpversion.cfg | 2 +- .github/workflows/main.yml | 13 +++++++++++++ CHANGELOG.md | 2 +- dbt/adapters/spark/__version__.py | 2 +- dbt/adapters/spark/impl.py | 2 +- setup.py | 2 +- 6 files changed, 18 insertions(+), 5 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 9a0c41a56..ed807aa05 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.1.0b1 +current_version = 1.1.0rc1 parse = (?P\d+) \.(?P\d+) \.(?P\d+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 60a0d6f60..fbdbbbaae 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -122,6 +122,9 @@ jobs: runs-on: ubuntu-latest + outputs: + is_alpha: ${{ steps.check-is-alpha.outputs.is_alpha }} + steps: - name: Check out the repository uses: actions/checkout@v2 @@ -150,6 +153,14 @@ jobs: - name: Check wheel contents run: | check-wheel-contents dist/*.whl --ignore W007,W008 + + - name: Check if this is an alpha version + id: check-is-alpha + run: | + export is_alpha=0 + if [[ "$(ls -lh dist/)" == *"a1"* ]]; then export is_alpha=1; fi + echo "::set-output name=is_alpha::$is_alpha" + - uses: actions/upload-artifact@v2 with: name: dist @@ -158,6 +169,8 @@ jobs: test-build: name: verify packages / python ${{ matrix.python-version }} / ${{ matrix.os }} + if: needs.build.outputs.is_alpha == 0 + needs: build runs-on: ${{ matrix.os }} diff --git a/CHANGELOG.md b/CHANGELOG.md index bb54c92f3..f9a094942 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ - Use dbt.tests.adapter.basic in test suite ([#298](https://github.com/dbt-labs/dbt-spark/issues/298), [#299](https://github.com/dbt-labs/dbt-spark/pull/299)) - Make internal macros use macro dispatch to be overridable in child adapters ([#319](https://github.com/dbt-labs/dbt-spark/issues/319), [#320](https://github.com/dbt-labs/dbt-spark/pull/320)) - Override adapter method 'run_sql_for_tests' ([#323](https://github.com/dbt-labs/dbt-spark/issues/323), [#324](https://github.com/dbt-labs/dbt-spark/pull/324)) - +- when a table or view doesn't exist, 'adapter.get_columns_in_relation' will return empty list instead of fail ([#328]https://github.com/dbt-labs/dbt-spark/pull/328) ### Contributors - [@JCZuurmond](https://github.com/dbt-labs/dbt-spark/pull/279) ( [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) diff --git a/dbt/adapters/spark/__version__.py b/dbt/adapters/spark/__version__.py index 56ec17a89..d37cdcc76 100644 --- a/dbt/adapters/spark/__version__.py +++ b/dbt/adapters/spark/__version__.py @@ -1 +1 @@ -version = "1.1.0b1" +version = "1.1.0rc1" diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 268417d07..eb001fbc9 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -239,7 +239,7 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: # CDW would just return and empty list, normalizing the behavior 
here errmsg = getattr(e, "msg", "") if ( - f"Table or view not found: {relation}" in errmsg or + "Table or view not found" in errmsg or "NoSuchTableException" in errmsg ): pass diff --git a/setup.py b/setup.py index 2cd44491e..f9033b37b 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ def _get_dbt_core_version(): package_name = "dbt-spark" -package_version = "1.1.0b1" +package_version = "1.1.0rc1" dbt_core_version = _get_dbt_core_version() description = """The Apache Spark adapter plugin for dbt""" From ac50f231cae3fa61795259eef0204a613fd68f85 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 28 Apr 2022 15:53:53 -0400 Subject: [PATCH 05/18] Bumping version to 1.1.0 (#341) * Bumping version to 1.1.0 * Update CHANGELOG.md Co-authored-by: Github Build Bot Co-authored-by: leahwicz <60146280+leahwicz@users.noreply.github.com> --- .bumpversion.cfg | 3 +-- CHANGELOG.md | 2 +- dbt/adapters/spark/__version__.py | 2 +- setup.py | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index ed807aa05..88442026c 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.1.0rc1 +current_version = 1.1.0 parse = (?P\d+) \.(?P\d+) \.(?P\d+) @@ -25,4 +25,3 @@ first_value = 1 [bumpversion:file:setup.py] [bumpversion:file:dbt/adapters/spark/__version__.py] - diff --git a/CHANGELOG.md b/CHANGELOG.md index f9a094942..4574f5a34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## dbt-spark 1.1.0 (TBD) +## dbt-spark 1.1.0 (April 28, 2022) ### Features - Add session connection method ([#272](https://github.com/dbt-labs/dbt-spark/issues/272), [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) diff --git a/dbt/adapters/spark/__version__.py b/dbt/adapters/spark/__version__.py index d37cdcc76..b2b60a550 100644 --- a/dbt/adapters/spark/__version__.py +++ b/dbt/adapters/spark/__version__.py @@ -1 +1 @@ -version = "1.1.0rc1" +version = "1.1.0" diff --git a/setup.py b/setup.py index f9033b37b..094b96a7b 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ def _get_dbt_core_version(): package_name = "dbt-spark" -package_version = "1.1.0rc1" +package_version = "1.1.0" dbt_core_version = _get_dbt_core_version() description = """The Apache Spark adapter plugin for dbt""" From bf5a1d6bfb51146befe3f01817d6c138dacd8470 Mon Sep 17 00:00:00 2001 From: TalkWithKeyboard <610347922@qq.com> Date: Fri, 29 Apr 2022 17:48:46 +0800 Subject: [PATCH 06/18] fix: the tbl name in the result of show tables/views --- dbt/adapters/spark/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 0375f1a65..ff6d5b642 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -151,7 +151,7 @@ def list_relations_without_caching( for tbl in tables: rel_type = ('view' if tbl['tableName'] in views.columns["viewName"].values() else 'table') relation = self.Relation.create( - schema=tbl['database'], + schema=tbl['namespace'], identifier=tbl['tableName'], type=rel_type, ) From ba1c87b90689bbc030156c0873b5bbf62a33a021 Mon Sep 17 00:00:00 2001 From: TalkWithKeyboard <610347922@qq.com> Date: Tue, 3 May 2022 00:29:27 +0800 Subject: [PATCH 07/18] fix: select existing ones from namespace and database --- dbt/adapters/spark/impl.py | 33 +++++++-------------------------- 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/dbt/adapters/spark/impl.py 
b/dbt/adapters/spark/impl.py index ff6d5b642..254600dcc 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -145,40 +145,21 @@ def list_relations_without_caching( description = "Error while retrieving information about" logger.debug(f"{description} {schema_relation}: {e.msg}") return [] - - + + relations = [] + view_names = views.columns["viewName"].values() + for tbl in tables: - rel_type = ('view' if tbl['tableName'] in views.columns["viewName"].values() else 'table') + rel_type = RelationType('view' if tbl['tableName'] in view_names else 'table') + schema = tbl['namespace'] if 'namespace' in tbl else tbl['database'] relation = self.Relation.create( - schema=tbl['namespace'], + schema=schema, identifier=tbl['tableName'], type=rel_type, ) relations.append(relation) -# relations = [] -# for row in results: -# if len(row) != 4: -# raise dbt.exceptions.RuntimeException( -# f'Invalid value from "show table extended ...", ' -# f'got {len(row)} values, expected 4' -# ) -# _schema, name, _, information = row -# rel_type = RelationType.View \ -# if 'Type: VIEW' in information else RelationType.Table -# is_delta = 'Provider: delta' in information -# is_hudi = 'Provider: hudi' in information -# relation = self.Relation.create( -# schema=_schema, -# identifier=name, -# type=rel_type, -# information=information, -# is_delta=is_delta, -# is_hudi=is_hudi, -# ) -# relations.append(relation) - return relations def get_relation( From 7a5428e7c4d36b572527668401b53eff468be724 Mon Sep 17 00:00:00 2001 From: TalkWithKeyboard <610347922@qq.com> Date: Wed, 4 May 2022 16:08:27 +0800 Subject: [PATCH 08/18] chore: upgrade the spark version in the integration to 3.2.1 --- .circleci/config.yml | 13 +++++++------ dbt/adapters/spark/impl.py | 3 +-- dbt/include/spark/macros/adapters.sql | 2 -- tests/functional/adapter/test_basic.py | 7 ++++++- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 34e449acf..615a58532 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -8,19 +8,20 @@ jobs: - image: fishtownanalytics/test-container:10 steps: - checkout - - run: tox -e flake8,unit + - run: tox -e unit integration-spark-session: environment: DBT_INVOCATION_ENV: circle docker: - - image: godatadriven/pyspark:3.1 + - image: bitnami/spark:3.2.1 + user: root steps: - checkout - run: apt-get update - - run: python3 -m pip install --upgrade pip - run: apt-get install -y git gcc g++ unixodbc-dev libsasl2-dev - - run: python3 -m pip install tox + - run: python3 -m pip install --upgrade pip + - run: python3 -m pip install flake8 tox pyspark==3.2.1 - run: name: Run integration tests command: tox -e integration-spark-session @@ -33,7 +34,7 @@ jobs: DBT_INVOCATION_ENV: circle docker: - image: fishtownanalytics/test-container:10 - - image: godatadriven/spark:2 + - image: godatadriven/spark:3.1 environment: WAIT_FOR: localhost:5432 command: > @@ -44,7 +45,7 @@ jobs: --conf spark.hadoop.javax.jdo.option.ConnectionPassword=dbt --conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver --conf spark.serializer=org.apache.spark.serializer.KryoSerializer - --conf spark.jars.packages=org.apache.hudi:hudi-spark-bundle_2.11:0.9.0 + --conf spark.jars.packages=org.apache.hudi:hudi-spark3.1-bundle_2.12:0.11.0 --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --conf spark.driver.userClassPathFirst=true --conf spark.hadoop.datanucleus.autoCreateTables=true diff --git a/dbt/adapters/spark/impl.py 
b/dbt/adapters/spark/impl.py index 6eb4c74d7..1816b30f5 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -1,3 +1,4 @@ +import json import re from concurrent.futures import Future from dataclasses import dataclass @@ -149,8 +150,6 @@ def list_relations_without_caching( relations = [] view_names = views.columns["viewName"].values() - raise Exception(f"tbl: {tables.print_json()}") - for tbl in tables: rel_type = RelationType('view' if tbl['tableName'] in view_names else 'table') _schema = tbl['namespace'] if 'namespace' in tbl else tbl['database'] diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index a9a18e78d..77a59bb9e 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -179,7 +179,6 @@ {% call statement('list_tables_without_caching', fetch_result=True) -%} show tables in {{ relation.schema }} {% endcall %} - {% do return(load_result('list_tables_without_caching').table) %} {% endmacro %} @@ -187,7 +186,6 @@ {% call statement('list_views_without_caching', fetch_result=True) -%} show views in {{ relation.schema }} {% endcall %} - {% do return(load_result('list_views_without_caching').table) %} {% endmacro %} diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py index 70f3267a4..6d315398e 100644 --- a/tests/functional/adapter/test_basic.py +++ b/tests/functional/adapter/test_basic.py @@ -12,6 +12,7 @@ from dbt.tests.adapter.basic.test_snapshot_check_cols import BaseSnapshotCheckCols from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod +from dbt.tests.util import run_dbt, check_relations_equal @pytest.mark.skip_profile('spark_session') @@ -80,4 +81,8 @@ def project_config_update(self): } class TestBaseAdapterMethod(BaseAdapterMethod): - pass \ No newline at end of file + def test_adapter_methods(self, project, equal_tables): + run_dbt(["--debug", "compile"]) # trigger any compile-time issues + result = run_dbt() + assert len(result) == 3 + check_relations_equal(project.adapter, equal_tables) \ No newline at end of file From 3301123ee6b2dd7608f577e44a11bd062d2127ce Mon Sep 17 00:00:00 2001 From: TalkWithKeyboard <610347922@qq.com> Date: Sat, 7 May 2022 17:59:47 +0800 Subject: [PATCH 09/18] fix: the wrong use of agate.Table --- .circleci/config.yml | 9 +++++---- dbt/adapters/spark/impl.py | 3 +-- docker-compose.yml | 2 +- tests/functional/adapter/test_basic.py | 7 +------ 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 615a58532..18b8ec45e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -8,20 +8,19 @@ jobs: - image: fishtownanalytics/test-container:10 steps: - checkout - - run: tox -e unit + - run: tox -e flake8,unit integration-spark-session: environment: DBT_INVOCATION_ENV: circle docker: - - image: bitnami/spark:3.2.1 - user: root + - image: godatadriven/pyspark:3.1 steps: - checkout - run: apt-get update - run: apt-get install -y git gcc g++ unixodbc-dev libsasl2-dev - run: python3 -m pip install --upgrade pip - - run: python3 -m pip install flake8 tox pyspark==3.2.1 + - run: python3 -m pip install tox - run: name: Run integration tests command: tox -e integration-spark-session @@ -52,6 +51,8 @@ jobs: --conf spark.hadoop.datanucleus.schema.autoCreateTables=true --conf spark.hadoop.datanucleus.fixedDatastore=false --conf 
spark.sql.hive.convertMetastoreParquet=false + --conf spark.driver.memory=2g + --conf spark.executor.memory=2g --hiveconf hoodie.datasource.hive_sync.use_jdbc=false --hiveconf hoodie.datasource.hive_sync.mode=hms --hiveconf datanucleus.schema.autoCreateAll=true diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 1816b30f5..1361a56ed 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -1,4 +1,3 @@ -import json import re from concurrent.futures import Future from dataclasses import dataclass @@ -152,7 +151,7 @@ def list_relations_without_caching( for tbl in tables: rel_type = RelationType('view' if tbl['tableName'] in view_names else 'table') - _schema = tbl['namespace'] if 'namespace' in tbl else tbl['database'] + _schema = tbl['namespace'] if 'namespace' in tables.column_names else tbl['database'] relation = self.Relation.create( schema=_schema, identifier=tbl['tableName'], diff --git a/docker-compose.yml b/docker-compose.yml index 8054dfd75..42bcadc28 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: "3.7" services: dbt-spark2-thrift: - image: godatadriven/spark:3.0 + image: godatadriven/spark:3.1 ports: - "10000:10000" - "4040:4040" diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py index 6d315398e..70f3267a4 100644 --- a/tests/functional/adapter/test_basic.py +++ b/tests/functional/adapter/test_basic.py @@ -12,7 +12,6 @@ from dbt.tests.adapter.basic.test_snapshot_check_cols import BaseSnapshotCheckCols from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod -from dbt.tests.util import run_dbt, check_relations_equal @pytest.mark.skip_profile('spark_session') @@ -81,8 +80,4 @@ def project_config_update(self): } class TestBaseAdapterMethod(BaseAdapterMethod): - def test_adapter_methods(self, project, equal_tables): - run_dbt(["--debug", "compile"]) # trigger any compile-time issues - result = run_dbt() - assert len(result) == 3 - check_relations_equal(project.adapter, equal_tables) \ No newline at end of file + pass \ No newline at end of file From 1fd66715110b65a25d16743b03e761483502655e Mon Sep 17 00:00:00 2001 From: TalkWithKeyboard <610347922@qq.com> Date: Sat, 14 May 2022 17:16:55 +0800 Subject: [PATCH 10/18] get extended information and update cached relation when need it --- .circleci/config.yml | 6 +- dbt/adapters/spark/impl.py | 157 +++++++++++++++++++++------------ dbt/adapters/spark/relation.py | 16 +++- tests/unit/test_adapter.py | 47 +++++++++- 4 files changed, 161 insertions(+), 65 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 18b8ec45e..affde0f70 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -33,7 +33,7 @@ jobs: DBT_INVOCATION_ENV: circle docker: - image: fishtownanalytics/test-container:10 - - image: godatadriven/spark:3.1 + - image: godatadriven/spark:2 environment: WAIT_FOR: localhost:5432 command: > @@ -44,15 +44,13 @@ jobs: --conf spark.hadoop.javax.jdo.option.ConnectionPassword=dbt --conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver --conf spark.serializer=org.apache.spark.serializer.KryoSerializer - --conf spark.jars.packages=org.apache.hudi:hudi-spark3.1-bundle_2.12:0.11.0 + --conf spark.jars.packages=org.apache.hudi:hudi-spark-bundle_2.11:0.9.0 --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --conf spark.driver.userClassPathFirst=true 
--conf spark.hadoop.datanucleus.autoCreateTables=true --conf spark.hadoop.datanucleus.schema.autoCreateTables=true --conf spark.hadoop.datanucleus.fixedDatastore=false --conf spark.sql.hive.convertMetastoreParquet=false - --conf spark.driver.memory=2g - --conf spark.executor.memory=2g --hiveconf hoodie.datasource.hive_sync.use_jdbc=false --hiveconf hoodie.datasource.hive_sync.mode=hms --hiveconf datanucleus.schema.autoCreateAll=true diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 1361a56ed..30ed5ed2c 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -1,9 +1,15 @@ import re from concurrent.futures import Future from dataclasses import dataclass -from typing import Optional, List, Dict, Any, Union, Iterable +from typing import Optional, List, Dict, Any, Union, Iterable, Tuple import agate +from dbt.adapters.cache import RelationsCache, _CachedRelation from dbt.contracts.relation import RelationType +from dbt.events.base_types import DebugLevel, Cache +from dbt.adapters.reference_keys import _ReferenceKey, _make_key +from dbt.events.functions import fire_event +from dbt.events.types import AddRelation, DumpBeforeAddGraph +from dbt.helper_types import Lazy import dbt import dbt.exceptions @@ -30,6 +36,7 @@ KEY_TABLE_OWNER = 'Owner' KEY_TABLE_STATISTICS = 'Statistics' +KEY_TABLE_PROVIDER = 'Provider' @dataclass @@ -43,6 +50,36 @@ class SparkConfig(AdapterConfig): merge_update_columns: Optional[str] = None +@dataclass +class UpdateRelation(DebugLevel, Cache): + relation: _ReferenceKey + code: str = "E038" + + def message(self) -> str: + return f"Updating relation: {str(self.relation)}" + + +class SparkRelationsCache(RelationsCache): + def _update(self, relation: _CachedRelation): + key = relation.key() + + if key not in self.relations: + return + + self.relations[key].inner = relation.inner + + def upsert_relation(self, relation): + """Update the relation inner to the cache + + : param BaseRelation relation: The underlying relation. 
+ """ + cached = _CachedRelation(relation) + fire_event(UpdateRelation(relation=_make_key(cached))) + + with self.lock: + self._update(cached) + + class SparkAdapter(SQLAdapter): COLUMN_NAMES = ( 'table_database', @@ -83,6 +120,7 @@ class SparkAdapter(SQLAdapter): Column = SparkColumn ConnectionManager = SparkConnectionManager AdapterSpecificConfigs = SparkConfig + cache = SparkRelationsCache() @classmethod def date_function(cls) -> str: @@ -167,13 +205,14 @@ def get_relation( if not self.Relation.include_policy.database: database = None - return super().get_relation(database, schema, identifier) + cached = super().get_relation(database, schema, identifier) + return self._set_relation_information(cached) def parse_describe_extended( self, relation: Relation, raw_rows: List[agate.Row] - ) -> List[SparkColumn]: + ) -> Tuple[Dict[str, any], List[SparkColumn]]: # Convert the Row to a dict dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows] # Find the separator between the rows and the metadata provided @@ -191,7 +230,7 @@ def parse_describe_extended( raw_table_stats = metadata.get(KEY_TABLE_STATISTICS) table_stats = SparkColumn.convert_table_stats(raw_table_stats) - return [SparkColumn( + return metadata, [SparkColumn( table_database=None, table_schema=relation.schema, table_name=relation.name, @@ -219,69 +258,77 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: for cached_relation in cached_relations if str(cached_relation) == str(relation)), None) + + updated_relation = self._set_relation_information(cached_relation) + return self._get_spark_columns(updated_relation) + + def _set_relation_information( + self, relation: SparkRelation + ) -> SparkRelation: + """Update the information of the relation, or return it if it already exists.""" + if relation.has_information: + return relation + + metadata = None columns = [] - if cached_relation and cached_relation.information: - columns = self.parse_columns_from_information(cached_relation) - if not columns: - # in open source delta 'show table extended' query output doesnt - # return relation's schema. if columns are empty from cache, - # use get_columns_in_relation spark macro - # which would execute 'describe extended tablename' query - try: - rows: List[agate.Row] = super().get_columns_in_relation(relation) - columns = self.parse_describe_extended(relation, rows) - except dbt.exceptions.RuntimeException as e: - # spark would throw error when table doesn't exist, where other - # CDW would just return and empty list, normalizing the behavior here - errmsg = getattr(e, "msg", "") - if ( - "Table or view not found" in errmsg or - "NoSuchTableException" in errmsg - ): - pass - else: - raise e + + try: + rows: List[agate.Row] = super().get_columns_in_relation(relation) + metadata, columns = self.parse_describe_extended(relation, rows) + except dbt.exceptions.RuntimeException as e: + # spark would throw error when table doesn't exist, where other + # CDW would just return and empty list, normalizing the behavior here + errmsg = getattr(e, "msg", "") + if ( + "Table or view not found" in errmsg or + "NoSuchTableException" in errmsg + ): + pass + else: + raise e # strip hudi metadata columns. 
columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS] - return columns - def parse_columns_from_information( - self, relation: SparkRelation + provider = metadata[KEY_TABLE_PROVIDER] + new_relation = self.Relation.create( + database=None, + schema=relation.schema, + identifier=relation.identifier, + type=relation.type, + is_delta=(provider == 'delta'), + is_hudi=(provider == 'hudi'), + owner=metadata[KEY_TABLE_OWNER], + stats=metadata[KEY_TABLE_STATISTICS], + columns={x.column: x.dtype for x in columns} + ) + + self.cache.upsert_relation(new_relation) + return new_relation + + @staticmethod + def _get_spark_columns( + relation: SparkRelation ) -> List[SparkColumn]: - owner_match = re.findall( - self.INFORMATION_OWNER_REGEX, relation.information) - owner = owner_match[0] if owner_match else None - matches = re.finditer( - self.INFORMATION_COLUMNS_REGEX, relation.information) - columns = [] - stats_match = re.findall( - self.INFORMATION_STATISTICS_REGEX, relation.information) - raw_table_stats = stats_match[0] if stats_match else None - table_stats = SparkColumn.convert_table_stats(raw_table_stats) - for match_num, match in enumerate(matches): - column_name, column_type, nullable = match.groups() - column = SparkColumn( - table_database=None, - table_schema=relation.schema, - table_name=relation.table, - table_type=relation.type, - column_index=match_num, - table_owner=owner, - column=column_name, - dtype=column_type, - table_stats=table_stats - ) - columns.append(column) - return columns + return [SparkColumn( + table_database=None, + table_schema=relation.schema, + table_name=relation.name, + table_type=relation.type, + table_owner=relation.owner, + table_stats=relation.stats, + column=name, + column_index=idx, + dtype=dtype + ) for idx, (name, dtype) in enumerate(relation.columns.items())] def _get_columns_for_catalog( self, relation: SparkRelation ) -> Iterable[Dict[str, Any]]: - columns = self.parse_columns_from_information(relation) + updated_relation = self._set_relation_information(relation) - for column in columns: + for column in self._get_spark_columns(updated_relation): # convert SparkColumns into catalog dicts as_dict = column.to_column_dict() as_dict['column_name'] = as_dict.pop('column', None) diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py index 043cabfa0..1a9820903 100644 --- a/dbt/adapters/spark/relation.py +++ b/dbt/adapters/spark/relation.py @@ -1,8 +1,10 @@ -from typing import Optional +from typing import Optional, List, Dict, Hashable -from dataclasses import dataclass +from dataclasses import dataclass, field +from dbt.adapters.base import Column from dbt.adapters.base.relation import BaseRelation, Policy +from dbt.contracts.relation import FakeAPIObject from dbt.exceptions import RuntimeException @@ -27,7 +29,9 @@ class SparkRelation(BaseRelation): quote_character: str = '`' is_delta: Optional[bool] = None is_hudi: Optional[bool] = None - information: str = None + owner: Optional[str] = None + stats: Optional[str] = None + columns: Dict[str, str] = field(default_factory=lambda: {}) def __post_init__(self): if self.database != self.schema and self.database: @@ -40,3 +44,9 @@ def render(self): 'include, but only one can be set' ) return super().render() + + @property + def has_information(self) -> bool: + return self.owner is not None and \ + self.stats is not None and \ + len(self.columns) > 0 diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py index f87a89b2b..6f0fd0727 100644 --- 
a/tests/unit/test_adapter.py +++ b/tests/unit/test_adapter.py @@ -2,6 +2,7 @@ from unittest import mock import dbt.flags as flags +import pytest from dbt.exceptions import RuntimeException from agate import Row from pyhive import hive @@ -298,8 +299,27 @@ def test_parse_relation(self): for r in plain_rows] config = self._get_target_http(self.project_cfg) - rows = SparkAdapter(config).parse_describe_extended( + metadata, rows = SparkAdapter(config).parse_describe_extended( relation, input_cols) + + self.assertDictEqual(metadata, { + '# col_name': 'data_type', + 'dt': 'date', + None: None, + '# Detailed Table Information': None, + 'Database': None, + 'Owner': 'root', + 'Created Time': 'Wed Feb 04 18:15:00 UTC 1815', + 'Last Access': 'Wed May 20 19:25:00 UTC 1925', + 'Type': 'MANAGED', + 'Provider': 'delta', + 'Location': '/mnt/vo', + 'Serde Library': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe', + 'InputFormat': 'org.apache.hadoop.mapred.SequenceFileInputFormat', + 'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat', + 'Partition Provider': 'Catalog' + }) + self.assertEqual(len(rows), 4) self.assertEqual(rows[0].to_column_dict(omit_none=False), { 'table_database': None, @@ -379,7 +399,7 @@ def test_parse_relation_with_integer_owner(self): for r in plain_rows] config = self._get_target_http(self.project_cfg) - rows = SparkAdapter(config).parse_describe_extended( + _, rows = SparkAdapter(config).parse_describe_extended( relation, input_cols) self.assertEqual(rows[0].to_column_dict().get('table_owner'), '1234') @@ -419,8 +439,26 @@ def test_parse_relation_with_statistics(self): for r in plain_rows] config = self._get_target_http(self.project_cfg) - rows = SparkAdapter(config).parse_describe_extended( + metadata, rows = SparkAdapter(config).parse_describe_extended( relation, input_cols) + + self.assertEqual(metadata, { + None: None, + '# Detailed Table Information': None, + 'Database': None, + 'Owner': 'root', + 'Created Time': 'Wed Feb 04 18:15:00 UTC 1815', + 'Last Access': 'Wed May 20 19:25:00 UTC 1925', + 'Statistics': '1109049927 bytes, 14093476 rows', + 'Type': 'MANAGED', + 'Provider': 'delta', + 'Location': '/mnt/vo', + 'Serde Library': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe', + 'InputFormat': 'org.apache.hadoop.mapred.SequenceFileInputFormat', + 'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat', + 'Partition Provider': 'Catalog' + }) + self.assertEqual(len(rows), 1) self.assertEqual(rows[0].to_column_dict(omit_none=False), { 'table_database': None, @@ -497,6 +535,7 @@ def test_profile_with_cluster_and_sql_endpoint(self): with self.assertRaises(RuntimeException): config_from_parts_or_dicts(self.project_cfg, profile) + @pytest.mark.skip() def test_parse_columns_from_information_with_table_type_and_delta_provider(self): self.maxDiff = None rel_type = SparkRelation.get_relation_type.Table @@ -574,6 +613,7 @@ def test_parse_columns_from_information_with_table_type_and_delta_provider(self) 'stats:bytes:value': 123456789, }) + @pytest.mark.skip() def test_parse_columns_from_information_with_view_type(self): self.maxDiff = None rel_type = SparkRelation.get_relation_type.View @@ -649,6 +689,7 @@ def test_parse_columns_from_information_with_view_type(self): 'char_size': None }) + @pytest.mark.skip() def test_parse_columns_from_information_with_table_type_and_parquet_provider(self): self.maxDiff = None rel_type = SparkRelation.get_relation_type.Table From e830b248daaf0026a765ac1e1aadc9615d2a7eac Mon Sep 17 00:00:00 2001 From: 
TalkWithKeyboard <610347922@qq.com> Date: Sat, 14 May 2022 17:54:46 +0800 Subject: [PATCH 11/18] polish --- .circleci/config.yml | 2 +- dbt/adapters/spark/impl.py | 63 ++++++++++++++++++++-------------- dbt/adapters/spark/relation.py | 3 -- docker-compose.yml | 2 +- 4 files changed, 39 insertions(+), 31 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index affde0f70..34e449acf 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -18,8 +18,8 @@ jobs: steps: - checkout - run: apt-get update - - run: apt-get install -y git gcc g++ unixodbc-dev libsasl2-dev - run: python3 -m pip install --upgrade pip + - run: apt-get install -y git gcc g++ unixodbc-dev libsasl2-dev - run: python3 -m pip install tox - run: name: Run integration tests diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 16f0898f9..316cfe3d4 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -10,8 +10,6 @@ from dbt.events.base_types import DebugLevel, Cache from dbt.adapters.reference_keys import _ReferenceKey, _make_key from dbt.events.functions import fire_event -from dbt.events.types import AddRelation, DumpBeforeAddGraph -from dbt.helper_types import Lazy import dbt import dbt.exceptions @@ -29,16 +27,16 @@ logger = AdapterLogger("Spark") -GET_COLUMNS_IN_RELATION_MACRO_NAME = 'get_columns_in_relation' -LIST_SCHEMAS_MACRO_NAME = 'list_schemas' -LIST_TABLES_MACRO_NAME = 'spark__list_tables_without_caching' -LIST_VIEWS_MACRO_NAME = 'spark__list_views_without_caching' -DROP_RELATION_MACRO_NAME = 'drop_relation' -FETCH_TBL_PROPERTIES_MACRO_NAME = 'fetch_tbl_properties' +GET_COLUMNS_IN_RELATION_MACRO_NAME = "get_columns_in_relation" +LIST_SCHEMAS_MACRO_NAME = "list_schemas" +LIST_TABLES_MACRO_NAME = "spark__list_tables_without_caching" +LIST_VIEWS_MACRO_NAME = "spark__list_views_without_caching" +DROP_RELATION_MACRO_NAME = "drop_relation" +FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties" -KEY_TABLE_OWNER = 'Owner' -KEY_TABLE_STATISTICS = 'Statistics' -KEY_TABLE_PROVIDER = 'Provider' +KEY_TABLE_OWNER = "Owner" +KEY_TABLE_STATISTICS = "Statistics" +KEY_TABLE_PROVIDER = "Provider" @dataclass @@ -61,11 +59,21 @@ def message(self) -> str: return f"Updating relation: {str(self.relation)}" +@dataclass +class UpdateMissingRelation(DebugLevel, Cache): + relation: _ReferenceKey + code: str = "E039" + + def message(self) -> str: + return f"updated a nonexistent relationship: {str(self.relation)}" + + class SparkRelationsCache(RelationsCache): def _update(self, relation: _CachedRelation): key = relation.key() if key not in self.relations: + fire_event(UpdateMissingRelation(relation=key)) return self.relations[key].inner = relation.inner @@ -114,10 +122,10 @@ class SparkAdapter(SQLAdapter): "_hoodie_file_name", ] - Relation = SparkRelation - Column = SparkColumn - ConnectionManager = SparkConnectionManager - AdapterSpecificConfigs = SparkConfig + Relation: TypeAlias = SparkRelation + Column: TypeAlias = SparkColumn + ConnectionManager: TypeAlias = SparkConnectionManager + AdapterSpecificConfigs: TypeAlias = SparkConfig cache = SparkRelationsCache() @classmethod @@ -163,7 +171,7 @@ def add_schema_to_cache(self, schema) -> str: def list_relations_without_caching( self, schema_relation: SparkRelation ) -> List[SparkRelation]: - kwargs = {'relation': schema_relation} + kwargs = {"relation": schema_relation} try: tables = self.execute_macro( LIST_TABLES_MACRO_NAME, @@ -186,11 +194,11 @@ def list_relations_without_caching( view_names = 
views.columns["viewName"].values() for tbl in tables: - rel_type = RelationType('view' if tbl['tableName'] in view_names else 'table') - _schema = tbl['namespace'] if 'namespace' in tables.column_names else tbl['database'] + rel_type = RelationType("view" if tbl["tableName"] in view_names else "table") + _schema = tbl["namespace"] if "namespace" in tables.column_names else tbl["database"] relation = self.Relation.create( schema=_schema, - identifier=tbl['tableName'], + identifier=tbl["tableName"], type=rel_type, ) relations.append(relation) @@ -243,12 +251,15 @@ def find_table_information_separator(rows: List[dict]) -> int: return pos def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: - cached_relations = self.cache.get_relations( - relation.database, relation.schema) - cached_relation = next((cached_relation - for cached_relation in cached_relations - if str(cached_relation) == str(relation)), - None) + cached_relations = self.cache.get_relations(relation.database, relation.schema) + cached_relation = next( + ( + cached_relation + for cached_relation in cached_relations + if str(cached_relation) == str(relation) + ), + None, + ) updated_relation = self._set_relation_information(cached_relation) return self._get_spark_columns(updated_relation) @@ -257,7 +268,7 @@ def _set_relation_information( self, relation: SparkRelation ) -> SparkRelation: """Update the information of the relation, or return it if it already exists.""" - if relation.has_information: + if relation.has_information(): return relation metadata = None diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py index ccd78a65f..8c314d124 100644 --- a/dbt/adapters/spark/relation.py +++ b/dbt/adapters/spark/relation.py @@ -2,9 +2,7 @@ from dataclasses import dataclass, field -from dbt.adapters.base import Column from dbt.adapters.base.relation import BaseRelation, Policy -from dbt.contracts.relation import FakeAPIObject from dbt.exceptions import RuntimeException @@ -45,7 +43,6 @@ def render(self): ) return super().render() - @property def has_information(self) -> bool: return self.owner is not None and \ self.stats is not None and \ diff --git a/docker-compose.yml b/docker-compose.yml index 42bcadc28..8054dfd75 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: "3.7" services: dbt-spark2-thrift: - image: godatadriven/spark:3.1 + image: godatadriven/spark:3.0 ports: - "10000:10000" - "4040:4040" From 26720570d6d8b48382615b25ba9054517c237e41 Mon Sep 17 00:00:00 2001 From: TalkWithKeyboard <610347922@qq.com> Date: Sat, 14 May 2022 18:28:13 +0800 Subject: [PATCH 12/18] fix NPE while get_columns_in_relation --- dbt/adapters/spark/impl.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 316cfe3d4..a2b9c66bc 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -261,16 +261,14 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: None, ) - updated_relation = self._set_relation_information(cached_relation) - return self._get_spark_columns(updated_relation) + if not cached_relation: + updated_relation = self.cache.add(self._get_updated_relation(relation)) + else: + updated_relation = self._set_relation_information(relation) - def _set_relation_information( - self, relation: SparkRelation - ) -> SparkRelation: - """Update the information of the relation, or return it if it already exists.""" - if 
relation.has_information(): - return relation + return self._get_spark_columns(updated_relation) + def _get_updated_relation(self, relation: BaseRelation) -> SparkRelation: metadata = None columns = [] @@ -282,8 +280,8 @@ def _set_relation_information( # CDW would just return and empty list, normalizing the behavior here errmsg = getattr(e, "msg", "") if ( - "Table or view not found" in errmsg or - "NoSuchTableException" in errmsg + "Table or view not found" in errmsg or + "NoSuchTableException" in errmsg ): pass else: @@ -294,7 +292,7 @@ def _set_relation_information( if x.name not in self.HUDI_METADATA_COLUMNS] provider = metadata[KEY_TABLE_PROVIDER] - new_relation = self.Relation.create( + return self.Relation.create( database=None, schema=relation.schema, identifier=relation.identifier, @@ -306,8 +304,15 @@ def _set_relation_information( columns={x.column: x.dtype for x in columns} ) - self.cache.upsert_relation(new_relation) - return new_relation + def _set_relation_information(self, relation: SparkRelation) -> SparkRelation: + """Update the information of the relation, or return it if it already exists.""" + if relation.has_information(): + return relation + + updated_relation = self._get_updated_relation(relation) + + self.cache.upsert_relation(updated_relation) + return updated_relation @staticmethod def _get_spark_columns( From 9fc573265682cc28965ae412af60c9718fcf9c4a Mon Sep 17 00:00:00 2001 From: TalkWithKeyboard <610347922@qq.com> Date: Sun, 15 May 2022 15:55:47 +0800 Subject: [PATCH 13/18] add more debug message --- dbt/adapters/spark/impl.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index a2b9c66bc..7de5fceb9 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -212,6 +212,7 @@ def get_relation( database = None cached = super().get_relation(database, schema, identifier) + logger.info(f">>> get_relation: {cached.render() if cached is not None else 'Empty'}") return self._set_relation_information(cached) def parse_describe_extended( @@ -261,10 +262,13 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: None, ) + logger.info(f">>> get_columns_in_relation: {relation.render() if relation is not None else 'Empty'}, " + f"{cached_relation.render() if cached_relation is not None else 'Empty'}") + if not cached_relation: updated_relation = self.cache.add(self._get_updated_relation(relation)) else: - updated_relation = self._set_relation_information(relation) + updated_relation = self._set_relation_information(cached_relation) return self._get_spark_columns(updated_relation) @@ -333,6 +337,7 @@ def _get_spark_columns( def _get_columns_for_catalog( self, relation: SparkRelation ) -> Iterable[Dict[str, Any]]: + logger.info(f">>> _get_columns_for_catalog: {relation.render() if relation is not None else 'Empty'}") updated_relation = self._set_relation_information(relation) for column in self._get_spark_columns(updated_relation): From 6cd6f6d9453b60a3d79c1ef58bb55364178f7141 Mon Sep 17 00:00:00 2001 From: TalkWithKeyboard <610347922@qq.com> Date: Sun, 15 May 2022 16:02:32 +0800 Subject: [PATCH 14/18] fix NPE while get relation --- dbt/adapters/spark/impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 7de5fceb9..9652a24be 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -213,7 +213,7 @@ def get_relation( cached = super().get_relation(database, 
schema, identifier) logger.info(f">>> get_relation: {cached.render() if cached is not None else 'Empty'}") - return self._set_relation_information(cached) + return self._set_relation_information(cached) if cached else None def parse_describe_extended( self, relation: Relation, raw_rows: List[agate.Row] From 345239b5f6bb44cb1a863163fdee2c2d275a504a Mon Sep 17 00:00:00 2001 From: TalkWithKeyboard <610347922@qq.com> Date: Sun, 15 May 2022 16:21:55 +0800 Subject: [PATCH 15/18] fix NPE while get_columns_in_relation --- dbt/adapters/spark/impl.py | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 9652a24be..7600bc4eb 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -78,7 +78,7 @@ def _update(self, relation: _CachedRelation): self.relations[key].inner = relation.inner - def upsert_relation(self, relation): + def update_relation(self, relation): """Update the relation inner to the cache : param BaseRelation relation: The underlying relation. @@ -126,7 +126,10 @@ class SparkAdapter(SQLAdapter): Column: TypeAlias = SparkColumn ConnectionManager: TypeAlias = SparkConnectionManager AdapterSpecificConfigs: TypeAlias = SparkConfig - cache = SparkRelationsCache() + + def __init__(self, config): + super().__init__(config) + self.cache = SparkRelationsCache() @classmethod def date_function(cls) -> str: @@ -212,7 +215,6 @@ def get_relation( database = None cached = super().get_relation(database, schema, identifier) - logger.info(f">>> get_relation: {cached.render() if cached is not None else 'Empty'}") return self._set_relation_information(cached) if cached else None def parse_describe_extended( @@ -262,17 +264,16 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: None, ) - logger.info(f">>> get_columns_in_relation: {relation.render() if relation is not None else 'Empty'}, " - f"{cached_relation.render() if cached_relation is not None else 'Empty'}") - if not cached_relation: - updated_relation = self.cache.add(self._get_updated_relation(relation)) + updated_relation = self._get_updated_relation(relation) + if updated_relation: + self.cache.add(updated_relation) else: updated_relation = self._set_relation_information(cached_relation) return self._get_spark_columns(updated_relation) - def _get_updated_relation(self, relation: BaseRelation) -> SparkRelation: + def _get_updated_relation(self, relation: BaseRelation) -> Optional[SparkRelation]: metadata = None columns = [] @@ -295,7 +296,10 @@ def _get_updated_relation(self, relation: BaseRelation) -> SparkRelation: columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS] - provider = metadata[KEY_TABLE_PROVIDER] + if not metadata: + return None + + provider = metadata.get(KEY_TABLE_PROVIDER) return self.Relation.create( database=None, schema=relation.schema, @@ -303,8 +307,8 @@ def _get_updated_relation(self, relation: BaseRelation) -> SparkRelation: type=relation.type, is_delta=(provider == 'delta'), is_hudi=(provider == 'hudi'), - owner=metadata[KEY_TABLE_OWNER], - stats=metadata[KEY_TABLE_STATISTICS], + owner=metadata.get(KEY_TABLE_OWNER), + stats=metadata.get(KEY_TABLE_STATISTICS), columns={x.column: x.dtype for x in columns} ) @@ -315,13 +319,16 @@ def _set_relation_information(self, relation: SparkRelation) -> SparkRelation: updated_relation = self._get_updated_relation(relation) - self.cache.upsert_relation(updated_relation) + 
self.cache.update_relation(updated_relation) return updated_relation @staticmethod def _get_spark_columns( - relation: SparkRelation + relation: Optional[SparkRelation] ) -> List[SparkColumn]: + if not relation: + return [] + return [SparkColumn( table_database=None, table_schema=relation.schema, @@ -337,7 +344,6 @@ def _get_spark_columns( def _get_columns_for_catalog( self, relation: SparkRelation ) -> Iterable[Dict[str, Any]]: - logger.info(f">>> _get_columns_for_catalog: {relation.render() if relation is not None else 'Empty'}") updated_relation = self._set_relation_information(relation) for column in self._get_spark_columns(updated_relation): From 101c6ddaa59d1ef25afeb66e3a07895c9543d954 Mon Sep 17 00:00:00 2001 From: TalkWithKeyboard <610347922@qq.com> Date: Sun, 15 May 2022 18:34:10 +0800 Subject: [PATCH 16/18] polish --- dbt/adapters/spark/impl.py | 5 +- tests/unit/test_adapter.py | 240 ------------------------------------- 2 files changed, 2 insertions(+), 243 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 7600bc4eb..233c82fc8 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -218,7 +218,7 @@ def get_relation( return self._set_relation_information(cached) if cached else None def parse_describe_extended( - self, relation: Relation, raw_rows: List[agate.Row] + self, relation: Relation, raw_rows: List[agate.Row] ) -> Tuple[Dict[str, any], List[SparkColumn]]: # Convert the Row to a dict dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows] @@ -293,8 +293,7 @@ def _get_updated_relation(self, relation: BaseRelation) -> Optional[SparkRelatio raise e # strip hudi metadata columns. - columns = [x for x in columns - if x.name not in self.HUDI_METADATA_COLUMNS] + columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS] if not metadata: return None diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py index 6f0fd0727..8c3ed158b 100644 --- a/tests/unit/test_adapter.py +++ b/tests/unit/test_adapter.py @@ -534,243 +534,3 @@ def test_profile_with_cluster_and_sql_endpoint(self): } with self.assertRaises(RuntimeException): config_from_parts_or_dicts(self.project_cfg, profile) - - @pytest.mark.skip() - def test_parse_columns_from_information_with_table_type_and_delta_provider(self): - self.maxDiff = None - rel_type = SparkRelation.get_relation_type.Table - - # Mimics the output of Spark in the information column - information = ( - "Database: default_schema\n" - "Table: mytable\n" - "Owner: root\n" - "Created Time: Wed Feb 04 18:15:00 UTC 1815\n" - "Last Access: Wed May 20 19:25:00 UTC 1925\n" - "Created By: Spark 3.0.1\n" - "Type: MANAGED\n" - "Provider: delta\n" - "Statistics: 123456789 bytes\n" - "Location: /mnt/vo\n" - "Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n" - "InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n" - "OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n" - "Partition Provider: Catalog\n" - "Partition Columns: [`dt`]\n" - "Schema: root\n" - " |-- col1: decimal(22,0) (nullable = true)\n" - " |-- col2: string (nullable = true)\n" - " |-- dt: date (nullable = true)\n" - " |-- struct_col: struct (nullable = true)\n" - " | |-- struct_inner_col: string (nullable = true)\n" - ) - relation = SparkRelation.create( - schema='default_schema', - identifier='mytable', - type=rel_type, - information=information - ) - - config = self._get_target_http(self.project_cfg) - columns = 
SparkAdapter(config).parse_columns_from_information( - relation) - self.assertEqual(len(columns), 4) - self.assertEqual(columns[0].to_column_dict(omit_none=False), { - 'table_database': None, - 'table_schema': relation.schema, - 'table_name': relation.name, - 'table_type': rel_type, - 'table_owner': 'root', - 'column': 'col1', - 'column_index': 0, - 'dtype': 'decimal(22,0)', - 'numeric_scale': None, - 'numeric_precision': None, - 'char_size': None, - - 'stats:bytes:description': '', - 'stats:bytes:include': True, - 'stats:bytes:label': 'bytes', - 'stats:bytes:value': 123456789, - }) - - self.assertEqual(columns[3].to_column_dict(omit_none=False), { - 'table_database': None, - 'table_schema': relation.schema, - 'table_name': relation.name, - 'table_type': rel_type, - 'table_owner': 'root', - 'column': 'struct_col', - 'column_index': 3, - 'dtype': 'struct', - 'numeric_scale': None, - 'numeric_precision': None, - 'char_size': None, - - 'stats:bytes:description': '', - 'stats:bytes:include': True, - 'stats:bytes:label': 'bytes', - 'stats:bytes:value': 123456789, - }) - - @pytest.mark.skip() - def test_parse_columns_from_information_with_view_type(self): - self.maxDiff = None - rel_type = SparkRelation.get_relation_type.View - information = ( - "Database: default_schema\n" - "Table: myview\n" - "Owner: root\n" - "Created Time: Wed Feb 04 18:15:00 UTC 1815\n" - "Last Access: UNKNOWN\n" - "Created By: Spark 3.0.1\n" - "Type: VIEW\n" - "View Text: WITH base (\n" - " SELECT * FROM source_table\n" - ")\n" - "SELECT col1, col2, dt FROM base\n" - "View Original Text: WITH base (\n" - " SELECT * FROM source_table\n" - ")\n" - "SELECT col1, col2, dt FROM base\n" - "View Catalog and Namespace: spark_catalog.default\n" - "View Query Output Columns: [col1, col2, dt]\n" - "Table Properties: [view.query.out.col.1=col1, view.query.out.col.2=col2, " - "transient_lastDdlTime=1618324324, view.query.out.col.3=dt, " - "view.catalogAndNamespace.part.0=spark_catalog, " - "view.catalogAndNamespace.part.1=default]\n" - "Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n" - "InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n" - "OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n" - "Storage Properties: [serialization.format=1]\n" - "Schema: root\n" - " |-- col1: decimal(22,0) (nullable = true)\n" - " |-- col2: string (nullable = true)\n" - " |-- dt: date (nullable = true)\n" - " |-- struct_col: struct (nullable = true)\n" - " | |-- struct_inner_col: string (nullable = true)\n" - ) - relation = SparkRelation.create( - schema='default_schema', - identifier='myview', - type=rel_type, - information=information - ) - - config = self._get_target_http(self.project_cfg) - columns = SparkAdapter(config).parse_columns_from_information( - relation) - self.assertEqual(len(columns), 4) - self.assertEqual(columns[1].to_column_dict(omit_none=False), { - 'table_database': None, - 'table_schema': relation.schema, - 'table_name': relation.name, - 'table_type': rel_type, - 'table_owner': 'root', - 'column': 'col2', - 'column_index': 1, - 'dtype': 'string', - 'numeric_scale': None, - 'numeric_precision': None, - 'char_size': None - }) - - self.assertEqual(columns[3].to_column_dict(omit_none=False), { - 'table_database': None, - 'table_schema': relation.schema, - 'table_name': relation.name, - 'table_type': rel_type, - 'table_owner': 'root', - 'column': 'struct_col', - 'column_index': 3, - 'dtype': 'struct', - 'numeric_scale': None, - 'numeric_precision': None, - 'char_size': None - }) - 
- @pytest.mark.skip() - def test_parse_columns_from_information_with_table_type_and_parquet_provider(self): - self.maxDiff = None - rel_type = SparkRelation.get_relation_type.Table - - information = ( - "Database: default_schema\n" - "Table: mytable\n" - "Owner: root\n" - "Created Time: Wed Feb 04 18:15:00 UTC 1815\n" - "Last Access: Wed May 20 19:25:00 UTC 1925\n" - "Created By: Spark 3.0.1\n" - "Type: MANAGED\n" - "Provider: parquet\n" - "Statistics: 1234567890 bytes, 12345678 rows\n" - "Location: /mnt/vo\n" - "Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe\n" - "InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat\n" - "OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat\n" - "Schema: root\n" - " |-- col1: decimal(22,0) (nullable = true)\n" - " |-- col2: string (nullable = true)\n" - " |-- dt: date (nullable = true)\n" - " |-- struct_col: struct (nullable = true)\n" - " | |-- struct_inner_col: string (nullable = true)\n" - ) - relation = SparkRelation.create( - schema='default_schema', - identifier='mytable', - type=rel_type, - information=information - ) - - config = self._get_target_http(self.project_cfg) - columns = SparkAdapter(config).parse_columns_from_information( - relation) - self.assertEqual(len(columns), 4) - self.assertEqual(columns[2].to_column_dict(omit_none=False), { - 'table_database': None, - 'table_schema': relation.schema, - 'table_name': relation.name, - 'table_type': rel_type, - 'table_owner': 'root', - 'column': 'dt', - 'column_index': 2, - 'dtype': 'date', - 'numeric_scale': None, - 'numeric_precision': None, - 'char_size': None, - - 'stats:bytes:description': '', - 'stats:bytes:include': True, - 'stats:bytes:label': 'bytes', - 'stats:bytes:value': 1234567890, - - 'stats:rows:description': '', - 'stats:rows:include': True, - 'stats:rows:label': 'rows', - 'stats:rows:value': 12345678 - }) - - self.assertEqual(columns[3].to_column_dict(omit_none=False), { - 'table_database': None, - 'table_schema': relation.schema, - 'table_name': relation.name, - 'table_type': rel_type, - 'table_owner': 'root', - 'column': 'struct_col', - 'column_index': 3, - 'dtype': 'struct', - 'numeric_scale': None, - 'numeric_precision': None, - 'char_size': None, - - 'stats:bytes:description': '', - 'stats:bytes:include': True, - 'stats:bytes:label': 'bytes', - 'stats:bytes:value': 1234567890, - - 'stats:rows:description': '', - 'stats:rows:include': True, - 'stats:rows:label': 'rows', - 'stats:rows:value': 12345678 - }) - From d29bd47ef93a77eb27ea59a37588c479abb99f0b Mon Sep 17 00:00:00 2001 From: TalkWithKeyboard <610347922@qq.com> Date: Sat, 21 May 2022 21:33:51 +0800 Subject: [PATCH 17/18] feat: store SparkColumns in the SparkRelation, add needs_information to get_relation --- dbt/adapters/spark/impl.py | 35 +++++++------------ dbt/adapters/spark/relation.py | 4 ++- .../spark/macros/materializations/table.sql | 2 +- 3 files changed, 16 insertions(+), 25 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 233c82fc8..39d2eb73a 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -209,12 +209,20 @@ def list_relations_without_caching( return relations def get_relation( - self, database: Optional[str], schema: str, identifier: str + self, + database: Optional[str], + schema: str, + identifier: str, + needs_information=False ) -> Optional[BaseRelation]: if not self.Relation.include_policy.database: database = None cached = super().get_relation(database, schema, 
identifier)
+
+        if not needs_information:
+            return cached
+
         return self._set_relation_information(cached) if cached else None
 
     def parse_describe_extended(
@@ -271,7 +279,7 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
         else:
             updated_relation = self._set_relation_information(cached_relation)
 
-        return self._get_spark_columns(updated_relation)
+        return updated_relation.columns
 
     def _get_updated_relation(self, relation: BaseRelation) -> Optional[SparkRelation]:
         metadata = None
@@ -308,7 +316,7 @@ def _get_updated_relation(self, relation: BaseRelatio
             is_hudi=(provider == 'hudi'),
             owner=metadata.get(KEY_TABLE_OWNER),
             stats=metadata.get(KEY_TABLE_STATISTICS),
-            columns={x.column: x.dtype for x in columns}
+            columns=columns
         )
 
     def _set_relation_information(self, relation: SparkRelation) -> SparkRelation:
@@ -321,31 +329,12 @@ def _set_relation_information(self, relation: SparkRelation) -> SparkRelation:
         self.cache.update_relation(updated_relation)
         return updated_relation
 
-    @staticmethod
-    def _get_spark_columns(
-            relation: Optional[SparkRelation]
-    ) -> List[SparkColumn]:
-        if not relation:
-            return []
-
-        return [SparkColumn(
-            table_database=None,
-            table_schema=relation.schema,
-            table_name=relation.name,
-            table_type=relation.type,
-            table_owner=relation.owner,
-            table_stats=relation.stats,
-            column=name,
-            column_index=idx,
-            dtype=dtype
-        ) for idx, (name, dtype) in enumerate(relation.columns.items())]
-
     def _get_columns_for_catalog(
         self, relation: SparkRelation
     ) -> Iterable[Dict[str, Any]]:
         updated_relation = self._set_relation_information(relation)
 
-        for column in self._get_spark_columns(updated_relation):
+        for column in updated_relation.columns:
             # convert SparkColumns into catalog dicts
             as_dict = column.to_column_dict()
             as_dict["column_name"] = as_dict.pop("column", None)
diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py
index 8c314d124..b20f189e2 100644
--- a/dbt/adapters/spark/relation.py
+++ b/dbt/adapters/spark/relation.py
@@ -5,6 +5,8 @@
 from dbt.adapters.base.relation import BaseRelation, Policy
 from dbt.exceptions import RuntimeException
 
+from dbt.adapters.spark.column import SparkColumn
+
 
 @dataclass
 class SparkQuotePolicy(Policy):
@@ -29,7 +31,7 @@ class SparkRelation(BaseRelation):
     is_hudi: Optional[bool] = None
     owner: Optional[str] = None
     stats: Optional[str] = None
-    columns: Dict[str, str] = field(default_factory=lambda: {})
+    columns: List[SparkColumn] = field(default_factory=lambda: [])
 
     def __post_init__(self):
         if self.database != self.schema and self.database:
diff --git a/dbt/include/spark/macros/materializations/table.sql b/dbt/include/spark/macros/materializations/table.sql
index 2eeb806fd..d132d0433 100644
--- a/dbt/include/spark/macros/materializations/table.sql
+++ b/dbt/include/spark/macros/materializations/table.sql
@@ -2,7 +2,7 @@
 
   {%- set identifier = model['alias'] -%}
 
-  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
+  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier, needs_information=True) -%}
 
   {%- set target_relation = api.Relation.create(identifier=identifier,
                                                 schema=schema,
                                                 database=database,

From d2357b7c51e1ca808c808b82264331a1e3225213 Mon Sep 17 00:00:00 2001
From: TalkWithKeyboard <610347922@qq.com>
Date: Tue, 24 May 2022 12:06:15 +0800
Subject: [PATCH 18/18] support dispatch pattern for list macro

---
 dbt/adapters/spark/impl.py            | 4 ++--
 dbt/include/spark/macros/adapters.sql | 8 ++++++++
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index c626609ec..ade9e68e5 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -29,8 +29,8 @@
 GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME = "spark__get_columns_in_relation_raw"
 LIST_SCHEMAS_MACRO_NAME = "list_schemas"
-LIST_TABLES_MACRO_NAME = "spark__list_tables_without_caching"
-LIST_VIEWS_MACRO_NAME = "spark__list_views_without_caching"
+LIST_TABLES_MACRO_NAME = "list_tables_without_caching"
+LIST_VIEWS_MACRO_NAME = "list_views_without_caching"
 DROP_RELATION_MACRO_NAME = "drop_relation"
 FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties"

diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql
index 1dac8b638..5f4f11ff6 100644
--- a/dbt/include/spark/macros/adapters.sql
+++ b/dbt/include/spark/macros/adapters.sql
@@ -179,6 +179,10 @@
   {{ return(adapter.get_columns_in_relation(relation)) }}
 {% endmacro %}
 
+{% macro list_tables_without_caching(relation) %}
+  {{ return(adapter.dispatch('list_tables_without_caching', 'dbt')(relation)) }}
+{%- endmacro -%}
+
 {% macro spark__list_tables_without_caching(relation) %}
   {% call statement('list_tables_without_caching', fetch_result=True) -%}
     show tables in {{ relation.schema }}
@@ -186,6 +190,10 @@
   {% do return(load_result('list_tables_without_caching').table) %}
 {% endmacro %}
 
+{% macro list_views_without_caching(relation) %}
+  {{ return(adapter.dispatch('list_views_without_caching', 'dbt')(relation)) }}
+{%- endmacro -%}
+
 {% macro spark__list_views_without_caching(relation) %}
   {% call statement('list_views_without_caching', fetch_result=True) -%}
     show views in {{ relation.schema }}
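
Note on the dispatch wrappers above: because list_tables_without_caching and list_views_without_caching now resolve through adapter.dispatch, a child adapter or an end-user project can supply its own listing SQL without patching dbt-spark. A minimal sketch of such an override is shown below; the databricks__ prefix and the LIKE filter are illustrative assumptions, not part of this patch.

{# Hypothetical override, picked up by adapter.dispatch('list_tables_without_caching', 'dbt') #}
{% macro databricks__list_tables_without_caching(relation) %}
  {% call statement('list_tables_without_caching', fetch_result=True) -%}
    show tables in {{ relation.schema }} like '*'
  {%- endcall %}
  {% do return(load_result('list_tables_without_caching').table) %}
{% endmacro %}

A project-level macro could take the same slot by adding a dispatch search order for the dbt namespace in dbt_project.yml; when no override exists, dispatch falls back to the spark__ implementations above.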