Skip to content

Commit

Permalink
reuse get_catalog logic, add ability to pass relations into new method
Browse files Browse the repository at this point in the history
  • Loading branch information
mikealfare committed Dec 16, 2023
1 parent 17c245f commit 9b0bd5f
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,24 +385,24 @@ def get_catalog(
def get_catalog_by_relations(
self, manifest: Manifest, relations: Set[BaseRelation]
) -> Tuple[agate.Table, List[Exception]]:
relations_by_info_schema = self._get_catalog_relations_by_info_schema(relations)
if len(relations_by_info_schema) != 1:
raise dbt.exceptions.CompilationError(
f"Expected exactly one information schema in get_catalog_by_relations, "
f"found {list(relations_by_info_schema)}"
)
schema_map = self._get_catalog_schemas(manifest)

with executor(self.config) as tpe:
info_schema, relations_in_info_schema = set(relations_by_info_schema.items()).pop()
future = tpe.submit_connected(
self,
info_schema.database,
self._get_one_catalog_by_relations,
info_schema,
relations_in_info_schema,
manifest,
)
catalogs, exceptions = catch_as_completed([future])
futures: List[Future[agate.Table]] = []
for info, schemas in schema_map.items():
for schema in schemas:
if relations_in_schema := [r for r in relations if r.schema == schema]:
futures.append(
tpe.submit_connected(
self,
schema,
self._get_one_catalog_by_relations,
info,
relations_in_schema,
manifest,
)
)
catalogs, exceptions = catch_as_completed(futures)
return catalogs, exceptions

def _get_one_catalog(
Expand Down

0 comments on commit 9b0bd5f

Please sign in to comment.