Skip to content

Commit

Permalink
Use show table extended with table name list for get_catalog. (#237)
Browse files Browse the repository at this point in the history
### Description

Uses `show table extended` with table name list for `get_catalog`.

- Running `describe table extended` for all tables could be slower than `show table extended` with table name list.
- Statistics that will appear in the generated docs are not included in `describe table extended`.
  • Loading branch information
ueshin authored Dec 16, 2022
1 parent 8124c35 commit 5b85bfd
Show file tree
Hide file tree
Showing 5 changed files with 385 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

### Features
- Avoid show table extended command. ([#231](https://github.com/databricks/dbt-databricks/pull/231))
- Use show table extended with table name list for get_catalog. ([#237](https://github.com/databricks/dbt-databricks/pull/237))

## dbt-databricks 1.3.2 (November 9, 2022)

Expand Down
13 changes: 13 additions & 0 deletions dbt/adapters/databricks/column.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
from dataclasses import dataclass
from typing import ClassVar, Dict

from dbt.adapters.spark.column import SparkColumn


@dataclass
class DatabricksColumn(SparkColumn):
TYPE_LABELS: ClassVar[Dict[str, str]] = {
"LONG": "BIGINT",
}

@classmethod
def translate_type(cls, dtype: str) -> str:
return super(SparkColumn, cls).translate_type(dtype).lower()

@property
def data_type(self) -> str:
return self.translate_type(self.dtype)

def __repr__(self) -> str:
return "<DatabricksColumn {} ({})>".format(self.name, self.data_type)
129 changes: 123 additions & 6 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from concurrent.futures import Future
from contextlib import contextmanager
from itertools import chain
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union
import re
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Type, Union

from agate import Row, Table, Text

from dbt.adapters.base import AdapterConfig, PythonJobHelper
from dbt.adapters.base.impl import catch_as_completed
from dbt.adapters.base.meta import available
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.spark.impl import (
SparkAdapter,
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME,
Expand All @@ -18,9 +20,10 @@
LIST_SCHEMAS_MACRO_NAME,
TABLE_OR_VIEW_NOT_FOUND_MESSAGES,
)
from dbt.clients.agate_helper import empty_table
from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER, empty_table
from dbt.contracts.connection import AdapterResponse, Connection
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.relation import RelationType
import dbt.exceptions
from dbt.events import AdapterLogger
Expand All @@ -41,6 +44,7 @@
CURRENT_CATALOG_MACRO_NAME = "current_catalog"
USE_CATALOG_MACRO_NAME = "use_catalog"

SHOW_TABLE_EXTENDED_MACRO_NAME = "show_table_extended"
SHOW_TABLES_MACRO_NAME = "show_tables"
SHOW_VIEWS_MACRO_NAME = "show_views"

Expand Down Expand Up @@ -120,7 +124,10 @@ def list_relations_without_caching( # type: ignore[override]
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.RuntimeException as e:
errmsg = getattr(e, "msg", "")
if f"Database '{schema_relation}' not found" in errmsg:
if (
"[SCHEMA_NOT_FOUND]" in errmsg
or f"Database '{schema_relation}' not found" in errmsg
):
return []
else:
description = "Error while retrieving information about"
Expand All @@ -139,6 +146,47 @@ def list_relations_without_caching( # type: ignore[override]
)
]

def _list_relations_with_information(
self, schema_relation: DatabricksRelation
) -> List[Tuple[DatabricksRelation, str]]:
kwargs = {"schema_relation": schema_relation}
try:
# The catalog for `show table extended` needs to match the current catalog.
with self._catalog(schema_relation.database):
results = self.execute_macro(SHOW_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.RuntimeException as e:
errmsg = getattr(e, "msg", "")
if (
"[SCHEMA_NOT_FOUND]" in errmsg
or f"Database '{schema_relation.without_identifier()}' not found" in errmsg
):
results = []
else:
description = "Error while retrieving information about"
logger.debug(f"{description} {schema_relation.without_identifier()}: {e.msg}")
results = []

relations: List[Tuple[DatabricksRelation, str]] = []
for row in results:
if len(row) != 4:
raise dbt.exceptions.RuntimeException(
f'Invalid value from "show table extended ...", '
f"got {len(row)} values, expected 4"
)
_schema, name, _, information = row
rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
relation = self.Relation.create(
database=schema_relation.database,
# Use `_schema` retrieved from the cluster to avoid mismatched case
# between the profile and the cluster.
schema=_schema,
identifier=name,
type=rel_type,
)
relations.append((relation, information))

return relations

@available.parse(lambda *a, **k: empty_table())
def get_relations_without_caching(self, relation: DatabricksRelation) -> Table:
kwargs = {"relation": relation}
Expand Down Expand Up @@ -272,6 +320,32 @@ def _set_relation_information(self, relation: DatabricksRelation) -> DatabricksR

return self._get_updated_relation(relation)[0]

def parse_columns_from_information( # type: ignore[override]
self, relation: DatabricksRelation, information: str
) -> List[DatabricksColumn]:
owner_match = re.findall(self.INFORMATION_OWNER_REGEX, information)
owner = owner_match[0] if owner_match else None
matches = re.finditer(self.INFORMATION_COLUMNS_REGEX, information)
columns = []
stats_match = re.findall(self.INFORMATION_STATISTICS_REGEX, information)
raw_table_stats = stats_match[0] if stats_match else None
table_stats = DatabricksColumn.convert_table_stats(raw_table_stats)
for match_num, match in enumerate(matches):
column_name, column_type, nullable = match.groups()
column = DatabricksColumn(
table_database=relation.database,
table_schema=relation.schema,
table_name=relation.table,
table_type=relation.type,
column_index=(match_num + 1),
table_owner=owner,
column=column_name,
dtype=DatabricksColumn.translate_type(column_type),
table_stats=table_stats,
)
columns.append(column)
return columns

def get_catalog(self, manifest: Manifest) -> Tuple[Table, List[Exception]]:
schema_map = self._get_catalog_schemas(manifest)

Expand All @@ -287,10 +361,53 @@ def get_catalog(self, manifest: Manifest) -> Tuple[Table, List[Exception]]:
catalogs, exceptions = catch_as_completed(futures)
return catalogs, exceptions

def _get_one_catalog(
self,
information_schema: InformationSchema,
schemas: Set[str],
manifest: Manifest,
) -> Table:
if len(schemas) != 1:
dbt.exceptions.raise_compiler_error(
f"Expected only one schema in spark _get_one_catalog, found " f"{schemas}"
)

database = information_schema.database
schema = list(schemas)[0]

nodes: Iterator[ResultNode] = chain(
(
node
for node in manifest.nodes.values()
if (node.is_relational and not node.is_ephemeral_model)
),
manifest.sources.values(),
)

table_names: Set[str] = set()
for node in nodes:
if node.database == database and node.schema == schema:
relation = self.Relation.create_from(self.config, node)
if relation.identifier:
table_names.add(relation.identifier)

columns: List[Dict[str, Any]] = []
if len(table_names) > 0:
schema_relation = self.Relation.create(
database=database,
schema=schema,
identifier="|".join(table_names),
quote_policy=self.config.quoting,
)
for relation, information in self._list_relations_with_information(schema_relation):
logger.debug("Getting table schema for relation {}", relation)
columns.extend(self._get_columns_for_catalog(relation, information))
return Table.from_object(columns, column_types=DEFAULT_TYPE_TESTER)

def _get_columns_for_catalog( # type: ignore[override]
self, relation: DatabricksRelation
self, relation: DatabricksRelation, information: str
) -> Iterable[Dict[str, Any]]:
columns = self.get_columns_in_relation(relation)
columns = self.parse_columns_from_information(relation, information)

for column in columns:
# convert DatabricksRelation into catalog dicts
Expand Down
12 changes: 12 additions & 0 deletions dbt/include/databricks/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@
{{ return(adapter.get_relations_without_caching(schema_relation)) }}
{% endmacro %}

{% macro show_table_extended(schema_relation) %}
{{ return(adapter.dispatch('show_table_extended', 'dbt')(schema_relation)) }}
{% endmacro %}

{% macro databricks__show_table_extended(schema_relation) %}
{% call statement('show_table_extended', fetch_result=True) -%}
show table extended in {{ schema_relation.without_identifier() }} like '{{ schema_relation.identifier }}'
{% endcall %}

{% do return(load_result('show_table_extended').table) %}
{% endmacro %}

{% macro show_tables(relation) %}
{{ return(adapter.dispatch('show_tables', 'dbt')(relation)) }}
{% endmacro %}
Expand Down
Loading

0 comments on commit 5b85bfd

Please sign in to comment.