Skip to content

Commit

Permalink
call get_catalog by relation
Browse files Browse the repository at this point in the history
  • Loading branch information
mikealfare committed Dec 16, 2023
1 parent eab3067 commit 73979ee
Showing 1 changed file with 13 additions and 11 deletions.
24 changes: 13 additions & 11 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class SparkAdapter(SQLAdapter):
}

_capabilities = CapabilityDict(
{Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.NotImplemented)}
{Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full)}
)

Relation: TypeAlias = SparkRelation
Expand Down Expand Up @@ -390,15 +390,17 @@ def get_catalog_by_relations(
with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
for info_schema, relations_in_info_schema in relations_by_info_schema.items():
fut = tpe.submit_connected(
self,
info_schema.database,
self._get_one_catalog_by_relations,
info_schema,
relations_in_info_schema,
manifest,
)
futures.append(fut)
for relation in relations_in_info_schema:
futures.append(
tpe.submit_connected(
self,
relation.schema,
self._get_one_catalog_by_relations,
info_schema,
[relation],
manifest,
)
)
catalogs, exceptions = catch_as_completed(futures)
return catalogs, exceptions

Expand All @@ -412,7 +414,7 @@ def _get_one_catalog(
raise dbt.exceptions.CompilationError(
f"Expected only one schema in spark _get_one_catalog, found " f"{schemas}"
)
relations = self.list_relations(information_schema.database, list(schemas)[0])
relations = self.list_relations(information_schema.database, schemas.pop())
return self._get_one_catalog_by_relations(information_schema, relations, manifest)

def _get_one_catalog_by_relations(
Expand Down

0 comments on commit 73979ee

Please sign in to comment.