Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use show table extended with table name list for get_catalog. #237

Merged
merged 4 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

### Features
- Avoid show table extended command. ([#231](https://github.com/databricks/dbt-databricks/pull/231))
- Use show table extended with table name list for get_catalog. ([#237](https://github.com/databricks/dbt-databricks/pull/237))

## dbt-databricks 1.3.2 (November 9, 2022)

Expand Down
13 changes: 13 additions & 0 deletions dbt/adapters/databricks/column.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
from dataclasses import dataclass
from typing import ClassVar, Dict

from dbt.adapters.spark.column import SparkColumn


@dataclass
class DatabricksColumn(SparkColumn):
TYPE_LABELS: ClassVar[Dict[str, str]] = {
"LONG": "BIGINT",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a port of dbt-labs/dbt-spark#358.

}

@classmethod
def translate_type(cls, dtype: str) -> str:
    # Deliberately bypass SparkColumn's own override via super(SparkColumn, cls)
    # so the base adapter Column translation (driven by TYPE_LABELS above,
    # e.g. LONG -> BIGINT) is used, then lower-case the result for
    # consistent catalog output (e.g. "bigint" rather than "BIGINT").
    return super(SparkColumn, cls).translate_type(dtype).lower()

@property
def data_type(self) -> str:
    # The column's type as reported to dbt: the raw dtype passed through
    # translate_type (TYPE_LABELS mapping + lower-casing).
    return self.translate_type(self.dtype)

def __repr__(self) -> str:
    """Debug representation, e.g. ``<DatabricksColumn id (bigint)>``."""
    return f"<DatabricksColumn {self.name} ({self.data_type})>"
129 changes: 123 additions & 6 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from concurrent.futures import Future
from contextlib import contextmanager
from itertools import chain
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union
import re
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Type, Union

from agate import Row, Table, Text

from dbt.adapters.base import AdapterConfig, PythonJobHelper
from dbt.adapters.base.impl import catch_as_completed
from dbt.adapters.base.meta import available
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.spark.impl import (
SparkAdapter,
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME,
Expand All @@ -18,9 +20,10 @@
LIST_SCHEMAS_MACRO_NAME,
TABLE_OR_VIEW_NOT_FOUND_MESSAGES,
)
from dbt.clients.agate_helper import empty_table
from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER, empty_table
from dbt.contracts.connection import AdapterResponse, Connection
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.relation import RelationType
import dbt.exceptions
from dbt.events import AdapterLogger
Expand All @@ -41,6 +44,7 @@
CURRENT_CATALOG_MACRO_NAME = "current_catalog"
USE_CATALOG_MACRO_NAME = "use_catalog"

SHOW_TABLE_EXTENDED_MACRO_NAME = "show_table_extended"
SHOW_TABLES_MACRO_NAME = "show_tables"
SHOW_VIEWS_MACRO_NAME = "show_views"

Expand Down Expand Up @@ -120,7 +124,10 @@ def list_relations_without_caching( # type: ignore[override]
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.RuntimeException as e:
errmsg = getattr(e, "msg", "")
if f"Database '{schema_relation}' not found" in errmsg:
if (
"[SCHEMA_NOT_FOUND]" in errmsg
or f"Database '{schema_relation}' not found" in errmsg
):
return []
else:
description = "Error while retrieving information about"
Expand All @@ -139,6 +146,47 @@ def list_relations_without_caching( # type: ignore[override]
)
]

def _list_relations_with_information(
self, schema_relation: DatabricksRelation
) -> List[Tuple[DatabricksRelation, str]]:
kwargs = {"schema_relation": schema_relation}
try:
# The catalog for `show table extended` needs to match the current catalog.
with self._catalog(schema_relation.database):
results = self.execute_macro(SHOW_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.RuntimeException as e:
errmsg = getattr(e, "msg", "")
if (
"[SCHEMA_NOT_FOUND]" in errmsg
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is an error code in the error message, will we get the error message details as well ("Database ... not found")?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message was changed:

[SCHEMA_NOT_FOUND] The schema <schemaName> cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a catalog, verify the current_schema() output, or qualify the name with the correct catalog.
To tolerate the error on drop use DROP SCHEMA IF EXISTS.

or f"Database '{schema_relation.without_identifier()}' not found" in errmsg
):
results = []
else:
description = "Error while retrieving information about"
logger.debug(f"{description} {schema_relation.without_identifier()}: {e.msg}")
results = []

relations: List[Tuple[DatabricksRelation, str]] = []
for row in results:
if len(row) != 4:
raise dbt.exceptions.RuntimeException(
f'Invalid value from "show table extended ...", '
f"got {len(row)} values, expected 4"
)
_schema, name, _, information = row
rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
relation = self.Relation.create(
database=schema_relation.database,
# Use `_schema` retrieved from the cluster to avoid mismatched case
# between the profile and the cluster.
schema=_schema,
identifier=name,
type=rel_type,
)
relations.append((relation, information))

return relations

@available.parse(lambda *a, **k: empty_table())
def get_relations_without_caching(self, relation: DatabricksRelation) -> Table:
kwargs = {"relation": relation}
Expand Down Expand Up @@ -272,6 +320,32 @@ def _set_relation_information(self, relation: DatabricksRelation) -> DatabricksR

return self._get_updated_relation(relation)[0]

def parse_columns_from_information(  # type: ignore[override]
    self, relation: DatabricksRelation, information: str
) -> List[DatabricksColumn]:
    """Parse the raw ``show table extended`` information blob for *relation*
    into a list of DatabricksColumn objects.

    Owner, per-column definitions, and table statistics are each extracted
    with the adapter's precompiled regex patterns.
    """
    owner_hits = re.findall(self.INFORMATION_OWNER_REGEX, information)
    table_owner = owner_hits[0] if owner_hits else None

    stats_hits = re.findall(self.INFORMATION_STATISTICS_REGEX, information)
    table_stats = DatabricksColumn.convert_table_stats(
        stats_hits[0] if stats_hits else None
    )

    parsed: List[DatabricksColumn] = []
    # Column indices are 1-based in the catalog output.
    column_matches = re.finditer(self.INFORMATION_COLUMNS_REGEX, information)
    for index, col_match in enumerate(column_matches, start=1):
        name, raw_type, _nullable = col_match.groups()
        parsed.append(
            DatabricksColumn(
                table_database=relation.database,
                table_schema=relation.schema,
                table_name=relation.table,
                table_type=relation.type,
                column_index=index,
                table_owner=table_owner,
                column=name,
                dtype=DatabricksColumn.translate_type(raw_type),
                table_stats=table_stats,
            )
        )
    return parsed

def get_catalog(self, manifest: Manifest) -> Tuple[Table, List[Exception]]:
schema_map = self._get_catalog_schemas(manifest)

Expand All @@ -287,10 +361,53 @@ def get_catalog(self, manifest: Manifest) -> Tuple[Table, List[Exception]]:
catalogs, exceptions = catch_as_completed(futures)
return catalogs, exceptions

def _get_one_catalog(
    self,
    information_schema: InformationSchema,
    schemas: Set[str],
    manifest: Manifest,
) -> Table:
    """Build the catalog table for a single schema.

    Collects every relational, non-ephemeral node (plus sources) that lives in
    this database/schema, then issues ONE ``show table extended`` over all of
    their identifiers at once and parses the returned information blobs.
    """
    if len(schemas) != 1:
        dbt.exceptions.raise_compiler_error(
            f"Expected only one schema in spark _get_one_catalog, found {schemas}"
        )

    database = information_schema.database
    schema = next(iter(schemas))

    candidates: Iterator[ResultNode] = chain(
        (
            node
            for node in manifest.nodes.values()
            if node.is_relational and not node.is_ephemeral_model
        ),
        manifest.sources.values(),
    )

    table_names: Set[str] = set()
    for node in candidates:
        if node.database != database or node.schema != schema:
            continue
        identifier = self.Relation.create_from(self.config, node).identifier
        if identifier:
            table_names.add(identifier)

    columns: List[Dict[str, Any]] = []
    if table_names:
        # `show table extended ... like 'a|b|c'` matches any of the names,
        # so one relation with a '|'-joined identifier covers all tables.
        batched_relation = self.Relation.create(
            database=database,
            schema=schema,
            identifier="|".join(table_names),
            quote_policy=self.config.quoting,
        )
        for relation, information in self._list_relations_with_information(batched_relation):
            logger.debug("Getting table schema for relation {}", relation)
            columns.extend(self._get_columns_for_catalog(relation, information))
    return Table.from_object(columns, column_types=DEFAULT_TYPE_TESTER)

def _get_columns_for_catalog( # type: ignore[override]
self, relation: DatabricksRelation
self, relation: DatabricksRelation, information: str
) -> Iterable[Dict[str, Any]]:
columns = self.get_columns_in_relation(relation)
columns = self.parse_columns_from_information(relation, information)

for column in columns:
# convert DatabricksRelation into catalog dicts
Expand Down
12 changes: 12 additions & 0 deletions dbt/include/databricks/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@
{{ return(adapter.get_relations_without_caching(schema_relation)) }}
{% endmacro %}

{% macro show_table_extended(schema_relation) %}
  {#- Dispatch shim: routes to the adapter-specific implementation
      (databricks__show_table_extended below). -#}
  {{ return(adapter.dispatch('show_table_extended', 'dbt')(schema_relation)) }}
{% endmacro %}

{% macro databricks__show_table_extended(schema_relation) %}
  {#- Runs `show table extended` in the relation's schema. The identifier is
      used as the `like` pattern, so callers may pass a '|'-joined list of
      table names to fetch information for many tables in one statement. -#}
  {% call statement('show_table_extended', fetch_result=True) -%}
    show table extended in {{ schema_relation.without_identifier() }} like '{{ schema_relation.identifier }}'
  {% endcall %}

  {% do return(load_result('show_table_extended').table) %}
{% endmacro %}

{% macro show_tables(relation) %}
  {#- Dispatch shim: routes to the adapter-specific `show tables` implementation. -#}
  {{ return(adapter.dispatch('show_tables', 'dbt')(relation)) }}
{% endmacro %}
Expand Down
Loading