diff --git a/CHANGELOG.md b/CHANGELOG.md index 17584503d..a8717dec4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ### Features -- Added support for getting info only on specified relations to improve performance of gathering metadata ([486](https://github.com/databricks/dbt-databricks/pull/486)) +- Added support for getting info only on specified relations to improve performance of gathering metadata ([486](https://github.com/databricks/dbt-databricks/pull/486)), also (with generous help from @mikealfare) ([499](https://github.com/databricks/dbt-databricks/pull/499)) - Added support for getting freshness from metadata ([481](https://github.com/databricks/dbt-databricks/pull/481)) ### Fixes diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 0fcf2dc3f..013d98fbe 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -234,8 +234,7 @@ def get_relations_without_caching(self, relation: DatabricksRelation) -> Table: kwargs = {"relation": relation} new_rows: List[Tuple[Optional[str], str, str, str]] - if relation.database is not None: - assert relation.schema is not None + if all([relation.database, relation.schema]): tables = self.connections.list_tables( database=relation.database, schema=relation.schema ) @@ -256,39 +255,19 @@ def get_relations_without_caching(self, relation: DatabricksRelation) -> Table: # if there are any table types to be resolved if any(not row[3] for row in new_rows): - # Get view names and create a dictionay of view name to materialization + # Get view names and create a dictionary of view name to materialization with self._catalog(relation.database): views = self.execute_macro(SHOW_VIEWS_MACRO_NAME, kwargs=kwargs) - + tables = self.execute_macro( + SHOW_TABLE_EXTENDED_MACRO_NAME, kwargs={"schema_relation": relation} + ) view_names: Dict[str, bool] = { view["viewName"]: view.get("isMaterialized", False) for view in views } - - # a function to resolve an unknown 
table type - def typeFromNames( - database: Optional[str], schema: str, name: str - ) -> DatabricksRelationType: - if name in view_names: - # it is either a view or a materialized view - return ( - DatabricksRelationType.MaterializedView - if view_names[name] - else DatabricksRelationType.View - ) - elif is_hive_metastore(database): - return DatabricksRelationType.Table - else: - # not a view so it might be a streaming table - # get extended information to determine - rel = self.Relation.create(database, schema, name) - rel = self._set_relation_information(rel) - if ( - rel.metadata is not None - and rel.metadata.get("Type", "table") == "STREAMING_TABLE" - ): - return DatabricksRelationType.StreamingTable - else: - return DatabricksRelationType.Table + table_names: Dict[str, bool] = { + table["tableName"]: (self._parse_type(table["information"]) == "STREAMING_TABLE") + for table in tables + } # create a new collection of rows with the correct table types new_rows = [ @@ -296,7 +275,11 @@ def typeFromNames( row[0], row[1], row[2], - str(row[3] if row[3] else typeFromNames(row[0], row[1], row[2])), + str( + row[3] + if row[3] + else self._type_from_names(row[0], row[2], view_names, table_names) + ), ) for row in new_rows ] @@ -307,6 +290,40 @@ def typeFromNames( column_types=[Text(), Text(), Text(), Text()], ) + def _parse_type(self, information: str) -> str: + type_entry = [ + entry.strip() for entry in information.split("\n") if entry.split(":")[0] == "Type" + ] + return type_entry[0] if type_entry else "" + + def _type_from_names( + self, + database: Optional[str], + name: str, + view_names: Dict[str, bool], + table_names: Dict[str, bool], + ) -> DatabricksRelationType: + if name in view_names: + # it is either a view or a materialized view + return ( + DatabricksRelationType.MaterializedView + if view_names[name] + else DatabricksRelationType.View + ) + elif is_hive_metastore(database): + return DatabricksRelationType.Table + elif name in table_names: + # it is 
either a table or a streaming table + return ( + DatabricksRelationType.StreamingTable + if table_names[name] + else DatabricksRelationType.Table + ) + else: + raise dbt.exceptions.DbtRuntimeError( + f"Unexpected relation type discovered: Database:{database}, Relation:{name}" + ) + def get_relation( self, database: Optional[str],