
Commit

polish

TalkWIthKeyboard committed May 15, 2022
1 parent 345239b commit 101c6dd
Showing 2 changed files with 2 additions and 243 deletions.
5 changes: 2 additions & 3 deletions dbt/adapters/spark/impl.py
@@ -218,7 +218,7 @@ def get_relation(
return self._set_relation_information(cached) if cached else None

def parse_describe_extended(
-        self, relation: Relation, raw_rows: List[agate.Row]
+        self, relation: Relation, raw_rows: List[agate.Row]
) -> Tuple[Dict[str, any], List[SparkColumn]]:
# Convert the Row to a dict
dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows]
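For readers unfamiliar with agate, here is a minimal standalone sketch of the Row-to-dict conversion in this hunk. FakeRow is a hypothetical stand-in for agate.Row (only the _keys/_values attributes used above matter), and the (col_name, data_type) row shape is an assumption about DESCRIBE TABLE EXTENDED output, not taken from this diff:

# Sketch of the dict conversion above. FakeRow is a hypothetical stand-in
# for agate.Row; only the _keys/_values attributes used by the hunk matter.
class FakeRow:
    def __init__(self, keys, values):
        self._keys = keys
        self._values = values


# Assumed shape of DESCRIBE TABLE EXTENDED rows: (col_name, data_type).
raw_rows = [
    FakeRow(("col_name", "data_type"), ("col1", "decimal(22,0)")),
    FakeRow(("col_name", "data_type"), ("col2", "string")),
]

dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows]
assert dict_rows[0] == {"col_name": "col1", "data_type": "decimal(22,0)"}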
@@ -293,8 +293,7 @@ def _get_updated_relation(self, relation: BaseRelation) -> Optional[SparkRelatio
raise e

# strip hudi metadata columns.
-        columns = [x for x in columns
-                   if x.name not in self.HUDI_METADATA_COLUMNS]
+        columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]

if not metadata:
return None
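The Hudi filtering reformatted in this hunk is easy to demonstrate in isolation. A minimal sketch, assuming the standard Hudi metadata column names below match the adapter's HUDI_METADATA_COLUMNS constant:

# Standalone sketch of the metadata filtering above. The column names are
# Hudi's standard metadata fields, assumed here to match the adapter's
# HUDI_METADATA_COLUMNS constant.
from typing import NamedTuple


class Column(NamedTuple):
    name: str
    dtype: str


HUDI_METADATA_COLUMNS = [
    "_hoodie_commit_time",
    "_hoodie_commit_seqno",
    "_hoodie_record_key",
    "_hoodie_partition_path",
    "_hoodie_file_name",
]

columns = [
    Column("_hoodie_commit_time", "string"),
    Column("col1", "decimal(22,0)"),
    Column("col2", "string"),
]

# Same one-line comprehension shape as the new line in the hunk.
columns = [x for x in columns if x.name not in HUDI_METADATA_COLUMNS]
assert [c.name for c in columns] == ["col1", "col2"]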
240 changes: 0 additions & 240 deletions tests/unit/test_adapter.py
@@ -534,243 +534,3 @@ def test_profile_with_cluster_and_sql_endpoint(self):
}
with self.assertRaises(RuntimeException):
config_from_parts_or_dicts(self.project_cfg, profile)

@pytest.mark.skip()
def test_parse_columns_from_information_with_table_type_and_delta_provider(self):
self.maxDiff = None
rel_type = SparkRelation.get_relation_type.Table

# Mimics the output of Spark in the information column
information = (
"Database: default_schema\n"
"Table: mytable\n"
"Owner: root\n"
"Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
"Last Access: Wed May 20 19:25:00 UTC 1925\n"
"Created By: Spark 3.0.1\n"
"Type: MANAGED\n"
"Provider: delta\n"
"Statistics: 123456789 bytes\n"
"Location: /mnt/vo\n"
"Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n"
"InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n"
"OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n"
"Partition Provider: Catalog\n"
"Partition Columns: [`dt`]\n"
"Schema: root\n"
" |-- col1: decimal(22,0) (nullable = true)\n"
" |-- col2: string (nullable = true)\n"
" |-- dt: date (nullable = true)\n"
" |-- struct_col: struct (nullable = true)\n"
" | |-- struct_inner_col: string (nullable = true)\n"
)
relation = SparkRelation.create(
schema='default_schema',
identifier='mytable',
type=rel_type,
information=information
)

config = self._get_target_http(self.project_cfg)
columns = SparkAdapter(config).parse_columns_from_information(
relation)
self.assertEqual(len(columns), 4)
self.assertEqual(columns[0].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'col1',
'column_index': 0,
'dtype': 'decimal(22,0)',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None,

'stats:bytes:description': '',
'stats:bytes:include': True,
'stats:bytes:label': 'bytes',
'stats:bytes:value': 123456789,
})

self.assertEqual(columns[3].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'struct_col',
'column_index': 3,
'dtype': 'struct',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None,

'stats:bytes:description': '',
'stats:bytes:include': True,
'stats:bytes:label': 'bytes',
'stats:bytes:value': 123456789,
})

@pytest.mark.skip()
def test_parse_columns_from_information_with_view_type(self):
self.maxDiff = None
rel_type = SparkRelation.get_relation_type.View
information = (
"Database: default_schema\n"
"Table: myview\n"
"Owner: root\n"
"Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
"Last Access: UNKNOWN\n"
"Created By: Spark 3.0.1\n"
"Type: VIEW\n"
"View Text: WITH base (\n"
" SELECT * FROM source_table\n"
")\n"
"SELECT col1, col2, dt FROM base\n"
"View Original Text: WITH base (\n"
" SELECT * FROM source_table\n"
")\n"
"SELECT col1, col2, dt FROM base\n"
"View Catalog and Namespace: spark_catalog.default\n"
"View Query Output Columns: [col1, col2, dt]\n"
"Table Properties: [view.query.out.col.1=col1, view.query.out.col.2=col2, "
"transient_lastDdlTime=1618324324, view.query.out.col.3=dt, "
"view.catalogAndNamespace.part.0=spark_catalog, "
"view.catalogAndNamespace.part.1=default]\n"
"Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n"
"InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n"
"OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n"
"Storage Properties: [serialization.format=1]\n"
"Schema: root\n"
" |-- col1: decimal(22,0) (nullable = true)\n"
" |-- col2: string (nullable = true)\n"
" |-- dt: date (nullable = true)\n"
" |-- struct_col: struct (nullable = true)\n"
" | |-- struct_inner_col: string (nullable = true)\n"
)
relation = SparkRelation.create(
schema='default_schema',
identifier='myview',
type=rel_type,
information=information
)

config = self._get_target_http(self.project_cfg)
columns = SparkAdapter(config).parse_columns_from_information(
relation)
self.assertEqual(len(columns), 4)
self.assertEqual(columns[1].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'col2',
'column_index': 1,
'dtype': 'string',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None
})

self.assertEqual(columns[3].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'struct_col',
'column_index': 3,
'dtype': 'struct',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None
})

@pytest.mark.skip()
def test_parse_columns_from_information_with_table_type_and_parquet_provider(self):
self.maxDiff = None
rel_type = SparkRelation.get_relation_type.Table

information = (
"Database: default_schema\n"
"Table: mytable\n"
"Owner: root\n"
"Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
"Last Access: Wed May 20 19:25:00 UTC 1925\n"
"Created By: Spark 3.0.1\n"
"Type: MANAGED\n"
"Provider: parquet\n"
"Statistics: 1234567890 bytes, 12345678 rows\n"
"Location: /mnt/vo\n"
"Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe\n"
"InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat\n"
"OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat\n"
"Schema: root\n"
" |-- col1: decimal(22,0) (nullable = true)\n"
" |-- col2: string (nullable = true)\n"
" |-- dt: date (nullable = true)\n"
" |-- struct_col: struct (nullable = true)\n"
" | |-- struct_inner_col: string (nullable = true)\n"
)
relation = SparkRelation.create(
schema='default_schema',
identifier='mytable',
type=rel_type,
information=information
)

config = self._get_target_http(self.project_cfg)
columns = SparkAdapter(config).parse_columns_from_information(
relation)
self.assertEqual(len(columns), 4)
self.assertEqual(columns[2].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'dt',
'column_index': 2,
'dtype': 'date',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None,

'stats:bytes:description': '',
'stats:bytes:include': True,
'stats:bytes:label': 'bytes',
'stats:bytes:value': 1234567890,

'stats:rows:description': '',
'stats:rows:include': True,
'stats:rows:label': 'rows',
'stats:rows:value': 12345678
})

self.assertEqual(columns[3].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'struct_col',
'column_index': 3,
'dtype': 'struct',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None,

'stats:bytes:description': '',
'stats:bytes:include': True,
'stats:bytes:label': 'bytes',
'stats:bytes:value': 1234567890,

'stats:rows:description': '',
'stats:rows:include': True,
'stats:rows:label': 'rows',
'stats:rows:value': 12345678
})
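The deleted tests all feed a SHOW TABLE EXTENDED-style information string into parse_columns_from_information. A minimal sketch of that kind of schema parsing, using a hypothetical regex and helper rather than the adapter's actual implementation:

# Hypothetical sketch of extracting top-level columns from the Schema block
# of the information strings above. The regex and helper are illustrative
# assumptions, not the adapter's real parse_columns_from_information.
import re
from typing import List, Tuple

COLUMN_LINE = re.compile(r"\|-- (.*): (.*) \(nullable = (.*)\)")


def parse_schema_columns(information: str) -> List[Tuple[str, str]]:
    """Return (name, dtype) pairs for top-level columns in the Schema block."""
    columns = []
    for match in COLUMN_LINE.finditer(information):
        # Nested struct fields carry an extra "| " before the "|--" marker;
        # skip them so only top-level columns are returned, as in the tests.
        line_start = information.rfind("\n", 0, match.start()) + 1
        if "|" in information[line_start:match.start()]:
            continue
        columns.append((match.group(1), match.group(2)))
    return columns


info = (
    "Schema: root\n"
    " |-- col1: decimal(22,0) (nullable = true)\n"
    " |-- struct_col: struct (nullable = true)\n"
    " | |-- struct_inner_col: string (nullable = true)\n"
)
assert parse_schema_columns(info) == [("col1", "decimal(22,0)"), ("struct_col", "struct")]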
