From 1c0b3aca41ce65836dee4967b519d8e3656b0a1b Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Thu, 18 Jan 2024 10:28:31 -0800 Subject: [PATCH 1/3] Return to old way of determining types to streamline hive type determination --- dbt/adapters/databricks/impl.py | 89 ++++++++++++++++++++++----------- tests/conftest.py | 2 +- 2 files changed, 61 insertions(+), 30 deletions(-) diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 0a4b142c3..2eaa6a87f 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -283,46 +283,77 @@ def get_relations_without_caching(self, relation: DatabricksRelation) -> Table: # if there are any table types to be resolved if any(not row[3] for row in new_rows): - # Get view names and create a dictionary of view name to materialization - relation_all_tables = self.Relation.create( - database=relation.database, schema=relation.schema, identifier="*" - ) + if is_hive_metastore(relation.database): + new_rows = self._get_hive_types(relation, new_rows) + else: + new_rows = self._get_uc_types(relation, new_rows) - with self._catalog(relation.database): - views = self.execute_macro(SHOW_VIEWS_MACRO_NAME, kwargs=kwargs) - tables = self.execute_macro( - SHOW_TABLE_EXTENDED_MACRO_NAME, - kwargs={"schema_relation": relation_all_tables}, - ) - view_names: Dict[str, bool] = { - view["viewName"]: view.get("isMaterialized", False) for view in views - } - table_names: Dict[str, bool] = { - table["tableName"]: (self._parse_type(table["information"]) == "STREAMING_TABLE") - for table in tables - } - - # create a new collection of rows with the correct table types - new_rows = [ + return Table( + new_rows, + column_names=["database_name", "schema_name", "name", "kind"], + column_types=[Text(), Text(), Text(), Text()], + ) + + def _get_hive_types( + self, relation: DatabricksRelation, new_rows: List[Tuple[Optional[str], str, str, str]] + ) -> List[Tuple[Optional[str], str, str, str]]: + kwargs = {"relation": relation} + + with self._catalog(relation.database): + views = self.execute_macro(SHOW_VIEWS_MACRO_NAME, kwargs=kwargs) + + view_names = set(views.columns["viewName"].values()) # type: ignore[attr-defined] + return [ ( row[0], row[1], row[2], - str( - row[3] - if row[3] - else self._type_from_names(row[0], row[2], view_names, table_names) - ), + str(RelationType.View if row[2] in view_names else RelationType.Table), ) for row in new_rows ] - return Table( - new_rows, - column_names=["database_name", "schema_name", "name", "kind"], - column_types=[Text(), Text(), Text(), Text()], + def _get_uc_types( + self, relation: DatabricksRelation, new_rows: List[Tuple[Optional[str], str, str, str]] + ) -> List[Tuple[Optional[str], str, str, str]]: + kwargs = {"relation": relation} + + # Get view names and create a dictionary of view name to materialization + relation_all_tables = self.Relation.create( + database=relation.database, schema=relation.schema, identifier="*" ) + with self._catalog(relation.database): + views = self.execute_macro(SHOW_VIEWS_MACRO_NAME, kwargs=kwargs) + tables = self.execute_macro( + SHOW_TABLE_EXTENDED_MACRO_NAME, + kwargs={"schema_relation": relation_all_tables}, + ) + view_names: Dict[str, bool] = { + view["viewName"]: view.get("isMaterialized", False) for view in views + } + table_names: Dict[str, bool] = { + table["tableName"]: (self._parse_type(table["information"]) == "STREAMING_TABLE") + for table in tables + } + + # create a new collection of rows with the correct table types + new_rows = [ + ( + row[0], + row[1], + row[2], + str( + row[3] + if row[3] + else self._type_from_names(row[0], row[2], view_names, table_names) + ), + ) + for row in new_rows + ] + + return new_rows + def _parse_type(self, information: str) -> str: type_entry = [ entry.split(":")[1].strip() diff --git a/tests/conftest.py b/tests/conftest.py index e61334144..ee4a502b9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,7 +7,7 @@ def pytest_addoption(parser): - parser.addoption("--profile", action="store", default="databricks_uc_sql_endpoint", type=str) + parser.addoption("--profile", action="store", default="databricks_cluster", type=str) # Using @pytest.mark.skip_profile('databricks_cluster') uses the 'skip_by_adapter_type' From 17c04a7995df4b7d1e6971b159bb39acd2863a14 Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Thu, 18 Jan 2024 10:29:13 -0800 Subject: [PATCH 2/3] undo --- tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index ee4a502b9..e61334144 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,7 +7,7 @@ def pytest_addoption(parser): - parser.addoption("--profile", action="store", default="databricks_cluster", type=str) + parser.addoption("--profile", action="store", default="databricks_uc_sql_endpoint", type=str) # Using @pytest.mark.skip_profile('databricks_cluster') uses the 'skip_by_adapter_type' From 38aa7377e422b9a9d27bc445f6d0ea50b1f3a284 Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Thu, 18 Jan 2024 13:20:20 -0800 Subject: [PATCH 3/3] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e37072646..285d62bb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Allow schema to be specified in testing (thanks @case-k-git!) ([538](https://github.com/databricks/dbt-databricks/pull/538)) - Fix dbt incremental_strategy behavior by fixing schema table existing check (thanks @case-k-git!) ([530](https://github.com/databricks/dbt-databricks/pull/530)) - Fixed bug that was causing streaming tables to be dropped and recreated instead of refreshed. ([552](https://github.com/databricks/dbt-databricks/pull/552)) +- Fixed Hive performance regression by streamlining materialization type acquisition ([557](https://github.com/databricks/dbt-databricks/pull/557)) ### Under the Hood