diff --git a/dbt/adapters/databricks/connections.py b/dbt/adapters/databricks/connections.py
index f4a6ee68c..316a2260f 100644
--- a/dbt/adapters/databricks/connections.py
+++ b/dbt/adapters/databricks/connections.py
@@ -357,6 +357,11 @@ def description(
     def schemas(self, catalog_name: str, schema_name: Optional[str] = None) -> None:
         self._cursor.schemas(catalog_name=catalog_name, schema_name=schema_name)

+    def tables(self, catalog_name: str, schema_name: str, table_name: Optional[str] = None) -> None:
+        self._cursor.tables(
+            catalog_name=catalog_name, schema_name=schema_name, table_name=table_name
+        )
+
     def __del__(self) -> None:
         if self._cursor.open:
             # This should not happen. The cursor should explicitly be closed.
@@ -526,6 +531,14 @@ def list_schemas(self, database: str, schema: Optional[str] = None) -> Table:
             lambda cursor: cursor.schemas(catalog_name=database, schema_name=schema),
         )

+    def list_tables(self, database: str, schema: str, identifier: Optional[str] = None) -> Table:
+        return self._execute_cursor(
+            f"GetTables(database={database}, schema={schema}, identifier={identifier})",
+            lambda cursor: cursor.tables(
+                catalog_name=database, schema_name=schema, table_name=identifier
+            ),
+        )
+
     @classmethod
     def open(cls, connection: Connection) -> Connection:
         if connection.state == ConnectionState.OPEN:
diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py
index b2fce460b..b4bb13bea 100644
--- a/dbt/adapters/databricks/impl.py
+++ b/dbt/adapters/databricks/impl.py
@@ -1,10 +1,9 @@
 from concurrent.futures import Future
 from contextlib import contextmanager
 from dataclasses import dataclass
-import re
-from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union, cast
+from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union

-from agate import Row, Table
+from agate import Row, Table, Text

 from dbt.adapters.base import AdapterConfig, PythonJobHelper
 from dbt.adapters.base.impl import catch_as_completed
@@ -17,7 +16,9 @@
     KEY_TABLE_STATISTICS,
     LIST_RELATIONS_MACRO_NAME,
     LIST_SCHEMAS_MACRO_NAME,
+    TABLE_OR_VIEW_NOT_FOUND_MESSAGES,
 )
+from dbt.clients.agate_helper import empty_table
 from dbt.contracts.connection import AdapterResponse, Connection
 from dbt.contracts.graph.manifest import Manifest
 from dbt.contracts.relation import RelationType
@@ -40,6 +41,9 @@
 CURRENT_CATALOG_MACRO_NAME = "current_catalog"
 USE_CATALOG_MACRO_NAME = "use_catalog"

+SHOW_TABLES_MACRO_NAME = "show_tables"
+SHOW_VIEWS_MACRO_NAME = "show_views"
+

 @dataclass
 class DatabricksConfig(AdapterConfig):
@@ -113,9 +117,7 @@ def list_relations_without_caching(  # type: ignore[override]
     ) -> List[DatabricksRelation]:
         kwargs = {"schema_relation": schema_relation}
         try:
-            # The catalog for `show table extended` needs to match the current catalog.
-            with self._catalog(schema_relation.database):
-                results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
+            results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
         except dbt.exceptions.RuntimeException as e:
             errmsg = getattr(e, "msg", "")
             if f"Database '{schema_relation}' not found" in errmsg:
