diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index 648f09951d6b2..d1a002408bc25 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -445,6 +445,18 @@ impl DatabaseManager { .map(|(k, _)| *k) } + pub fn find_persisted_creating_table_id(&self, key: &RelationKey) -> Option { + self.tables + .iter() + .find(|(_, t)| { + t.stream_job_status == PbStreamJobStatus::Creating as i32 + && t.database_id == key.0 + && t.schema_id == key.1 + && t.name == key.2 + }) + .map(|(k, _)| *k) + } + pub fn all_creating_streaming_jobs(&self) -> impl Iterator + '_ { self.in_progress_creation_streaming_job.keys().cloned() } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 9254702073e99..65daa69def417 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -2621,11 +2621,15 @@ impl CatalogManager { infos .into_iter() .flat_map(|info| { - guard.database.find_creating_streaming_job_id(&( - info.database_id, - info.schema_id, - info.name, - )) + let relation_key = &(info.database_id, info.schema_id, info.name); + guard + .database + .find_creating_streaming_job_id(relation_key) + .or_else(|| { + guard + .database + .find_persisted_creating_table_id(relation_key) + }) }) .collect_vec() }