Skip to content

Commit

Permalink
no more describe extended
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db committed Nov 9, 2023
1 parent e3e27e1 commit 8cdb221
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 32 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

### Features

- Added support for getting info only on specified relations to improve performance of gathering metadata ([486](https://github.com/databricks/dbt-databricks/pull/486))
- Added support for getting info only on specified relations to improve performance of gathering metadata ([486](https://github.com/databricks/dbt-databricks/pull/486)), also (with generous help from from @mikealfare) ([499](https://github.com/databricks/dbt-databricks/pull/499))
- Added support for getting freshness from metadata ([481](https://github.com/databricks/dbt-databricks/pull/481))

### Fixes
Expand Down
79 changes: 48 additions & 31 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ def get_relations_without_caching(self, relation: DatabricksRelation) -> Table:
kwargs = {"relation": relation}

new_rows: List[Tuple[Optional[str], str, str, str]]
if relation.database is not None:
assert relation.schema is not None
if all([relation.database, relation.schema]):
tables = self.connections.list_tables(
database=relation.database, schema=relation.schema
)
Expand All @@ -256,47 +255,31 @@ def get_relations_without_caching(self, relation: DatabricksRelation) -> Table:

# if there are any table types to be resolved
if any(not row[3] for row in new_rows):
# Get view names and create a dictionay of view name to materialization
# Get view names and create a dictionary of view name to materialization
with self._catalog(relation.database):
views = self.execute_macro(SHOW_VIEWS_MACRO_NAME, kwargs=kwargs)

tables = self.execute_macro(
SHOW_TABLE_EXTENDED_MACRO_NAME, kwargs={"schema_relation": relation}
)
view_names: Dict[str, bool] = {
view["viewName"]: view.get("isMaterialized", False) for view in views
}

# a function to resolve an unknown table type
def typeFromNames(
database: Optional[str], schema: str, name: str
) -> DatabricksRelationType:
if name in view_names:
# it is either a view or a materialized view
return (
DatabricksRelationType.MaterializedView
if view_names[name]
else DatabricksRelationType.View
)
elif is_hive_metastore(database):
return DatabricksRelationType.Table
else:
# not a view so it might be a streaming table
# get extended information to determine
rel = self.Relation.create(database, schema, name)
rel = self._set_relation_information(rel)
if (
rel.metadata is not None
and rel.metadata.get("Type", "table") == "STREAMING_TABLE"
):
return DatabricksRelationType.StreamingTable
else:
return DatabricksRelationType.Table
table_names: Dict[str, bool] = {
table["tableName"]: (self._parse_type(table["information"]) == "STREAMING_TABLE")
for table in tables
}

# create a new collection of rows with the correct table types
new_rows = [
(
row[0],
row[1],
row[2],
str(row[3] if row[3] else typeFromNames(row[0], row[1], row[2])),
str(
row[3]
if row[3]
else self._type_from_names(row[0], row[2], view_names, table_names)
),
)
for row in new_rows
]
Expand All @@ -307,6 +290,40 @@ def typeFromNames(
column_types=[Text(), Text(), Text(), Text()],
)

def _parse_type(self, information: str) -> str:
type_entry = [
entry.strip() for entry in information.split("\n") if entry.split(":")[0] == "Type"
]
return type_entry[0] if type_entry else ""

def _type_from_names(
self,
database: Optional[str],
name: str,
view_names: Dict[str, bool],
table_names: Dict[str, bool],
) -> DatabricksRelationType:
if name in view_names:
# it is either a view or a materialized view
return (
DatabricksRelationType.MaterializedView
if view_names[name]
else DatabricksRelationType.View
)
elif is_hive_metastore(database):
return DatabricksRelationType.Table
elif name in table_names:
# it is either a table or a streaming table
return (
DatabricksRelationType.StreamingTable
if table_names[name]
else DatabricksRelationType.Table
)
else:
raise dbt.exceptions.DbtRuntimeError(
f"Unexpected relation type discovered: Database:{database}, Relation:{name}"
)

def get_relation(
self,
database: Optional[str],
Expand Down

0 comments on commit 8cdb221

Please sign in to comment.