diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 42ca9af7e8459a..b29170cb2d705d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -262,7 +262,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.report.report_ingestion_stage_start("Ingestion Setup") wait_on_warehouse = None - if self.config.is_profiling_enabled() or self.config.include_hive_metastore: + if self.config.include_hive_metastore: self.report.report_ingestion_stage_start("Start warehouse") # Can take several minutes, so start now and wait later wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() @@ -309,9 +309,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) if self.config.is_profiling_enabled(): - self.report.report_ingestion_stage_start("Wait on warehouse") - assert wait_on_warehouse - wait_on_warehouse.result() + self.report.report_ingestion_stage_start("Start warehouse") + # Need to start the warehouse again for profiling, + # as it may have been stopped after ingestion might take + # longer time to complete + wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() + if wait_on_warehouse is None: + self.report.report_failure( + "initialization", + f"SQL warehouse {self.config.profiling.warehouse_id} not found", + ) + return + else: + # wait until warehouse is started + wait_on_warehouse.result() self.report.report_ingestion_stage_start("Profiling") if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig):