diff --git a/dbt/adapters/spark/column.py b/dbt/adapters/spark/column.py
index 4df6b301b..83e90e371 100644
--- a/dbt/adapters/spark/column.py
+++ b/dbt/adapters/spark/column.py
@@ -2,14 +2,14 @@
 from typing import Any, Dict, Optional, TypeVar, Union
 
 from dbt.adapters.base.column import Column
-from dbt.dataclass_schema import dbtClassMixin
+from dbt.contracts.relation import FakeAPIObject
 from hologram import JsonDict
 
 Self = TypeVar("Self", bound="SparkColumn")
 
 
 @dataclass
-class SparkColumn(dbtClassMixin, Column):
+class SparkColumn(FakeAPIObject, Column):
     table_database: Optional[str] = None
     table_schema: Optional[str] = None
     table_name: Optional[str] = None
diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index 4f7b9d4cc..ade9e68e5 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -1,11 +1,15 @@
 import re
 from concurrent.futures import Future
 from dataclasses import dataclass
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, Iterable, List, Optional, Union, Tuple
 from typing_extensions import TypeAlias
 
 import agate
+from dbt.adapters.cache import RelationsCache, _CachedRelation
 from dbt.contracts.relation import RelationType
+from dbt.events.base_types import DebugLevel, Cache
+from dbt.adapters.reference_keys import _ReferenceKey, _make_key
+from dbt.events.functions import fire_event
 
 import dbt
 import dbt.exceptions
@@ -25,12 +29,14 @@
 GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME = "spark__get_columns_in_relation_raw"
 LIST_SCHEMAS_MACRO_NAME = "list_schemas"
-LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
+LIST_TABLES_MACRO_NAME = "list_tables_without_caching"
+LIST_VIEWS_MACRO_NAME = "list_views_without_caching"
 DROP_RELATION_MACRO_NAME = "drop_relation"
 FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties"
 
 KEY_TABLE_OWNER = "Owner"
 KEY_TABLE_STATISTICS = "Statistics"
+KEY_TABLE_PROVIDER = "Provider"
 
 
 @dataclass
@@ -44,6 +50,46 @@ class SparkConfig(AdapterConfig):
     merge_update_columns: Optional[str] = None
 
 
+@dataclass
+class UpdateRelation(DebugLevel, Cache):
+    relation: _ReferenceKey
+    code: str = "E038"
+
+    def message(self) -> str:
+        return f"Updating relation: {str(self.relation)}"
+
+
+@dataclass
+class UpdateMissingRelation(DebugLevel, Cache):
+    relation: _ReferenceKey
+    code: str = "E039"
+
+    def message(self) -> str:
+        return f"updated a nonexistent relationship: {str(self.relation)}"
+
+
+class SparkRelationsCache(RelationsCache):
+    def _update(self, relation: _CachedRelation):
+        key = relation.key()
+
+        if key not in self.relations:
+            fire_event(UpdateMissingRelation(relation=key))
+            return
+
+        self.relations[key].inner = relation.inner
+
+    def update_relation(self, relation):
+        """Update the relation's inner object in the cache.
+
+        :param BaseRelation relation: The underlying relation.
+ """ + cached = _CachedRelation(relation) + fire_event(UpdateRelation(relation=_make_key(cached))) + + with self.lock: + self._update(cached) + + class SparkAdapter(SQLAdapter): COLUMN_NAMES = ( "table_database", @@ -81,6 +127,10 @@ class SparkAdapter(SQLAdapter): ConnectionManager: TypeAlias = SparkConnectionManager AdapterSpecificConfigs: TypeAlias = SparkConfig + def __init__(self, config): + super().__init__(config) + self.cache = SparkRelationsCache() + @classmethod def date_function(cls) -> str: return "current_timestamp()" @@ -124,9 +174,16 @@ def add_schema_to_cache(self, schema) -> str: def list_relations_without_caching( self, schema_relation: SparkRelation ) -> List[SparkRelation]: - kwargs = {"schema_relation": schema_relation} + kwargs = {"relation": schema_relation} try: - results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs) + tables = self.execute_macro( + LIST_TABLES_MACRO_NAME, + kwargs=kwargs + ) + views = self.execute_macro( + LIST_VIEWS_MACRO_NAME, + kwargs=kwargs + ) except dbt.exceptions.RuntimeException as e: errmsg = getattr(e, "msg", "") if f"Database '{schema_relation}' not found" in errmsg: @@ -137,39 +194,40 @@ def list_relations_without_caching( return [] relations = [] - for row in results: - if len(row) != 4: - raise dbt.exceptions.RuntimeException( - f'Invalid value from "show table extended ...", ' - f"got {len(row)} values, expected 4" - ) - _schema, name, _, information = row - rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table - is_delta = "Provider: delta" in information - is_hudi = "Provider: hudi" in information + view_names = views.columns["viewName"].values() + + for tbl in tables: + rel_type = RelationType("view" if tbl["tableName"] in view_names else "table") + _schema = tbl["namespace"] if "namespace" in tables.column_names else tbl["database"] relation = self.Relation.create( schema=_schema, - identifier=name, + identifier=tbl["tableName"], type=rel_type, - information=information, - is_delta=is_delta, - is_hudi=is_hudi, ) relations.append(relation) return relations def get_relation( - self, database: Optional[str], schema: str, identifier: str + self, + database: Optional[str], + schema: str, + identifier: str, + needs_information=False ) -> Optional[BaseRelation]: if not self.Relation.include_policy.database: database = None - return super().get_relation(database, schema, identifier) + cached = super().get_relation(database, schema, identifier) + + if not needs_information: + return cached + + return self._set_relation_information(cached) if cached else None def parse_describe_extended( self, relation: Relation, raw_rows: List[agate.Row] - ) -> List[SparkColumn]: + ) -> Tuple[Dict[str, any], List[SparkColumn]]: # Convert the Row to a dict dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows] # Find the separator between the rows and the metadata provided @@ -182,20 +240,17 @@ def parse_describe_extended( raw_table_stats = metadata.get(KEY_TABLE_STATISTICS) table_stats = SparkColumn.convert_table_stats(raw_table_stats) - return [ - SparkColumn( - table_database=None, - table_schema=relation.schema, - table_name=relation.name, - table_type=relation.type, - table_owner=str(metadata.get(KEY_TABLE_OWNER)), - table_stats=table_stats, - column=column["col_name"], - column_index=idx, - dtype=column["data_type"], - ) - for idx, column in enumerate(rows) - ] + return metadata, [SparkColumn( + table_database=None, + table_schema=relation.schema, + table_name=relation.name, + 
+            table_type=relation.type,
+            table_owner=str(metadata.get(KEY_TABLE_OWNER)),
+            table_stats=table_stats,
+            column=column['col_name'],
+            column_index=idx,
+            dtype=column['data_type'],
+        ) for idx, column in enumerate(rows)]
 
     @staticmethod
     def find_table_information_separator(rows: List[dict]) -> int:
@@ -216,60 +271,72 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
             ),
             None,
         )
+
+        if not cached_relation:
+            updated_relation = self._get_updated_relation(relation)
+            if updated_relation:
+                self.cache.add(updated_relation)
+        else:
+            updated_relation = self._set_relation_information(cached_relation)
+
+        return updated_relation.columns if updated_relation is not None else []
+
+    def _get_updated_relation(self, relation: BaseRelation) -> Optional[SparkRelation]:
+        metadata = None
         columns = []
-        if cached_relation and cached_relation.information:
-            columns = self.parse_columns_from_information(cached_relation)
-        if not columns:
-            # in open source delta 'show table extended' query output doesnt
-            # return relation's schema. if columns are empty from cache,
-            # use get_columns_in_relation spark macro
-            # which would execute 'describe extended tablename' query
-            try:
-                rows: List[agate.Row] = self.execute_macro(
+
+        try:
+            rows: List[agate.Row] = self.execute_macro(
                 GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
             )
-                columns = self.parse_describe_extended(relation, rows)
-            except dbt.exceptions.RuntimeException as e:
-                # spark would throw error when table doesn't exist, where other
-                # CDW would just return and empty list, normalizing the behavior here
-                errmsg = getattr(e, "msg", "")
-                if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
-                    pass
-                else:
-                    raise e
+            metadata, columns = self.parse_describe_extended(relation, rows)
+        except dbt.exceptions.RuntimeException as e:
+            # spark would throw error when table doesn't exist, where other
+            # CDW would just return an empty list, normalizing the behavior here
+            errmsg = getattr(e, "msg", "")
+            if (
+                "Table or view not found" in errmsg or
+                "NoSuchTableException" in errmsg
+            ):
+                pass
+            else:
+                raise e
 
         # strip hudi metadata columns.
         columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]
 
-        return columns
-
-    def parse_columns_from_information(self, relation: SparkRelation) -> List[SparkColumn]:
-        owner_match = re.findall(self.INFORMATION_OWNER_REGEX, relation.information)
-        owner = owner_match[0] if owner_match else None
-        matches = re.finditer(self.INFORMATION_COLUMNS_REGEX, relation.information)
-        columns = []
-        stats_match = re.findall(self.INFORMATION_STATISTICS_REGEX, relation.information)
-        raw_table_stats = stats_match[0] if stats_match else None
-        table_stats = SparkColumn.convert_table_stats(raw_table_stats)
-        for match_num, match in enumerate(matches):
-            column_name, column_type, nullable = match.groups()
-            column = SparkColumn(
-                table_database=None,
-                table_schema=relation.schema,
-                table_name=relation.table,
-                table_type=relation.type,
-                column_index=match_num,
-                table_owner=owner,
-                column=column_name,
-                dtype=column_type,
-                table_stats=table_stats,
-            )
-            columns.append(column)
-        return columns
+        if not metadata:
+            return None
+
+        provider = metadata.get(KEY_TABLE_PROVIDER)
+        return self.Relation.create(
+            database=None,
+            schema=relation.schema,
+            identifier=relation.identifier,
+            type=relation.type,
+            is_delta=(provider == 'delta'),
+            is_hudi=(provider == 'hudi'),
+            owner=metadata.get(KEY_TABLE_OWNER),
+            stats=metadata.get(KEY_TABLE_STATISTICS),
+            columns=columns
+        )
+
+    def _set_relation_information(self, relation: SparkRelation) -> SparkRelation:
+        """Update the information of the relation, or return it if it already exists."""
+        if relation.has_information():
+            return relation
+
+        updated_relation = self._get_updated_relation(relation)
+
+        self.cache.update_relation(updated_relation)
+        return updated_relation
 
-    def _get_columns_for_catalog(self, relation: SparkRelation) -> Iterable[Dict[str, Any]]:
-        columns = self.parse_columns_from_information(relation)
+    def _get_columns_for_catalog(
+        self, relation: SparkRelation
+    ) -> Iterable[Dict[str, Any]]:
+        updated_relation = self._set_relation_information(relation)
 
-        for column in columns:
+        for column in (updated_relation.columns if updated_relation is not None else []):
             # convert SparkColumns into catalog dicts
             as_dict = column.to_column_dict()
             as_dict["column_name"] = as_dict.pop("column", None)
diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py
index 249caf0d7..b20f189e2 100644
--- a/dbt/adapters/spark/relation.py
+++ b/dbt/adapters/spark/relation.py
@@ -1,10 +1,12 @@
-from typing import Optional
+from typing import Optional, List, Dict, Hashable
 
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 
 from dbt.adapters.base.relation import BaseRelation, Policy
 from dbt.exceptions import RuntimeException
 
+from dbt.adapters.spark.column import SparkColumn
+
 
 @dataclass
 class SparkQuotePolicy(Policy):
@@ -27,7 +29,9 @@ class SparkRelation(BaseRelation):
     quote_character: str = "`"
     is_delta: Optional[bool] = None
     is_hudi: Optional[bool] = None
-    information: Optional[str] = None
+    owner: Optional[str] = None
+    stats: Optional[str] = None
+    columns: List[SparkColumn] = field(default_factory=lambda: [])
 
     def __post_init__(self):
         if self.database != self.schema and self.database:
@@ -40,3 +44,8 @@ def render(self):
             "include, but only one can be set"
         )
         return super().render()
+
+    def has_information(self) -> bool:
+        return self.owner is not None and \
+            self.stats is not None and \
+            len(self.columns) > 0
diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql
index 5322597ff..5f4f11ff6 100644
--- a/dbt/include/spark/macros/adapters.sql
+++ b/dbt/include/spark/macros/adapters.sql
@@ -179,12 +179,26 @@
   {{ return(adapter.get_columns_in_relation(relation)) }}
 {% endmacro %}
 
-{% macro spark__list_relations_without_caching(relation) %}
-  {% call statement('list_relations_without_caching', fetch_result=True) -%}
-    show table extended in {{ relation }} like '*'
+{% macro list_tables_without_caching(relation) %}
+  {{ return(adapter.dispatch('list_tables_without_caching', 'dbt')(relation)) }}
+{%- endmacro -%}
+
+{% macro spark__list_tables_without_caching(relation) %}
+  {% call statement('list_tables_without_caching', fetch_result=True) -%}
+    show tables in {{ relation.schema }}
   {% endcall %}
+  {% do return(load_result('list_tables_without_caching').table) %}
+{% endmacro %}
 
-  {% do return(load_result('list_relations_without_caching').table) %}
+{% macro list_views_without_caching(relation) %}
+  {{ return(adapter.dispatch('list_views_without_caching', 'dbt')(relation)) }}
+{%- endmacro -%}
+
+{% macro spark__list_views_without_caching(relation) %}
+  {% call statement('list_views_without_caching', fetch_result=True) -%}
+    show views in {{ relation.schema }}
+  {% endcall %}
+  {% do return(load_result('list_views_without_caching').table) %}
 {% endmacro %}
 
 {% macro spark__list_schemas(database) -%}
diff --git a/dbt/include/spark/macros/materializations/table.sql b/dbt/include/spark/macros/materializations/table.sql
index 2eeb806fd..d132d0433 100644
--- a/dbt/include/spark/macros/materializations/table.sql
+++ b/dbt/include/spark/macros/materializations/table.sql
@@ -2,7 +2,7 @@
 
   {%- set identifier = model['alias'] -%}
 
-  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
+  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier, needs_information=True) -%}
   {%- set target_relation = api.Relation.create(identifier=identifier,
                                                 schema=schema,
                                                 database=database,
diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py
index f87a89b2b..8c3ed158b 100644
--- a/tests/unit/test_adapter.py
+++ b/tests/unit/test_adapter.py
@@ -2,6 +2,7 @@
 from unittest import mock
 
 import dbt.flags as flags
+import pytest
 from dbt.exceptions import RuntimeException
 from agate import Row
 from pyhive import hive
@@ -298,8 +299,27 @@ def test_parse_relation(self):
                       for r in plain_rows]
 
         config = self._get_target_http(self.project_cfg)
-        rows = SparkAdapter(config).parse_describe_extended(
+        metadata, rows = SparkAdapter(config).parse_describe_extended(
             relation, input_cols)
+
+        self.assertDictEqual(metadata, {
+            '# col_name': 'data_type',
+            'dt': 'date',
+            None: None,
+            '# Detailed Table Information': None,
+            'Database': None,
+            'Owner': 'root',
+            'Created Time': 'Wed Feb 04 18:15:00 UTC 1815',
+            'Last Access': 'Wed May 20 19:25:00 UTC 1925',
+            'Type': 'MANAGED',
+            'Provider': 'delta',
+            'Location': '/mnt/vo',
+            'Serde Library': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
+            'InputFormat': 'org.apache.hadoop.mapred.SequenceFileInputFormat',
+            'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat',
+            'Partition Provider': 'Catalog'
+        })
+
         self.assertEqual(len(rows), 4)
         self.assertEqual(rows[0].to_column_dict(omit_none=False), {
             'table_database': None,
@@ -379,7 +399,7 @@ def test_parse_relation_with_integer_owner(self):
                       for r in plain_rows]
 
         config = self._get_target_http(self.project_cfg)
-        rows = SparkAdapter(config).parse_describe_extended(
+        _, rows = SparkAdapter(config).parse_describe_extended(
             relation, input_cols)
 
         self.assertEqual(rows[0].to_column_dict().get('table_owner'), '1234')
@@ -419,8 +439,26 @@ def test_parse_relation_with_statistics(self):
                       for r in plain_rows]
 
         config = self._get_target_http(self.project_cfg)
-        rows = SparkAdapter(config).parse_describe_extended(
+        metadata, rows = SparkAdapter(config).parse_describe_extended(
             relation, input_cols)
+
+        self.assertEqual(metadata, {
+            None: None,
+            '# Detailed Table Information': None,
+            'Database': None,
+            'Owner': 'root',
+            'Created Time': 'Wed Feb 04 18:15:00 UTC 1815',
+            'Last Access': 'Wed May 20 19:25:00 UTC 1925',
+            'Statistics': '1109049927 bytes, 14093476 rows',
+            'Type': 'MANAGED',
+            'Provider': 'delta',
+            'Location': '/mnt/vo',
+            'Serde Library': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
+            'InputFormat': 'org.apache.hadoop.mapred.SequenceFileInputFormat',
+            'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat',
+            'Partition Provider': 'Catalog'
+        })
+
         self.assertEqual(len(rows), 1)
         self.assertEqual(rows[0].to_column_dict(omit_none=False), {
             'table_database': None,
@@ -496,240 +534,3 @@ def test_profile_with_cluster_and_sql_endpoint(self):
         }
         with self.assertRaises(RuntimeException):
             config_from_parts_or_dicts(self.project_cfg, profile)
-
-    def test_parse_columns_from_information_with_table_type_and_delta_provider(self):
-        self.maxDiff = None
-        rel_type = SparkRelation.get_relation_type.Table
-
-        # Mimics the output of Spark in the information column
-        information = (
-            "Database: default_schema\n"
-            "Table: mytable\n"
-            "Owner: root\n"
-            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
-            "Last Access: Wed May 20 19:25:00 UTC 1925\n"
-            "Created By: Spark 3.0.1\n"
-            "Type: MANAGED\n"
-            "Provider: delta\n"
-            "Statistics: 123456789 bytes\n"
-            "Location: /mnt/vo\n"
-            "Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n"
-            "InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n"
-            "OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n"
-            "Partition Provider: Catalog\n"
-            "Partition Columns: [`dt`]\n"
-            "Schema: root\n"
-            " |-- col1: decimal(22,0) (nullable = true)\n"
-            " |-- col2: string (nullable = true)\n"
-            " |-- dt: date (nullable = true)\n"
-            " |-- struct_col: struct (nullable = true)\n"
-            " |    |-- struct_inner_col: string (nullable = true)\n"
-        )
-        relation = SparkRelation.create(
-            schema='default_schema',
-            identifier='mytable',
-            type=rel_type,
-            information=information
-        )
-
-        config = self._get_target_http(self.project_cfg)
-        columns = SparkAdapter(config).parse_columns_from_information(
-            relation)
-        self.assertEqual(len(columns), 4)
-        self.assertEqual(columns[0].to_column_dict(omit_none=False), {
-            'table_database': None,
-            'table_schema': relation.schema,
-            'table_name': relation.name,
-            'table_type': rel_type,
-            'table_owner': 'root',
-            'column': 'col1',
-            'column_index': 0,
-            'dtype': 'decimal(22,0)',
-            'numeric_scale': None,
-            'numeric_precision': None,
-            'char_size': None,
-
-            'stats:bytes:description': '',
-            'stats:bytes:include': True,
-            'stats:bytes:label': 'bytes',
-            'stats:bytes:value': 123456789,
-        })
-
-        self.assertEqual(columns[3].to_column_dict(omit_none=False), {
-            'table_database': None,
-            'table_schema': relation.schema,
-            'table_name': relation.name,
-            'table_type': rel_type,
-            'table_owner': 'root',
-            'column': 'struct_col',
-            'column_index': 3,
-            'dtype': 'struct',
-            'numeric_scale': None,
-            'numeric_precision': None,
-            'char_size': None,
-
-            'stats:bytes:description': '',
-            'stats:bytes:include': True,
-            'stats:bytes:label': 'bytes',
-            'stats:bytes:value': 123456789,
-        })
-
-    def test_parse_columns_from_information_with_view_type(self):
-        self.maxDiff = None
-        rel_type = SparkRelation.get_relation_type.View
-        information = (
-            "Database: default_schema\n"
-            "Table: myview\n"
-            "Owner: root\n"
-            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
-            "Last Access: UNKNOWN\n"
-            "Created By: Spark 3.0.1\n"
-            "Type: VIEW\n"
-            "View Text: WITH base (\n"
-            "    SELECT * FROM source_table\n"
-            ")\n"
-            "SELECT col1, col2, dt FROM base\n"
-            "View Original Text: WITH base (\n"
-            "    SELECT * FROM source_table\n"
-            ")\n"
-            "SELECT col1, col2, dt FROM base\n"
-            "View Catalog and Namespace: spark_catalog.default\n"
-            "View Query Output Columns: [col1, col2, dt]\n"
-            "Table Properties: [view.query.out.col.1=col1, view.query.out.col.2=col2, "
-            "transient_lastDdlTime=1618324324, view.query.out.col.3=dt, "
-            "view.catalogAndNamespace.part.0=spark_catalog, "
-            "view.catalogAndNamespace.part.1=default]\n"
-            "Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n"
-            "InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n"
-            "OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n"
-            "Storage Properties: [serialization.format=1]\n"
-            "Schema: root\n"
-            " |-- col1: decimal(22,0) (nullable = true)\n"
-            " |-- col2: string (nullable = true)\n"
-            " |-- dt: date (nullable = true)\n"
-            " |-- struct_col: struct (nullable = true)\n"
-            " |    |-- struct_inner_col: string (nullable = true)\n"
-        )
-        relation = SparkRelation.create(
-            schema='default_schema',
-            identifier='myview',
-            type=rel_type,
-            information=information
-        )
-
-        config = self._get_target_http(self.project_cfg)
-        columns = SparkAdapter(config).parse_columns_from_information(
-            relation)
-        self.assertEqual(len(columns), 4)
-        self.assertEqual(columns[1].to_column_dict(omit_none=False), {
-            'table_database': None,
-            'table_schema': relation.schema,
-            'table_name': relation.name,
-            'table_type': rel_type,
-            'table_owner': 'root',
-            'column': 'col2',
-            'column_index': 1,
-            'dtype': 'string',
-            'numeric_scale': None,
-            'numeric_precision': None,
-            'char_size': None
-        })
-
-        self.assertEqual(columns[3].to_column_dict(omit_none=False), {
-            'table_database': None,
-            'table_schema': relation.schema,
-            'table_name': relation.name,
-            'table_type': rel_type,
-            'table_owner': 'root',
-            'column': 'struct_col',
-            'column_index': 3,
-            'dtype': 'struct',
-            'numeric_scale': None,
-            'numeric_precision': None,
-            'char_size': None
-        })
-
-    def test_parse_columns_from_information_with_table_type_and_parquet_provider(self):
-        self.maxDiff = None
-        rel_type = SparkRelation.get_relation_type.Table
-
-        information = (
-            "Database: default_schema\n"
-            "Table: mytable\n"
-            "Owner: root\n"
-            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
-            "Last Access: Wed May 20 19:25:00 UTC 1925\n"
-            "Created By: Spark 3.0.1\n"
-            "Type: MANAGED\n"
-            "Provider: parquet\n"
-            "Statistics: 1234567890 bytes, 12345678 rows\n"
-            "Location: /mnt/vo\n"
-            "Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe\n"
-            "InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat\n"
-            "OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat\n"
-            "Schema: root\n"
-            " |-- col1: decimal(22,0) (nullable = true)\n"
-            " |-- col2: string (nullable = true)\n"
-            " |-- dt: date (nullable = true)\n"
-            " |-- struct_col: struct (nullable = true)\n"
-            " |    |-- struct_inner_col: string (nullable = true)\n"
-        )
-        relation = SparkRelation.create(
-            schema='default_schema',
-            identifier='mytable',
-            type=rel_type,
-            information=information
-        )
-
-        config = self._get_target_http(self.project_cfg)
-        columns = SparkAdapter(config).parse_columns_from_information(
-            relation)
-        self.assertEqual(len(columns), 4)
-        self.assertEqual(columns[2].to_column_dict(omit_none=False), {
-            'table_database': None,
-            'table_schema': relation.schema,
-            'table_name': relation.name,
-            'table_type': rel_type,
-            'table_owner': 'root',
-            'column': 'dt',
-            'column_index': 2,
-            'dtype': 'date',
-            'numeric_scale': None,
-            'numeric_precision': None,
-            'char_size': None,
-
-            'stats:bytes:description': '',
-            'stats:bytes:include': True,
-            'stats:bytes:label': 'bytes',
-            'stats:bytes:value': 1234567890,
-
-            'stats:rows:description': '',
-            'stats:rows:include': True,
-            'stats:rows:label': 'rows',
-            'stats:rows:value': 12345678
-        })
-
-        self.assertEqual(columns[3].to_column_dict(omit_none=False), {
-            'table_database': None,
-            'table_schema': relation.schema,
-            'table_name': relation.name,
-            'table_type': rel_type,
-            'table_owner': 'root',
-            'column': 'struct_col',
-            'column_index': 3,
-            'dtype': 'struct',
-            'numeric_scale': None,
-            'numeric_precision': None,
-            'char_size': None,
-
-            'stats:bytes:description': '',
-            'stats:bytes:include': True,
-            'stats:bytes:label': 'bytes',
-            'stats:bytes:value': 1234567890,
-
-            'stats:rows:description': '',
-            'stats:rows:include': True,
-            'stats:rows:label': 'rows',
-            'stats:rows:value': 12345678
-        })
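
Reviewer note (illustrative sketch, not part of the patch): the rewritten list_relations_without_caching builds relation types by combining the output of "show tables" and "show views" instead of parsing "show table extended". The standalone script below mirrors that merge logic with plain dicts standing in for agate rows; the table and view names are made up, and the namespace-vs-database fallback follows the added adapter code (Spark 3.x reports a "namespace" column, older versions "database").

# sketch_list_relations.py -- illustrative only; mirrors the merge logic of the
# new SparkAdapter.list_relations_without_caching, not the adapter itself.
from typing import Dict, List


def merge_listing(tables: List[Dict[str, str]],
                  views: List[Dict[str, str]]) -> List[Dict[str, str]]:
    # Anything reported by "show views" is a view; every other row from
    # "show tables" is a table.
    view_names = {row["viewName"] for row in views}
    relations = []
    for row in tables:
        # Spark 3.x exposes the schema column as "namespace", older versions
        # as "database" (the same fallback the patched adapter uses).
        schema = row.get("namespace") or row.get("database")
        relations.append({
            "schema": schema,
            "identifier": row["tableName"],
            "type": "view" if row["tableName"] in view_names else "table",
        })
    return relations


if __name__ == "__main__":
    tables = [
        {"namespace": "default_schema", "tableName": "my_table"},
        {"namespace": "default_schema", "tableName": "my_view"},
    ]
    views = [{"namespace": "default_schema", "viewName": "my_view"}]
    print(merge_listing(tables, views))
    # [{'schema': 'default_schema', 'identifier': 'my_table', 'type': 'table'},
    #  {'schema': 'default_schema', 'identifier': 'my_view', 'type': 'view'}]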