@@ -125,35 +127,79 @@ def list_relations_without_caching(  # type: ignore[override]
                 logger.debug(f"{description} {schema_relation}: {e.msg}")
                 return []

-        relations = []
-        for row in results:
-            if len(row) != 4:
-                raise dbt.exceptions.RuntimeException(
-                    f'Invalid value from "show table extended ...", '
-                    f"got {len(row)} values, expected 4"
-                )
-            _schema, name, _, information = row
-            rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
-            is_delta = "Provider: delta" in information
-            is_hudi = "Provider: hudi" in information
-            relation = self.Relation.create(
-                database=schema_relation.database,
-                # Use `_schema` retrieved from the cluster to avoid mismatched case
-                # between the profile and the cluster.
-                schema=_schema,
+        return [
+            self.Relation.create(
+                database=database,
+                schema=schema,
                 identifier=name,
-                type=rel_type,
-                information=information,
-                is_delta=is_delta,
-                is_hudi=is_hudi,
+                type=self.Relation.get_relation_type(kind),
+            )
+            for database, schema, name, kind in results.select(
+                ["database_name", "schema_name", "name", "kind"]
+            )
+        ]
+
+    @available.parse(lambda *a, **k: empty_table())
+    def get_relations_without_caching(self, relation: DatabricksRelation) -> Table:
+        kwargs = {"relation": relation}
+
+        new_rows: List[Tuple]
+        if relation.database is not None:
+            assert relation.schema is not None
+            tables = self.connections.list_tables(
+                database=relation.database, schema=relation.schema
             )
-            relations.append(relation)
+            new_rows = [
+                (row["TABLE_CAT"], row["TABLE_SCHEM"], row["TABLE_NAME"], row["TABLE_TYPE"].lower())
+                for row in tables
+            ]
+        else:
+            tables = self.execute_macro(SHOW_TABLES_MACRO_NAME, kwargs=kwargs)
+            new_rows = [
+                (relation.database, row["database"], row["tableName"], "") for row in tables
+            ]
+
+        if any(not row[3] for row in new_rows):
+            with self._catalog(relation.database):
+                views = self.execute_macro(SHOW_VIEWS_MACRO_NAME, kwargs=kwargs)
+
+            view_names = set(views.columns["viewName"].values())
+            new_rows = [
+                (
+                    row[0],
+                    row[1],
+                    row[2],
+                    str(RelationType.View if row[2] in view_names else RelationType.Table),
+                )
+                for row in new_rows
+            ]
+
+        return Table(
+            new_rows,
+            column_names=["database_name", "schema_name", "name", "kind"],
+            column_types=[Text(), Text(), Text(), Text()],
+        )
+
+    def get_relation(
+        self,
+        database: Optional[str],
+        schema: str,
+        identifier: str,
+        *,
+        needs_information: bool = False,
+    ) -> Optional[DatabricksRelation]:
+        cached: Optional[DatabricksRelation] = super(SparkAdapter, self).get_relation(
+            database=database, schema=schema, identifier=identifier
+        )

-        return relations
+        if not needs_information:
+            return cached
+
+        return self._set_relation_information(cached) if cached else None

     def parse_describe_extended(  # type: ignore[override]
         self, relation: DatabricksRelation, raw_rows: List[Row]
-    ) -> List[DatabricksColumn]:
+    ) -> Tuple[Dict[str, Any], List[DatabricksColumn]]:
         # Convert the Row to a dict
         dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows]
         # Find the separator between the rows and the metadata provided
