Use show table extended with table name list for get_catalog. #237
The first file in the diff extends `DatabricksColumn` with Databricks-specific type translation:

```diff
@@ -1,9 +1,22 @@
 from dataclasses import dataclass
+from typing import ClassVar, Dict

 from dbt.adapters.spark.column import SparkColumn


 @dataclass
 class DatabricksColumn(SparkColumn):
+    TYPE_LABELS: ClassVar[Dict[str, str]] = {
+        "LONG": "BIGINT",
+    }
+
+    @classmethod
+    def translate_type(cls, dtype: str) -> str:
+        return super(SparkColumn, cls).translate_type(dtype).lower()
+
+    @property
+    def data_type(self) -> str:
+        return self.translate_type(self.dtype)
+
     def __repr__(self) -> str:
         return "<DatabricksColumn {} ({})>".format(self.name, self.data_type)
```
The second file updates the adapter implementation, starting with imports and a new macro-name constant:

```diff
@@ -1,14 +1,16 @@
 from concurrent.futures import Future
 from contextlib import contextmanager
+from itertools import chain
 from dataclasses import dataclass
-from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union
+import re
+from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Type, Union

 from agate import Row, Table, Text

 from dbt.adapters.base import AdapterConfig, PythonJobHelper
 from dbt.adapters.base.impl import catch_as_completed
 from dbt.adapters.base.meta import available
-from dbt.adapters.base.relation import BaseRelation
+from dbt.adapters.base.relation import BaseRelation, InformationSchema
 from dbt.adapters.spark.impl import (
     SparkAdapter,
     GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME,
@@ -18,9 +20,10 @@
     LIST_SCHEMAS_MACRO_NAME,
     TABLE_OR_VIEW_NOT_FOUND_MESSAGES,
 )
-from dbt.clients.agate_helper import empty_table
+from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER, empty_table
 from dbt.contracts.connection import AdapterResponse, Connection
 from dbt.contracts.graph.manifest import Manifest
+from dbt.contracts.graph.nodes import ResultNode
 from dbt.contracts.relation import RelationType
 import dbt.exceptions
 from dbt.events import AdapterLogger
@@ -41,6 +44,7 @@
 CURRENT_CATALOG_MACRO_NAME = "current_catalog"
 USE_CATALOG_MACRO_NAME = "use_catalog"

+SHOW_TABLE_EXTENDED_MACRO_NAME = "show_table_extended"
 SHOW_TABLES_MACRO_NAME = "show_tables"
 SHOW_VIEWS_MACRO_NAME = "show_views"
```
```diff
@@ -120,7 +124,10 @@ def list_relations_without_caching(  # type: ignore[override]
             results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
         except dbt.exceptions.RuntimeException as e:
             errmsg = getattr(e, "msg", "")
-            if f"Database '{schema_relation}' not found" in errmsg:
+            if (
+                "[SCHEMA_NOT_FOUND]" in errmsg
+                or f"Database '{schema_relation}' not found" in errmsg
+            ):
                 return []
             else:
                 description = "Error while retrieving information about"
```
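The rewritten check covers both the structured error class that newer Databricks runtimes prepend and the older plain-text message. A quick illustration with fabricated messages (real server text varies by runtime version):

```python
# Both shapes should be treated as "schema does not exist".
schema = "missing_schema"
errmsgs = [
    f"[SCHEMA_NOT_FOUND] The schema `{schema}` cannot be found.",  # newer runtimes
    f"Database '{schema}' not found",  # older runtimes
]
for errmsg in errmsgs:
    missing = "[SCHEMA_NOT_FOUND]" in errmsg or f"Database '{schema}' not found" in errmsg
    print(missing)  # True for both
```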
A new helper, `_list_relations_with_information`, runs the `show_table_extended` macro once per schema and pairs each relation with its raw `information` blob:

```diff
@@ -139,6 +146,47 @@ def list_relations_without_caching(  # type: ignore[override]
             )
         ]

+    def _list_relations_with_information(
+        self, schema_relation: DatabricksRelation
+    ) -> List[Tuple[DatabricksRelation, str]]:
+        kwargs = {"schema_relation": schema_relation}
+        try:
+            # The catalog for `show table extended` needs to match the current catalog.
+            with self._catalog(schema_relation.database):
+                results = self.execute_macro(SHOW_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs)
+        except dbt.exceptions.RuntimeException as e:
+            errmsg = getattr(e, "msg", "")
+            if (
+                "[SCHEMA_NOT_FOUND]" in errmsg
+                or f"Database '{schema_relation.without_identifier()}' not found" in errmsg
+            ):
+                results = []
+            else:
+                description = "Error while retrieving information about"
+                logger.debug(f"{description} {schema_relation.without_identifier()}: {e.msg}")
+                results = []
+
+        relations: List[Tuple[DatabricksRelation, str]] = []
+        for row in results:
+            if len(row) != 4:
+                raise dbt.exceptions.RuntimeException(
+                    f'Invalid value from "show table extended ...", '
+                    f"got {len(row)} values, expected 4"
+                )
+            _schema, name, _, information = row
+            rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
+            relation = self.Relation.create(
+                database=schema_relation.database,
+                # Use `_schema` retrieved from the cluster to avoid mismatched case
+                # between the profile and the cluster.
+                schema=_schema,
+                identifier=name,
+                type=rel_type,
+            )
+            relations.append((relation, information))
+
+        return relations
+
     @available.parse(lambda *a, **k: empty_table())
     def get_relations_without_caching(self, relation: DatabricksRelation) -> Table:
         kwargs = {"relation": relation}
```

A review thread on the `"[SCHEMA_NOT_FOUND]"` check:

> Reviewer: If there is an error code in the error message, will we get the error message details as well ("Database ... not found")?
>
> Reply: The error message was changed:
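Each `show table extended` result row carries four fields (database, table name, a temporary flag, and the multi-line `information` blob), and the relation type is inferred from the blob. A sketch of the row handling with a fabricated row (all values invented):

```python
# Fabricated row shaped like `show table extended` output:
# (database, tableName, isTemporary, information).
row = ("sales", "recent_orders", False, "Database: sales\nTable: recent_orders\nType: VIEW\n")
if len(row) != 4:
    raise ValueError(f"got {len(row)} values, expected 4")
_schema, name, _, information = row
rel_type = "view" if "Type: VIEW" in information else "table"
print(_schema, name, rel_type)  # sales recent_orders view
```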
`parse_columns_from_information` is overridden to build `DatabricksColumn` objects directly from that blob, rather than describing each relation separately:

```diff
@@ -272,6 +320,32 @@ def _set_relation_information(self, relation: DatabricksRelation) -> DatabricksRelation:

         return self._get_updated_relation(relation)[0]

+    def parse_columns_from_information(  # type: ignore[override]
+        self, relation: DatabricksRelation, information: str
+    ) -> List[DatabricksColumn]:
+        owner_match = re.findall(self.INFORMATION_OWNER_REGEX, information)
+        owner = owner_match[0] if owner_match else None
+        matches = re.finditer(self.INFORMATION_COLUMNS_REGEX, information)
+        columns = []
+        stats_match = re.findall(self.INFORMATION_STATISTICS_REGEX, information)
+        raw_table_stats = stats_match[0] if stats_match else None
+        table_stats = DatabricksColumn.convert_table_stats(raw_table_stats)
+        for match_num, match in enumerate(matches):
+            column_name, column_type, nullable = match.groups()
+            column = DatabricksColumn(
+                table_database=relation.database,
+                table_schema=relation.schema,
+                table_name=relation.table,
+                table_type=relation.type,
+                column_index=(match_num + 1),
+                table_owner=owner,
+                column=column_name,
+                dtype=DatabricksColumn.translate_type(column_type),
+                table_stats=table_stats,
+            )
+            columns.append(column)
+        return columns
+
     def get_catalog(self, manifest: Manifest) -> Tuple[Table, List[Exception]]:
         schema_map = self._get_catalog_schemas(manifest)
```
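The override leans on the `INFORMATION_*` regexes inherited from `SparkAdapter`. A rough, self-contained approximation of the extraction (the patterns below are assumptions shaped like dbt-spark's, not the exact ones, and the blob is fabricated):

```python
import re

# Assumed approximations of SparkAdapter's INFORMATION_OWNER_REGEX and
# INFORMATION_COLUMNS_REGEX; the real patterns live in dbt-spark.
OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
COLUMNS_REGEX = re.compile(r"\|-- (.*): (.*) \(nullable = (.*)\)")

information = (
    "Database: sales\n"
    "Table: recent_orders\n"
    "Owner: someone@example.com\n"
    "Schema: root\n"
    " |-- id: long (nullable = true)\n"
    " |-- amount: decimal(10,2) (nullable = true)\n"
)

owner_match = OWNER_REGEX.findall(information)
print(owner_match[0] if owner_match else None)  # someone@example.com

for match_num, match in enumerate(COLUMNS_REGEX.finditer(information)):
    column_name, column_type, nullable = match.groups()
    print(match_num + 1, column_name, column_type, nullable)
# 1 id long true
# 2 amount decimal(10,2) true
```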
Finally, `_get_one_catalog` is overridden to gather every table name dbt knows about in the schema, query them in a single `show table extended` call, and build the catalog table from the parsed results:

```diff
@@ -287,10 +361,53 @@ def get_catalog(self, manifest: Manifest) -> Tuple[Table, List[Exception]]:
             catalogs, exceptions = catch_as_completed(futures)
         return catalogs, exceptions

+    def _get_one_catalog(
+        self,
+        information_schema: InformationSchema,
+        schemas: Set[str],
+        manifest: Manifest,
+    ) -> Table:
+        if len(schemas) != 1:
+            dbt.exceptions.raise_compiler_error(
+                f"Expected only one schema in spark _get_one_catalog, found " f"{schemas}"
+            )
+
+        database = information_schema.database
+        schema = list(schemas)[0]
+
+        nodes: Iterator[ResultNode] = chain(
+            (
+                node
+                for node in manifest.nodes.values()
+                if (node.is_relational and not node.is_ephemeral_model)
+            ),
+            manifest.sources.values(),
+        )
+
+        table_names: Set[str] = set()
+        for node in nodes:
+            if node.database == database and node.schema == schema:
+                relation = self.Relation.create_from(self.config, node)
+                if relation.identifier:
+                    table_names.add(relation.identifier)
+
+        columns: List[Dict[str, Any]] = []
+        if len(table_names) > 0:
+            schema_relation = self.Relation.create(
+                database=database,
+                schema=schema,
+                identifier="|".join(table_names),
+                quote_policy=self.config.quoting,
+            )
+            for relation, information in self._list_relations_with_information(schema_relation):
+                logger.debug("Getting table schema for relation {}", relation)
+                columns.extend(self._get_columns_for_catalog(relation, information))
+        return Table.from_object(columns, column_types=DEFAULT_TYPE_TESTER)
+
     def _get_columns_for_catalog(  # type: ignore[override]
-        self, relation: DatabricksRelation
+        self, relation: DatabricksRelation, information: str
     ) -> Iterable[Dict[str, Any]]:
-        columns = self.get_columns_in_relation(relation)
+        columns = self.parse_columns_from_information(relation, information)

         for column in columns:
             # convert DatabricksRelation into catalog dicts
```
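The pipe-joined identifier is what makes this a single round trip: the pattern in `show table extended ... like '<pattern>'` treats `|` as alternation, so one statement matches every catalog table in the schema. A small sketch of the pattern construction (schema and table names invented):

```python
# Invented names; shows how the LIKE pattern for one schema is assembled.
table_names = {"orders", "customers", "order_items"}
pattern = "|".join(sorted(table_names))  # sorted here only for stable output
sql = f"show table extended in sales like '{pattern}'"
print(sql)
# show table extended in sales like 'customers|order_items|orders'
```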
A closing review thread:

> Reviewer: Why do we need this?
>
> Reply: This is a port of dbt-labs/dbt-spark#358.