diff --git a/.changes/unreleased/Features-20220819-215714.yaml b/.changes/unreleased/Features-20220819-215714.yaml
new file mode 100644
index 000000000..d2bb60511
--- /dev/null
+++ b/.changes/unreleased/Features-20220819-215714.yaml
@@ -0,0 +1,7 @@
+kind: Features
+body: Faster caching at run start. **Note:** Requires Spark v3
+time: 2022-08-19T21:57:14.56716+02:00
+custom:
+  Author: TalkWIthKeyboard
+  Issue: "228"
+  PR: "342"
diff --git a/dbt/adapters/spark/column.py b/dbt/adapters/spark/column.py
index dcf7590e9..3333eab90 100644
--- a/dbt/adapters/spark/column.py
+++ b/dbt/adapters/spark/column.py
@@ -2,14 +2,14 @@
 from typing import Any, Dict, Optional, TypeVar, Union

 from dbt.adapters.base.column import Column
-from dbt.dataclass_schema import dbtClassMixin
+from dbt.contracts.relation import FakeAPIObject
 from hologram import JsonDict

 Self = TypeVar("Self", bound="SparkColumn")


 @dataclass
-class SparkColumn(dbtClassMixin, Column):
+class SparkColumn(FakeAPIObject, Column):
     table_database: Optional[str] = None
     table_schema: Optional[str] = None
     table_name: Optional[str] = None
diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index 12c42ab98..b58b59bfd 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -4,11 +4,15 @@
 import base64
 from concurrent.futures import Future
 from dataclasses import dataclass
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, Iterable, List, Optional, Union, Tuple
 from typing_extensions import TypeAlias

 import agate
+from dbt.adapters.cache import RelationsCache, _CachedRelation
 from dbt.contracts.relation import RelationType
+from dbt.events.base_types import DebugLevel, Cache
+from dbt.adapters.reference_keys import _ReferenceKey, _make_key
+from dbt.events.functions import fire_event

 import dbt
 import dbt.exceptions
@@ -29,12 +33,14 @@
 GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME = "get_columns_in_relation_raw"
 LIST_SCHEMAS_MACRO_NAME = "list_schemas"
-LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
+LIST_TABLES_MACRO_NAME = "list_tables_without_caching"
+LIST_VIEWS_MACRO_NAME = "list_views_without_caching"
 DROP_RELATION_MACRO_NAME = "drop_relation"
 FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties"

 KEY_TABLE_OWNER = "Owner"
 KEY_TABLE_STATISTICS = "Statistics"
+KEY_TABLE_PROVIDER = "Provider"


 @dataclass
@@ -48,6 +54,46 @@ class SparkConfig(AdapterConfig):
     merge_update_columns: Optional[str] = None


+@dataclass
+class UpdateRelation(DebugLevel, Cache):
+    relation: _ReferenceKey
+    code: str = "E038"
+
+    def message(self) -> str:
+        return f"Updating relation: {str(self.relation)}"
+
+
+@dataclass
+class UpdateMissingRelation(DebugLevel, Cache):
+    relation: _ReferenceKey
+    code: str = "E039"
+
+    def message(self) -> str:
+        return f"Updated a nonexistent relation: {str(self.relation)}"
+
+
+class SparkRelationsCache(RelationsCache):
+    def _update(self, relation: _CachedRelation):
+        key = relation.key()
+
+        if key not in self.relations:
+            fire_event(UpdateMissingRelation(relation=key))
+            return
+
+        self.relations[key].inner = relation.inner
+
+    def update_relation(self, relation):
+        """Update the cached version of the given relation in place.
+
+        :param BaseRelation relation: The underlying relation.
+        """
+        cached = _CachedRelation(relation)
+        fire_event(UpdateRelation(relation=_make_key(cached)))
+
+        with self.lock:
+            self._update(cached)
+
+
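# Illustrative sketch (not part of the diff): the update pattern above mutates the
# cached entry's `inner` in place instead of dropping and re-adding it, so existing
# references and schema bookkeeping stay intact. `Entry`/`SimpleCache` below are
# hypothetical stand-ins for dbt's _CachedRelation/RelationsCache, assuming only
# what the diff itself shows (a `relations` dict keyed by relation key, an `inner`
# payload, and a lock).
import threading
from dataclasses import dataclass
from typing import Any, Dict, Tuple


@dataclass
class Entry:
    inner: Any  # the relation payload, e.g. a SparkRelation

    def key(self) -> Tuple[str, str]:
        return (self.inner["schema"].lower(), self.inner["identifier"].lower())


class SimpleCache:
    def __init__(self) -> None:
        self.relations: Dict[Tuple[str, str], Entry] = {}
        self.lock = threading.RLock()

    def add(self, relation: Dict[str, Any]) -> None:
        entry = Entry(inner=relation)
        with self.lock:
            self.relations[entry.key()] = entry

    def update_relation(self, relation: Dict[str, Any]) -> None:
        # Mirrors SparkRelationsCache.update_relation: replace only `inner`,
        # and skip (after logging) when the key was never cached.
        entry = Entry(inner=relation)
        with self.lock:
            cached = self.relations.get(entry.key())
            if cached is None:
                print(f"Updated a nonexistent relation: {entry.key()}")
                return
            cached.inner = relation


cache = SimpleCache()
cache.add({"schema": "analytics", "identifier": "my_table", "owner": None})
cache.update_relation({"schema": "analytics", "identifier": "my_table", "owner": "root"})
assert cache.relations[("analytics", "my_table")].inner["owner"] == "root"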
 class SparkAdapter(SQLAdapter):
     COLUMN_NAMES = (
         "table_database",
@@ -85,6 +131,10 @@ class SparkAdapter(SQLAdapter):
     ConnectionManager: TypeAlias = SparkConnectionManager
     AdapterSpecificConfigs: TypeAlias = SparkConfig

+    def __init__(self, config):
+        super().__init__(config)
+        self.cache = SparkRelationsCache()
+
     @classmethod
     def date_function(cls) -> str:
         return "current_timestamp()"
@@ -114,23 +164,23 @@ def quote(self, identifier):
         return "`{}`".format(identifier)

     def add_schema_to_cache(self, schema) -> str:
-        """Cache a new schema in dbt. It will show up in `list relations`."""
+        """Cache a new schema in dbt. It will show up in list relations"""
         if schema is None:
             name = self.nice_connection_name()
             dbt.exceptions.raise_compiler_error(
                 "Attempted to cache a null schema for {}".format(name)
             )
-        if dbt.flags.USE_CACHE:
-            self.cache.add_schema(None, schema)
+        self.cache.add_schema(None, schema)
         # so jinja doesn't render things
         return ""

     def list_relations_without_caching(
         self, schema_relation: SparkRelation
     ) -> List[SparkRelation]:
-        kwargs = {"schema_relation": schema_relation}
+        kwargs = {"relation": schema_relation}
         try:
-            results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
+            tables = self.execute_macro(LIST_TABLES_MACRO_NAME, kwargs=kwargs)
+            views = self.execute_macro(LIST_VIEWS_MACRO_NAME, kwargs=kwargs)
         except dbt.exceptions.RuntimeException as e:
             errmsg = getattr(e, "msg", "")
             if f"Database '{schema_relation}' not found" in errmsg:
@@ -141,37 +191,36 @@ def list_relations_without_caching(
                 return []

         relations = []
-        for row in results:
-            if len(row) != 4:
-                raise dbt.exceptions.RuntimeException(
-                    f'Invalid value from "show table extended ...", '
-                    f"got {len(row)} values, expected 4"
-                )
-            _schema, name, _, information = row
-            rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
-            is_delta = "Provider: delta" in information
-            is_hudi = "Provider: hudi" in information
+        view_names = views.columns["viewName"].values()
+
+        for tbl in tables:
+            rel_type = RelationType("view" if tbl["tableName"] in view_names else "table")
+            _schema = tbl["namespace"] if "namespace" in tables.column_names else tbl["database"]
             relation = self.Relation.create(
                 schema=_schema,
-                identifier=name,
+                identifier=tbl["tableName"],
                 type=rel_type,
-                information=information,
-                is_delta=is_delta,
-                is_hudi=is_hudi,
             )
             relations.append(relation)

         return relations

-    def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]:
+    def get_relation(
+        self, database: Optional[str], schema: str, identifier: str, needs_information=False
+    ) -> Optional[BaseRelation]:
         if not self.Relation.include_policy.database:
             database = None  # type: ignore

-        return super().get_relation(database, schema, identifier)
+        cached = super().get_relation(database, schema, identifier)
+
+        if not needs_information:
+            return cached
+
+        return self._set_relation_information(cached) if cached else None

     def parse_describe_extended(
         self, relation: Relation, raw_rows: List[agate.Row]
-    ) -> List[SparkColumn]:
+    ) -> Tuple[Dict[str, Any], List[SparkColumn]]:
         # Convert the Row to a dict
         dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows]
         # Find the separator between the rows and the metadata provided
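# Illustrative sketch (not part of the diff): how the rewritten
# list_relations_without_caching classifies relations. Instead of one
# `show table extended ... like '*'` call, the adapter now issues
# `show tables in <schema>` and `show views in <schema>` and marks a relation
# as a view when its name appears in the second result set. The plain dicts
# below are hypothetical stand-ins for the agate rows the macros return.
from typing import Dict, List

tables: List[Dict[str, str]] = [
    {"namespace": "analytics", "tableName": "orders", "isTemporary": "false"},
    {"namespace": "analytics", "tableName": "orders_rollup", "isTemporary": "false"},
]
views: List[Dict[str, str]] = [
    {"namespace": "analytics", "viewName": "orders_rollup", "isTemporary": "false"},
]

view_names = {row["viewName"] for row in views}
relations = [
    (row["namespace"], row["tableName"], "view" if row["tableName"] in view_names else "table")
    for row in tables
]
assert relations == [
    ("analytics", "orders", "table"),
    ("analytics", "orders_rollup", "view"),
]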
@@ -184,7 +233,7 @@
         raw_table_stats = metadata.get(KEY_TABLE_STATISTICS)
         table_stats = SparkColumn.convert_table_stats(raw_table_stats)
-        return [
+        return metadata, [
             SparkColumn(
                 table_database=None,
                 table_schema=relation.schema,
@@ -209,69 +258,83 @@ def find_table_information_separator(rows: List[dict]) -> int:
         return pos

     def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
-        cached_relations = self.cache.get_relations(relation.database, relation.schema)
-        cached_relation = next(
-            (
-                cached_relation
-                for cached_relation in cached_relations
-                if str(cached_relation) == str(relation)
-            ),
-            None,
-        )
-        columns = []
-        if cached_relation and cached_relation.information:
-            columns = self.parse_columns_from_information(cached_relation)
-        if not columns:
-            # in open source delta 'show table extended' query output doesnt
-            # return relation's schema. if columns are empty from cache,
-            # use get_columns_in_relation spark macro
-            # which would execute 'describe extended tablename' query
-            try:
-                rows: List[agate.Row] = self.execute_macro(
-                    GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
-                )
-                columns = self.parse_describe_extended(relation, rows)
-            except dbt.exceptions.RuntimeException as e:
-                # spark would throw error when table doesn't exist, where other
-                # CDW would just return and empty list, normalizing the behavior here
-                errmsg = getattr(e, "msg", "")
-                if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
-                    pass
-                else:
-                    raise e
+        # We shouldn't access columns from the cache, until we've implemented
+        # proper cache update or invalidation at the column level
+        # https://github.com/dbt-labs/dbt-spark/issues/431
+
+        # cached_relations = self.cache.get_relations(relation.database, relation.schema)
+        # cached_relation = next(
+        #     (
+        #         cached_relation
+        #         for cached_relation in cached_relations
+        #         if str(cached_relation) == str(relation)
+        #     ),
+        #     None,
+        # )
+
+        # For now, just always invalidate the cache
+        updated_relation, columns = self._get_updated_relation(relation)
+        if updated_relation:
+            self.cache.add(updated_relation)
+        return columns
+
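# Illustrative sketch (not part of the diff): the refresh-on-read behaviour of the
# new get_columns_in_relation. Until column-level invalidation exists
# (dbt-spark issue #431), every call re-describes the relation and overwrites the
# cached entry. `describe()` is a hypothetical stand-in for the
# `describe table extended` round trip.
from typing import Dict, List, Tuple

_cache: Dict[str, Tuple[dict, List[str]]] = {}


def describe(name: str) -> Tuple[dict, List[str]]:
    # pretend this runs `describe table extended <name>` against Spark
    return {"Owner": "root", "Provider": "delta"}, ["id", "amount"]


def get_columns_in_relation(name: str) -> List[str]:
    metadata, columns = describe(name)      # always hit the warehouse
    if metadata:                            # temporary views carry no metadata
        _cache[name] = (metadata, columns)  # refresh the cached entry
    return columns


print(get_columns_in_relation("analytics.orders"))  # ['id', 'amount']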
+    def _get_updated_relation(
+        self, relation: BaseRelation
+    ) -> Tuple[Optional[SparkRelation], List[SparkColumn]]:
+        metadata = None
+        columns: List[SparkColumn] = []
+
+        try:
+            rows: List[agate.Row] = self.execute_macro(
+                GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
+            )
+            metadata, columns = self.parse_describe_extended(relation, rows)
+        except dbt.exceptions.RuntimeException as e:
+            # Spark would throw an error when the table doesn't exist, where other
+            # CDWs would just return an empty list; normalize the behavior here
+            errmsg = getattr(e, "msg", "")
+            if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
+                pass
+            else:
+                raise e

         # strip hudi metadata columns.
         columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]
-        return columns
-
-    def parse_columns_from_information(self, relation: SparkRelation) -> List[SparkColumn]:
-        owner_match = re.findall(self.INFORMATION_OWNER_REGEX, relation.information)
-        owner = owner_match[0] if owner_match else None
-        matches = re.finditer(self.INFORMATION_COLUMNS_REGEX, relation.information)
-        columns = []
-        stats_match = re.findall(self.INFORMATION_STATISTICS_REGEX, relation.information)
-        raw_table_stats = stats_match[0] if stats_match else None
-        table_stats = SparkColumn.convert_table_stats(raw_table_stats)
-        for match_num, match in enumerate(matches):
-            column_name, column_type, nullable = match.groups()
-            column = SparkColumn(
-                table_database=None,
-                table_schema=relation.schema,
-                table_name=relation.table,
-                table_type=relation.type,
-                column_index=match_num,
-                table_owner=owner,
-                column=column_name,
-                dtype=column_type,
-                table_stats=table_stats,
-            )
-            columns.append(column)
-        return columns
+        if not metadata:
+            # this is a temporary view, not worth caching -- just return columns
+            return None, columns
+
+        provider = metadata.get(KEY_TABLE_PROVIDER)
+        return (
+            self.Relation.create(
+                database=None,
+                schema=relation.schema,
+                identifier=relation.identifier,
+                type=relation.type,
+                is_delta=(provider == "delta"),
+                is_hudi=(provider == "hudi"),
+                owner=metadata.get(KEY_TABLE_OWNER),
+                stats=metadata.get(KEY_TABLE_STATISTICS),
+                columns=columns,
+            ),
+            columns,
+        )
+
+    def _set_relation_information(self, relation: SparkRelation) -> SparkRelation:
+        """Update the relation's information, or return it as-is if it already has it."""
+        if relation.has_information():
+            return relation
+
+        updated_relation, _ = self._get_updated_relation(relation)
+
+        self.cache.update_relation(updated_relation)
+        return updated_relation

     def _get_columns_for_catalog(self, relation: SparkRelation) -> Iterable[Dict[str, Any]]:
-        columns = self.parse_columns_from_information(relation)
+        updated_relation = self._set_relation_information(relation)

-        for column in columns:
+        for column in updated_relation.columns:
             # convert SparkColumns into catalog dicts
             as_dict = column.to_column_dict()
             as_dict["column_name"] = as_dict.pop("column", None)
diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py
index 249caf0d7..41f478bac 100644
--- a/dbt/adapters/spark/relation.py
+++ b/dbt/adapters/spark/relation.py
@@ -1,10 +1,12 @@
-from typing import Optional
+from typing import Optional, List

-from dataclasses import dataclass
+from dataclasses import dataclass, field

 from dbt.adapters.base.relation import BaseRelation, Policy
 from dbt.exceptions import RuntimeException

+from dbt.adapters.spark.column import SparkColumn
+

 @dataclass
 class SparkQuotePolicy(Policy):
@@ -27,7 +29,9 @@ class SparkRelation(BaseRelation):
     quote_character: str = "`"
     is_delta: Optional[bool] = None
     is_hudi: Optional[bool] = None
-    information: Optional[str] = None
+    owner: Optional[str] = None
+    stats: Optional[str] = None
+    columns: List[SparkColumn] = field(default_factory=lambda: [])

     def __post_init__(self):
         if self.database != self.schema and self.database:
@@ -40,3 +44,6 @@ def render(self):
                 "include, but only one can be set"
             )
         return super().render()
+
+    def has_information(self) -> bool:
+        return self.owner is not None and self.stats is not None and len(self.columns) > 0
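# Illustrative sketch (not part of the diff): how the metadata returned by
# parse_describe_extended feeds the new SparkRelation fields, and when
# has_information() would consider a cached relation complete. The plain dict and
# helpers below are simplified stand-ins for SparkRelation and the adapter.
from typing import List, Optional

KEY_TABLE_OWNER = "Owner"
KEY_TABLE_STATISTICS = "Statistics"
KEY_TABLE_PROVIDER = "Provider"


def relation_from_metadata(metadata: dict, columns: List[str]) -> dict:
    provider: Optional[str] = metadata.get(KEY_TABLE_PROVIDER)
    return {
        "is_delta": provider == "delta",
        "is_hudi": provider == "hudi",
        "owner": metadata.get(KEY_TABLE_OWNER),
        "stats": metadata.get(KEY_TABLE_STATISTICS),
        "columns": columns,
    }


def has_information(rel: dict) -> bool:
    # mirrors SparkRelation.has_information(): owner, stats and columns must all be present
    return rel["owner"] is not None and rel["stats"] is not None and len(rel["columns"]) > 0


metadata = {"Owner": "root", "Provider": "delta", "Statistics": "1109049927 bytes, 14093476 rows"}
rel = relation_from_metadata(metadata, ["col1", "col2", "dt"])
assert rel["is_delta"] and not rel["is_hudi"]
assert has_information(rel)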
diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql
index 05630ede5..3a3e5cd8d 100644
--- a/dbt/include/spark/macros/adapters.sql
+++ b/dbt/include/spark/macros/adapters.sql
@@ -194,12 +194,26 @@
   {{ return(adapter.get_columns_in_relation(relation)) }}
 {% endmacro %}

-{% macro spark__list_relations_without_caching(relation) %}
-  {% call statement('list_relations_without_caching', fetch_result=True) -%}
-    show table extended in {{ relation }} like '*'
+{% macro list_tables_without_caching(relation) %}
+  {{ return(adapter.dispatch('list_tables_without_caching', 'dbt')(relation)) }}
+{%- endmacro -%}
+
+{% macro spark__list_tables_without_caching(relation) %}
+  {% call statement('list_tables_without_caching', fetch_result=True) -%}
+    show tables in {{ relation.schema }}
   {% endcall %}
+  {% do return(load_result('list_tables_without_caching').table) %}
+{% endmacro %}

-  {% do return(load_result('list_relations_without_caching').table) %}
+{% macro list_views_without_caching(relation) %}
+  {{ return(adapter.dispatch('list_views_without_caching', 'dbt')(relation)) }}
+{%- endmacro -%}
+
+{% macro spark__list_views_without_caching(relation) %}
+  {% call statement('list_views_without_caching', fetch_result=True) -%}
+    show views in {{ relation.schema }}
+  {% endcall %}
+  {% do return(load_result('list_views_without_caching').table) %}
 {% endmacro %}

 {% macro spark__list_schemas(database) -%}
diff --git a/dbt/include/spark/macros/materializations/snapshot.sql b/dbt/include/spark/macros/materializations/snapshot.sql
index 6cf2358fe..174c12b65 100644
--- a/dbt/include/spark/macros/materializations/snapshot.sql
+++ b/dbt/include/spark/macros/materializations/snapshot.sql
@@ -77,11 +77,18 @@
   {%- set file_format = config.get('file_format', 'parquet') -%}
   {%- set grant_config = config.get('grants') -%}

-  {% set target_relation_exists, target_relation = get_or_create_relation(
-          database=none,
-          schema=model.schema,
-          identifier=target_table,
-          type='table') -%}
+  {%- set existing_relation = adapter.get_relation(database=database, schema=schema, identifier=target_table, needs_information=True) -%}
+  {%- if existing_relation -%}
+    {%- set target_relation_exists, target_relation = True, existing_relation -%}
+  {%- else -%}
+    {%- set target_relation_exists = False -%}
+    {%- set target_relation = api.Relation.create(
+        database=none,
+        schema=model.schema,
+        identifier=target_table,
+        type='table'
+    ) -%}
+  {%- endif -%}

   {%- if file_format not in ['delta', 'hudi'] -%}
     {% set invalid_format_msg -%}
@@ -100,10 +107,6 @@
       {% endif %}
     {% endif %}

-  {% if not adapter.check_schema_exists(model.database, model.schema) %}
-    {% do create_schema(model.database, model.schema) %}
-  {% endif %}
-
   {%- if not target_relation.is_table -%}
     {% do exceptions.relation_wrong_type(target_relation, 'table') %}
   {%- endif -%}
diff --git a/dbt/include/spark/macros/materializations/table.sql b/dbt/include/spark/macros/materializations/table.sql
index 6a02ea164..97bf03d3d 100644
--- a/dbt/include/spark/macros/materializations/table.sql
+++ b/dbt/include/spark/macros/materializations/table.sql
@@ -3,7 +3,7 @@
   {%- set identifier = model['alias'] -%}
   {%- set grant_config = config.get('grants') -%}

-  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
+  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier, needs_information=True) -%}
   {%- set target_relation = api.Relation.create(identifier=identifier,
                                                 schema=schema,
                                                 database=database,
diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py
index bdccf169d..406fbcd93 100644
--- a/tests/functional/adapter/test_basic.py
+++ b/tests/functional/adapter/test_basic.py
@@ -23,9 +23,6 @@ class TestSingularTestsSpark(BaseSingularTests):
     pass


-# The local cluster currently tests on spark 2.x, which does not support this
-# if we upgrade it to 3.x, we can enable this test
-@pytest.mark.skip_profile('apache_spark')
 class TestSingularTestsEphemeralSpark(BaseSingularTestsEphemeral):
     pass

@@ -48,8 +45,7 @@ class TestGenericTestsSpark(BaseGenericTests):
     pass


-# These tests were not enabled in the dbtspec files, so skipping here.
-# Error encountered was: Error running query: java.lang.ClassNotFoundException: delta.DefaultSource
+# Snapshots require Delta
 @pytest.mark.skip_profile('apache_spark', 'spark_session')
 class TestSnapshotCheckColsSpark(BaseSnapshotCheckCols):
     @pytest.fixture(scope="class")
@@ -64,8 +60,7 @@ def project_config_update(self):
     }


-# These tests were not enabled in the dbtspec files, so skipping here.
-# Error encountered was: Error running query: java.lang.ClassNotFoundException: delta.DefaultSource
+# Snapshots require Delta
 @pytest.mark.skip_profile('apache_spark', 'spark_session')
 class TestSnapshotTimestampSpark(BaseSnapshotTimestamp):
     @pytest.fixture(scope="class")
diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py
index f87a89b2b..8c3ed158b 100644
--- a/tests/unit/test_adapter.py
+++ b/tests/unit/test_adapter.py
@@ -2,6 +2,7 @@
 from unittest import mock

 import dbt.flags as flags
+import pytest
 from dbt.exceptions import RuntimeException
 from agate import Row
 from pyhive import hive
@@ -298,8 +299,27 @@ def test_parse_relation(self):
                      for r in plain_rows]

         config = self._get_target_http(self.project_cfg)
-        rows = SparkAdapter(config).parse_describe_extended(
+        metadata, rows = SparkAdapter(config).parse_describe_extended(
             relation, input_cols)
+
+        self.assertDictEqual(metadata, {
+            '# col_name': 'data_type',
+            'dt': 'date',
+            None: None,
+            '# Detailed Table Information': None,
+            'Database': None,
+            'Owner': 'root',
+            'Created Time': 'Wed Feb 04 18:15:00 UTC 1815',
+            'Last Access': 'Wed May 20 19:25:00 UTC 1925',
+            'Type': 'MANAGED',
+            'Provider': 'delta',
+            'Location': '/mnt/vo',
+            'Serde Library': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
+            'InputFormat': 'org.apache.hadoop.mapred.SequenceFileInputFormat',
+            'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat',
+            'Partition Provider': 'Catalog'
+        })
+
         self.assertEqual(len(rows), 4)
         self.assertEqual(rows[0].to_column_dict(omit_none=False), {
             'table_database': None,
@@ -379,7 +399,7 @@ def test_parse_relation_with_integer_owner(self):
                      for r in plain_rows]

         config = self._get_target_http(self.project_cfg)
-        rows = SparkAdapter(config).parse_describe_extended(
+        _, rows = SparkAdapter(config).parse_describe_extended(
             relation, input_cols)

         self.assertEqual(rows[0].to_column_dict().get('table_owner'), '1234')
@@ -419,8 +439,26 @@ def test_parse_relation_with_statistics(self):
                      for r in plain_rows]

         config = self._get_target_http(self.project_cfg)
-        rows = SparkAdapter(config).parse_describe_extended(
+        metadata, rows = SparkAdapter(config).parse_describe_extended(
             relation, input_cols)
+
+        self.assertEqual(metadata, {
+            None: None,
+            '# Detailed Table Information': None,
+            'Database': None,
+            'Owner': 'root',
+            'Created Time': 'Wed Feb 04 18:15:00 UTC 1815',
+            'Last Access': 'Wed May 20 19:25:00 UTC 1925',
+            'Statistics': '1109049927 bytes, 14093476 rows',
+            'Type': 'MANAGED',
+            'Provider': 'delta',
+            'Location': '/mnt/vo',
+            'Serde Library': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
+            'InputFormat': 'org.apache.hadoop.mapred.SequenceFileInputFormat',
+            'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat',
+            'Partition Provider': 'Catalog'
+        })
+
         self.assertEqual(len(rows), 1)
         self.assertEqual(rows[0].to_column_dict(omit_none=False), {
             'table_database': None,
@@ -496,240 +534,3 @@ def test_profile_with_cluster_and_sql_endpoint(self):
         }
         with self.assertRaises(RuntimeException):
             config_from_parts_or_dicts(self.project_cfg, profile)
-
-    def test_parse_columns_from_information_with_table_type_and_delta_provider(self):
-        self.maxDiff = None
-        rel_type = SparkRelation.get_relation_type.Table
-
-        # Mimics the output of Spark in the information column
-        information = (
-            "Database: default_schema\n"
-            "Table: mytable\n"
-            "Owner: root\n"
-            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
-            "Last Access: Wed May 20 19:25:00 UTC 1925\n"
-            "Created By: Spark 3.0.1\n"
-            "Type: MANAGED\n"
-            "Provider: delta\n"
-            "Statistics: 123456789 bytes\n"
-            "Location: /mnt/vo\n"
-            "Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n"
-            "InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n"
-            "OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n"
-            "Partition Provider: Catalog\n"
-            "Partition Columns: [`dt`]\n"
-            "Schema: root\n"
-            " |-- col1: decimal(22,0) (nullable = true)\n"
-            " |-- col2: string (nullable = true)\n"
-            " |-- dt: date (nullable = true)\n"
-            " |-- struct_col: struct (nullable = true)\n"
-            " |    |-- struct_inner_col: string (nullable = true)\n"
-        )
-        relation = SparkRelation.create(
-            schema='default_schema',
-            identifier='mytable',
-            type=rel_type,
-            information=information
-        )
-
-        config = self._get_target_http(self.project_cfg)
-        columns = SparkAdapter(config).parse_columns_from_information(
-            relation)
-        self.assertEqual(len(columns), 4)
-        self.assertEqual(columns[0].to_column_dict(omit_none=False), {
-            'table_database': None,
-            'table_schema': relation.schema,
-            'table_name': relation.name,
-            'table_type': rel_type,
-            'table_owner': 'root',
-            'column': 'col1',
-            'column_index': 0,
-            'dtype': 'decimal(22,0)',
-            'numeric_scale': None,
-            'numeric_precision': None,
-            'char_size': None,
-
-            'stats:bytes:description': '',
-            'stats:bytes:include': True,
-            'stats:bytes:label': 'bytes',
-            'stats:bytes:value': 123456789,
-        })
-
-        self.assertEqual(columns[3].to_column_dict(omit_none=False), {
-            'table_database': None,
-            'table_schema': relation.schema,
-            'table_name': relation.name,
-            'table_type': rel_type,
-            'table_owner': 'root',
-            'column': 'struct_col',
-            'column_index': 3,
-            'dtype': 'struct',
-            'numeric_scale': None,
-            'numeric_precision': None,
-            'char_size': None,
-
-            'stats:bytes:description': '',
-            'stats:bytes:include': True,
-            'stats:bytes:label': 'bytes',
-            'stats:bytes:value': 123456789,
-        })
-
-    def test_parse_columns_from_information_with_view_type(self):
-        self.maxDiff = None
-        rel_type = SparkRelation.get_relation_type.View
-        information = (
-            "Database: default_schema\n"
-            "Table: myview\n"
-            "Owner: root\n"
-            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
-            "Last Access: UNKNOWN\n"
-            "Created By: Spark 3.0.1\n"
-            "Type: VIEW\n"
-            "View Text: WITH base (\n"
-            "    SELECT * FROM source_table\n"
-            ")\n"
-            "SELECT col1, col2, dt FROM base\n"
-            "View Original Text: WITH base (\n"
-            "    SELECT * FROM source_table\n"
-            ")\n"
-            "SELECT col1, col2, dt FROM base\n"
-            "View Catalog and Namespace: spark_catalog.default\n"
-            "View Query Output Columns: [col1, col2, dt]\n"
-            "Table Properties: [view.query.out.col.1=col1, view.query.out.col.2=col2, "
-            "transient_lastDdlTime=1618324324, view.query.out.col.3=dt, "
-            "view.catalogAndNamespace.part.0=spark_catalog, "
-            "view.catalogAndNamespace.part.1=default]\n"
-            "Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n"
-            "InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n"
-            "OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n"
-            "Storage Properties: [serialization.format=1]\n"
-            "Schema: root\n"
-            " |-- col1: decimal(22,0) (nullable = true)\n"
-            " |-- col2: string (nullable = true)\n"
-            " |-- dt: date (nullable = true)\n"
-            " |-- struct_col: struct (nullable = true)\n"
-            " |    |-- struct_inner_col: string (nullable = true)\n"
-        )
-        relation = SparkRelation.create(
-            schema='default_schema',
-            identifier='myview',
-            type=rel_type,
-            information=information
-        )
-
-        config = self._get_target_http(self.project_cfg)
-        columns = SparkAdapter(config).parse_columns_from_information(
-            relation)
-        self.assertEqual(len(columns), 4)
-        self.assertEqual(columns[1].to_column_dict(omit_none=False), {
-            'table_database': None,
-            'table_schema': relation.schema,
-            'table_name': relation.name,
-            'table_type': rel_type,
-            'table_owner': 'root',
-            'column': 'col2',
-            'column_index': 1,
-            'dtype': 'string',
-            'numeric_scale': None,
-            'numeric_precision': None,
-            'char_size': None
-        })
-
-        self.assertEqual(columns[3].to_column_dict(omit_none=False), {
-            'table_database': None,
-            'table_schema': relation.schema,
-            'table_name': relation.name,
-            'table_type': rel_type,
-            'table_owner': 'root',
-            'column': 'struct_col',
-            'column_index': 3,
-            'dtype': 'struct',
-            'numeric_scale': None,
-            'numeric_precision': None,
-            'char_size': None
-        })
-
-    def test_parse_columns_from_information_with_table_type_and_parquet_provider(self):
-        self.maxDiff = None
-        rel_type = SparkRelation.get_relation_type.Table
-
-        information = (
-            "Database: default_schema\n"
-            "Table: mytable\n"
-            "Owner: root\n"
-            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
-            "Last Access: Wed May 20 19:25:00 UTC 1925\n"
-            "Created By: Spark 3.0.1\n"
-            "Type: MANAGED\n"
-            "Provider: parquet\n"
-            "Statistics: 1234567890 bytes, 12345678 rows\n"
-            "Location: /mnt/vo\n"
-            "Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe\n"
-            "InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat\n"
-            "OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat\n"
-            "Schema: root\n"
-            " |-- col1: decimal(22,0) (nullable = true)\n"
-            " |-- col2: string (nullable = true)\n"
-            " |-- dt: date (nullable = true)\n"
-            " |-- struct_col: struct (nullable = true)\n"
-            " |    |-- struct_inner_col: string (nullable = true)\n"
-        )
-        relation = SparkRelation.create(
-            schema='default_schema',
-            identifier='mytable',
-            type=rel_type,
-            information=information
-        )
-
-        config = self._get_target_http(self.project_cfg)
-        columns = SparkAdapter(config).parse_columns_from_information(
-            relation)
-        self.assertEqual(len(columns), 4)
-        self.assertEqual(columns[2].to_column_dict(omit_none=False), {
-            'table_database': None,
-            'table_schema': relation.schema,
-            'table_name': relation.name,
-            'table_type': rel_type,
-            'table_owner': 'root',
-            'column': 'dt',
-            'column_index': 2,
-            'dtype': 'date',
-            'numeric_scale': None,
-            'numeric_precision': None,
-            'char_size': None,
-
-            'stats:bytes:description': '',
-            'stats:bytes:include': True,
-            'stats:bytes:label': 'bytes',
-            'stats:bytes:value': 1234567890,
-
-            'stats:rows:description': '',
-            'stats:rows:include': True,
-            'stats:rows:label': 'rows',
-            'stats:rows:value': 12345678
-        })
-
-        self.assertEqual(columns[3].to_column_dict(omit_none=False), {
-            'table_database': None,
-            'table_schema': relation.schema,
-            'table_name': relation.name,
-            'table_type': rel_type,
-            'table_owner': 'root',
-            'column': 'struct_col',
-            'column_index': 3,
-            'dtype': 'struct',
-            'numeric_scale': None,
-            'numeric_precision': None,
-            'char_size': None,
-
-            'stats:bytes:description': '',
-            'stats:bytes:include': True,
-            'stats:bytes:label': 'bytes',
-            'stats:bytes:value': 1234567890,
-
-            'stats:rows:description': '',
-            'stats:rows:include': True,
-            'stats:rows:label': 'rows',
-            'stats:rows:value': 12345678
-        })
-