@@ -166,7 +212,7 @@ def parse_describe_extended(  # type: ignore[override]
         raw_table_stats = metadata.get(KEY_TABLE_STATISTICS)
         table_stats = DatabricksColumn.convert_table_stats(raw_table_stats)
-        return [
+        return metadata, [
             DatabricksColumn(
                 table_database=relation.database,
                 table_schema=relation.schema,
@@ -184,56 +230,47 @@ def parse_describe_extended(  # type: ignore[override]
     def get_columns_in_relation(  # type: ignore[override]
         self, relation: DatabricksRelation
     ) -> List[DatabricksColumn]:
-        columns = []
+        return self._get_updated_relation(relation)[1]
+
+    def _get_updated_relation(
+        self, relation: DatabricksRelation
+    ) -> Tuple[DatabricksRelation, List[DatabricksColumn]]:
         try:
-            rows: List[Row] = self.execute_macro(
+            rows = self.execute_macro(
                 GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
             )
-            columns = self.parse_describe_extended(relation, rows)
+            metadata, columns = self.parse_describe_extended(relation, rows)
         except dbt.exceptions.RuntimeException as e:
             # spark would throw error when table doesn't exist, where other
             # CDW would just return and empty list, normalizing the behavior here
             errmsg = getattr(e, "msg", "")
-            if any(
-                msg in errmsg
-                for msg in (
-                    "[TABLE_OR_VIEW_NOT_FOUND]",
-                    "Table or view not found",
-                    "NoSuchTableException",
-                )
-            ):
-                pass
+            found_msgs = (msg in errmsg for msg in TABLE_OR_VIEW_NOT_FOUND_MESSAGES)
+            if any(found_msgs):
+                metadata = None
+                columns = []
             else:
                 raise e

         # strip hudi metadata columns.
-        return [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]
+        columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]
+
+        return (
+            self.Relation.create(
+                database=relation.database,
+                schema=relation.schema,
+                identifier=relation.identifier,
+                type=relation.type,
+                metadata=metadata,
+            ),
+            columns,
+        )

-    def parse_columns_from_information(  # type: ignore[override]
-        self, relation: DatabricksRelation
-    ) -> List[DatabricksColumn]:
-        owner_match = re.findall(self.INFORMATION_OWNER_REGEX, cast(str, relation.information))
-        owner = owner_match[0] if owner_match else None
-        matches = re.finditer(self.INFORMATION_COLUMNS_REGEX, cast(str, relation.information))
-        columns = []
-        stats_match = re.findall(self.INFORMATION_STATISTICS_REGEX, cast(str, relation.information))
-        raw_table_stats = stats_match[0] if stats_match else None
-        table_stats = DatabricksColumn.convert_table_stats(raw_table_stats)
-        for match_num, match in enumerate(matches):
-            column_name, column_type, nullable = match.groups()
-            column = DatabricksColumn(
-                table_database=relation.database,
-                table_schema=relation.schema,
-                table_name=relation.table,
-                table_type=relation.type,
-                column_index=(match_num + 1),
-                table_owner=owner,
-                column=column_name,
-                dtype=column_type,
-                table_stats=table_stats,
-            )
-            columns.append(column)
-        return columns
+    def _set_relation_information(self, relation: DatabricksRelation) -> DatabricksRelation:
+        """Update the information of the relation, or return it if it already exists."""
+        if relation.has_information():
+            return relation
+
+        return self._get_updated_relation(relation)[0]

     def get_catalog(self, manifest: Manifest) -> Tuple[Table, List[Exception]]:
         schema_map = self._get_catalog_schemas(manifest)
@@ -253,7 +290,7 @@ def get_catalog(self, manifest: Manifest) -> Tuple[Table, List[Exception]]:
     def _get_columns_for_catalog(  # type: ignore[override]
         self, relation: DatabricksRelation
     ) -> Iterable[Dict[str, Any]]:
-        columns = self.parse_columns_from_information(relation)
+        columns = self.get_columns_in_relation(relation)

         for column in columns:
             # convert DatabricksRelation into catalog dicts
diff --git a/dbt/adapters/databricks/relation.py b/dbt/adapters/databricks/relation.py
index e0c966fe7..ef93dcd1b 100644
--- a/dbt/adapters/databricks/relation.py
+++ b/dbt/adapters/databricks/relation.py
@@ -1,12 +1,22 @@
 from dataclasses import dataclass
-from typing import Any, Dict
+from typing import Any, Dict, Optional

-from dbt.adapters.base.relation import Policy
-from dbt.adapters.spark.relation import SparkRelation
+from dbt.adapters.base.relation import BaseRelation, Policy
+from dbt.adapters.spark.impl import KEY_TABLE_OWNER, KEY_TABLE_STATISTICS

 from dbt.adapters.databricks.utils import remove_undefined

+KEY_TABLE_PROVIDER = "Provider"
+
+
+@dataclass
+class DatabricksQuotePolicy(Policy):
+    database: bool = False
+    schema: bool = False
+    identifier: bool = False
+
+
 @dataclass
 class DatabricksIncludePolicy(Policy):
     database: bool = True
@@ -15,8 +25,12 @@ class DatabricksIncludePolicy(Policy):

 @dataclass(frozen=True, eq=False, repr=False)
-class DatabricksRelation(SparkRelation):
-    include_policy: DatabricksIncludePolicy = DatabricksIncludePolicy()  # type: ignore[assignment]
+class DatabricksRelation(BaseRelation):
+    quote_policy = DatabricksQuotePolicy()
+    include_policy = DatabricksIncludePolicy()
+    quote_character: str = "`"
+
+    metadata: Optional[Dict[str, Any]] = None

     @classmethod
     def __pre_deserialize__(cls, data: Dict[Any, Any]) -> Dict[Any, Any]:
@@ -27,8 +41,23 @@ def __pre_deserialize__(cls, data: Dict[Any, Any]) -> Dict[Any, Any]:
             data["path"]["database"] = remove_undefined(data["path"]["database"])
         return data

-    def __post_init__(self) -> None:
-        return
+    def has_information(self) -> bool:
+        return self.metadata is not None
+
+    @property
+    def is_delta(self) -> bool:
+        assert self.metadata is not None
+        return self.metadata.get(KEY_TABLE_PROVIDER) == "delta"
+
+    @property
+    def is_hudi(self) -> bool:
+        assert self.metadata is not None
+        return self.metadata.get(KEY_TABLE_PROVIDER) == "hudi"
+
+    @property
+    def owner(self) -> Optional[str]:
+        return self.metadata.get(KEY_TABLE_OWNER) if self.metadata is not None else None

-    def render(self) -> str:
-        return super(SparkRelation, self).render()
+    @property
+    def stats(self) -> Optional[str]:
+        return self.metadata.get(KEY_TABLE_STATISTICS) if self.metadata is not None else None
diff --git a/dbt/include/databricks/macros/adapters.sql b/dbt/include/databricks/macros/adapters.sql
index ccdf60f69..392c84e89 100644
--- a/dbt/include/databricks/macros/adapters.sql
+++ b/dbt/include/databricks/macros/adapters.sql
@@ -155,6 +155,34 @@
   {% endfor %}
 {% endmacro %}

+{% macro databricks__list_relations_without_caching(schema_relation) %}
+  {{ return(adapter.get_relations_without_caching(schema_relation)) }}
+{% endmacro %}
+
+{% macro show_tables(relation) %}
+  {{ return(adapter.dispatch('show_tables', 'dbt')(relation)) }}
+{% endmacro %}
+
+{% macro databricks__show_tables(relation) %}
+  {% call statement('show_tables', fetch_result=True) -%}
+    show tables in {{ relation }}
+  {% endcall %}
+
+  {% do return(load_result('show_tables').table) %}
+{% endmacro %}
+
+{% macro show_views(relation) %}
+  {{ return(adapter.dispatch('show_views', 'dbt')(relation)) }}
+{% endmacro %}
+
+{% macro databricks__show_views(relation) %}
+  {% call statement('show_views', fetch_result=True) -%}
+    show views in {{ relation }}
+  {% endcall %}
+
+  {% do return(load_result('show_views').table) %}
+{% endmacro %}
+
 {% macro databricks__generate_database_name(custom_database_name=none, node=none) -%}
   {%- set default_database = target.database -%}
   {%- if custom_database_name is none -%}
@@ -177,3 +205,23 @@
   {%- endif -%}
   {% do return(tmp_relation) %}
 {% endmacro %}
+
+{% macro databricks__get_or_create_relation(database, schema, identifier, type, needs_information=False) %}
+  {%- set target_relation = adapter.get_relation(
+        database=database,
+        schema=schema,
+        identifier=identifier,
+        needs_information=needs_information) %}
+
+  {% if target_relation %}
+    {% do return([true, target_relation]) %}
+  {% endif %}
+
+  {%- set new_relation = api.Relation.create(
+      database=database,
+      schema=schema,
+      identifier=identifier,
+      type=type
+  ) -%}
+  {% do return([false, new_relation]) %}
+{% endmacro %}
diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql
index d153158f7..ba6e4cfd5 100644
--- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql
@@ -14,7 +14,7 @@
   {%- set language = model['language'] -%}
   {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}
   {%- set target_relation = this -%}
-  {%- set existing_relation = load_relation(this) -%}
+  {%- set existing_relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier, needs_information=True) -%}

   {#-- Set Overwrite Mode --#}
   {%- if incremental_strategy == 'insert_overwrite' and partition_by -%}
diff --git a/dbt/include/databricks/macros/materializations/snapshot.sql b/dbt/include/databricks/macros/materializations/snapshot.sql
index 112f953b1..fd1ba9836 100644
--- a/dbt/include/databricks/macros/materializations/snapshot.sql
+++ b/dbt/include/databricks/macros/materializations/snapshot.sql
@@ -27,11 +27,12 @@
   {%- set file_format = config.get('file_format', 'delta') -%}
   {%- set grant_config = config.get('grants') -%}

-  {% set target_relation_exists, target_relation = get_or_create_relation(
+  {% set target_relation_exists, target_relation = databricks__get_or_create_relation(
           database=model.database,
           schema=model.schema,
           identifier=target_table,
-          type='table') -%}
+          type='table',
+          needs_information=True) -%}

   {%- if file_format not in ['delta', 'hudi'] -%}
     {% set invalid_format_msg -%}
diff --git a/dbt/include/databricks/macros/materializations/table.sql b/dbt/include/databricks/macros/materializations/table.sql
index e3eba9e2e..a8929dbb6 100644
--- a/dbt/include/databricks/macros/materializations/table.sql
+++ b/dbt/include/databricks/macros/materializations/table.sql
@@ -3,7 +3,7 @@
   {%- set identifier = model['alias'] -%}
   {%- set grant_config = config.get('grants') -%}

-  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
+  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier, needs_information=True) -%}

   {%- set target_relation = api.Relation.create(identifier=identifier,
                                                 schema=schema,
                                                 database=database,
diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py
index 8678845a4..f27a119ee 100644
--- a/tests/functional/adapter/test_basic.py
+++ b/tests/functional/adapter/test_basic.py
@@ -69,7 +69,7 @@ def expected_catalog(self, project):
         return base_expected_catalog(
             project,
             role=AnyString(),
-            id_type="long",
+            id_type="bigint",
             text_type="string",
             time_type="timestamp",
             view_type="view",
@@ -84,10 +84,10 @@ def expected_catalog(self, project):
         return expected_references_catalog(
             project,
             role=AnyString(),
-            id_type="long",
+            id_type="bigint",
             text_type="string",
             time_type="timestamp",
-            bigint_type="long",
+            bigint_type="bigint",
             view_type="view",
             table_type="table",
             model_stats=_StatsLikeDict(),
diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py
index 3e2fd7d61..6707bb3f6 100644
--- a/tests/unit/test_adapter.py
+++ b/tests/unit/test_adapter.py
@@ -422,7 +422,29 @@ def test_parse_relation(self):
         input_cols = [Row(keys=["col_name", "data_type"], values=r) for r in plain_rows]

         config = self._get_target_databricks_sql_connector(self.project_cfg)
-        rows = DatabricksAdapter(config).parse_describe_extended(relation, input_cols)
+        metadata, rows = DatabricksAdapter(config).parse_describe_extended(relation, input_cols)
+
+        self.assertDictEqual(
+            metadata,
+            {
+                "# col_name": "data_type",
+                "dt": "date",
+                None: None,
+                "# Detailed Table Information": None,
+                "Database": None,
+                "Owner": "root",
+                "Created Time": "Wed Feb 04 18:15:00 UTC 1815",
+                "Last Access": "Wed May 20 19:25:00 UTC 1925",
+                "Type": "MANAGED",
+                "Provider": "delta",
+                "Location": "/mnt/vo",
+                "Serde Library": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
+                "InputFormat": "org.apache.hadoop.mapred.SequenceFileInputFormat",
+                "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
+                "Partition Provider": "Catalog",
+            },
+        )
+
         self.assertEqual(len(rows), 4)
         self.assertEqual(
             rows[0].to_column_dict(omit_none=False),
@@ -511,7 +533,7 @@ def test_parse_relation_with_integer_owner(self):
         input_cols = [Row(keys=["col_name", "data_type"], values=r) for r in plain_rows]

         config = self._get_target_databricks_sql_connector(self.project_cfg)
-        rows = DatabricksAdapter(config).parse_describe_extended(relation, input_cols)
+        _, rows = DatabricksAdapter(config).parse_describe_extended(relation, input_cols)

         self.assertEqual(rows[0].to_column_dict().get("table_owner"), "1234")

@@ -547,7 +569,28 @@ def test_parse_relation_with_statistics(self):
         input_cols = [Row(keys=["col_name", "data_type"], values=r) for r in plain_rows]

         config = self._get_target_databricks_sql_connector(self.project_cfg)
-        rows = DatabricksAdapter(config).parse_describe_extended(relation, input_cols)
+        metadata, rows = DatabricksAdapter(config).parse_describe_extended(relation, input_cols)
+
+        self.assertEqual(
+            metadata,
+            {
+                None: None,
+                "# Detailed Table Information": None,
+                "Database": None,
+                "Owner": "root",
+                "Created Time": "Wed Feb 04 18:15:00 UTC 1815",
+                "Last Access": "Wed May 20 19:25:00 UTC 1925",
+                "Statistics": "1109049927 bytes, 14093476 rows",
+                "Type": "MANAGED",
+                "Provider": "delta",
+                "Location": "/mnt/vo",
+                "Serde Library": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
+                "InputFormat": "org.apache.hadoop.mapred.SequenceFileInputFormat",
+                "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
+                "Partition Provider": "Catalog",
+            },
+        )
+
         self.assertEqual(len(rows), 1)
         self.assertEqual(
             rows[0].to_column_dict(omit_none=False),
@@ -581,239 +624,3 @@ def test_relation_with_database(self):
         assert r1.database is None
         r2 = adapter.Relation.create(database="something", schema="different", identifier="table")
         assert r2.database == "something"
-
-    def test_parse_columns_from_information_with_table_type_and_delta_provider(self):
-        self.maxDiff = None
-        rel_type = DatabricksRelation.get_relation_type.Table
-
-        # Mimics the output of Spark in the information column
-        information = (
-            "Database: default_schema\n"
-            "Table: mytable\n"
-            "Owner: root\n"
-            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
-            "Last Access: Wed May 20 19:25:00 UTC 1925\n"
-            "Created By: Spark 3.0.1\n"
-            "Type: MANAGED\n"
-            "Provider: delta\n"
-            "Statistics: 123456789 bytes\n"
-            "Location: /mnt/vo\n"
-            "Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n"
-            "InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n"
-            "OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n"
-            "Partition Provider: Catalog\n"
-            "Partition Columns: [`dt`]\n"
-            "Schema: root\n"
-            " |-- col1: decimal(22,0) (nullable = true)\n"
-            " |-- col2: string (nullable = true)\n"
-            " |-- dt: date (nullable = true)\n"
-            " |-- struct_col: struct (nullable = true)\n"
-            " |    |-- struct_inner_col: string (nullable = true)\n"
-        )
-        relation = DatabricksRelation.create(
-            schema="default_schema", identifier="mytable", type=rel_type, information=information
-        )
-
-        config = self._get_target_databricks_sql_connector(self.project_cfg)
-        columns = DatabricksAdapter(config).parse_columns_from_information(relation)
-        self.assertEqual(len(columns), 4)
-        self.assertEqual(
-            columns[0].to_column_dict(omit_none=False),
-            {
-                "table_database": None,
-                "table_schema": relation.schema,
-                "table_name": relation.name,
-                "table_type": rel_type,
-                "table_owner": "root",
-                "column": "col1",
-                "column_index": 1,
-                "dtype": "decimal(22,0)",
-                "numeric_scale": None,
-                "numeric_precision": None,
-                "char_size": None,
-                "stats:bytes:description": "",
-                "stats:bytes:include": True,
-                "stats:bytes:label": "bytes",
-                "stats:bytes:value": 123456789,
-            },
-        )
-
-        self.assertEqual(
-            columns[3].to_column_dict(omit_none=False),
-            {
-                "table_database": None,
-                "table_schema": relation.schema,
-                "table_name": relation.name,
-                "table_type": rel_type,
-                "table_owner": "root",
-                "column": "struct_col",
-                "column_index": 4,
-                "dtype": "struct",
-                "numeric_scale": None,
-                "numeric_precision": None,
-                "char_size": None,
-                "stats:bytes:description": "",
-                "stats:bytes:include": True,
-                "stats:bytes:label": "bytes",
-                "stats:bytes:value": 123456789,
-            },
-        )
-
-    def test_parse_columns_from_information_with_view_type(self):
-        self.maxDiff = None
-        rel_type = DatabricksRelation.get_relation_type.View
-        information = (
-            "Database: default_schema\n"
-            "Table: myview\n"
-            "Owner: root\n"
-            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
-            "Last Access: UNKNOWN\n"
-            "Created By: Spark 3.0.1\n"
-            "Type: VIEW\n"
-            "View Text: WITH base (\n"
-            "    SELECT * FROM source_table\n"
-            ")\n"
-            "SELECT col1, col2, dt FROM base\n"
-            "View Original Text: WITH base (\n"
-            "    SELECT * FROM source_table\n"
-            ")\n"
-            "SELECT col1, col2, dt FROM base\n"
-            "View Catalog and Namespace: spark_catalog.default\n"
-            "View Query Output Columns: [col1, col2, dt]\n"
-            "Table Properties: [view.query.out.col.1=col1, view.query.out.col.2=col2, "
-            "transient_lastDdlTime=1618324324, view.query.out.col.3=dt, "
-            "view.catalogAndNamespace.part.0=spark_catalog, "
-            "view.catalogAndNamespace.part.1=default]\n"
-            "Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n"
-            "InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n"
-            "OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n"
-            "Storage Properties: [serialization.format=1]\n"
-            "Schema: root\n"
-            " |-- col1: decimal(22,0) (nullable = true)\n"
-            " |-- col2: string (nullable = true)\n"
-            " |-- dt: date (nullable = true)\n"
-            " |-- struct_col: struct (nullable = true)\n"
-            " |    |-- struct_inner_col: string (nullable = true)\n"
-        )
-        relation = DatabricksRelation.create(
-            schema="default_schema", identifier="myview", type=rel_type, information=information
-        )
-
-        config = self._get_target_databricks_sql_connector(self.project_cfg)
-        columns = DatabricksAdapter(config).parse_columns_from_information(relation)
-        self.assertEqual(len(columns), 4)
-        self.assertEqual(
-            columns[1].to_column_dict(omit_none=False),
-            {
-                "table_database": None,
-                "table_schema": relation.schema,
-                "table_name": relation.name,
-                "table_type": rel_type,
-                "table_owner": "root",
-                "column": "col2",
-                "column_index": 2,
-                "dtype": "string",
-                "numeric_scale": None,
-                "numeric_precision": None,
-                "char_size": None,
-            },
-        )
-
-        self.assertEqual(
-            columns[3].to_column_dict(omit_none=False),
-            {
-                "table_database": None,
-                "table_schema": relation.schema,
-                "table_name": relation.name,
-                "table_type": rel_type,
-                "table_owner": "root",
-                "column": "struct_col",
-                "column_index": 4,
-                "dtype": "struct",
-                "numeric_scale": None,
-                "numeric_precision": None,
-                "char_size": None,
-            },
-        )
-
-    def test_parse_columns_from_information_with_table_type_and_parquet_provider(self):
-        self.maxDiff = None
-        rel_type = DatabricksRelation.get_relation_type.Table
-
-        information = (
-            "Database: default_schema\n"
-            "Table: mytable\n"
-            "Owner: root\n"
-            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
-            "Last Access: Wed May 20 19:25:00 UTC 1925\n"
-            "Created By: Spark 3.0.1\n"
-            "Type: MANAGED\n"
-            "Provider: parquet\n"
-            "Statistics: 1234567890 bytes, 12345678 rows\n"
-            "Location: /mnt/vo\n"
-            "Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe\n"
-            "InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat\n"
-            "OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat\n"
-            "Schema: root\n"
-            " |-- col1: decimal(22,0) (nullable = true)\n"
-            " |-- col2: string (nullable = true)\n"
-            " |-- dt: date (nullable = true)\n"
-            " |-- struct_col: struct (nullable = true)\n"
-            " |    |-- struct_inner_col: string (nullable = true)\n"
-        )
-        relation = DatabricksRelation.create(
-            schema="default_schema", identifier="mytable", type=rel_type, information=information
-        )
-
-        config = self._get_target_databricks_sql_connector(self.project_cfg)
-        columns = DatabricksAdapter(config).parse_columns_from_information(relation)
-        self.assertEqual(len(columns), 4)
-        self.assertEqual(
-            columns[2].to_column_dict(omit_none=False),
-            {
-                "table_database": None,
-                "table_schema": relation.schema,
-                "table_name": relation.name,
-                "table_type": rel_type,
-                "table_owner": "root",
-                "column": "dt",
-                "column_index": 3,
-                "dtype": "date",
-                "numeric_scale": None,
-                "numeric_precision": None,
-                "char_size": None,
-                "stats:bytes:description": "",
-                "stats:bytes:include": True,
-                "stats:bytes:label": "bytes",
-                "stats:bytes:value": 1234567890,
-                "stats:rows:description": "",
-                "stats:rows:include": True,
-                "stats:rows:label": "rows",
-                "stats:rows:value": 12345678,
-            },
-        )
-
-        self.assertEqual(
-            columns[3].to_column_dict(omit_none=False),
-            {
-                "table_database": None,
-                "table_schema": relation.schema,
-                "table_name": relation.name,
-                "table_type": rel_type,
-                "table_owner": "root",
-                "column": "struct_col",
-                "column_index": 4,
-                "dtype": "struct",
-                "numeric_scale": None,
-                "numeric_precision": None,
-                "char_size": None,
-                "stats:bytes:description": "",
-                "stats:bytes:include": True,
-                "stats:bytes:label": "bytes",
-                "stats:bytes:value": 1234567890,
-                "stats:rows:description": "",
-                "stats:rows:include": True,
-                "stats:rows:label": "rows",
-                "stats:rows:value": 12345678,
-            },
-        )