From 2df34ee858dae99227f4957585671cc09c736eb5 Mon Sep 17 00:00:00 2001 From: Dylan Date: Tue, 14 Nov 2023 18:35:44 +0800 Subject: [PATCH] fix(meta): fix ctrl c cancels streaming job (#13415) --- src/meta/src/manager/catalog/database.rs | 12 ++++++++++++ src/meta/src/manager/catalog/mod.rs | 14 +++++++++----- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index 648f09951d6b..d1a002408bc2 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -445,6 +445,18 @@ impl DatabaseManager { .map(|(k, _)| *k) } + pub fn find_persisted_creating_table_id(&self, key: &RelationKey) -> Option { + self.tables + .iter() + .find(|(_, t)| { + t.stream_job_status == PbStreamJobStatus::Creating as i32 + && t.database_id == key.0 + && t.schema_id == key.1 + && t.name == key.2 + }) + .map(|(k, _)| *k) + } + pub fn all_creating_streaming_jobs(&self) -> impl Iterator + '_ { self.in_progress_creation_streaming_job.keys().cloned() } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 9254702073e9..65daa69def41 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -2621,11 +2621,15 @@ impl CatalogManager { infos .into_iter() .flat_map(|info| { - guard.database.find_creating_streaming_job_id(&( - info.database_id, - info.schema_id, - info.name, - )) + let relation_key = &(info.database_id, info.schema_id, info.name); + guard + .database + .find_creating_streaming_job_id(relation_key) + .or_else(|| { + guard + .database + .find_persisted_creating_table_id(relation_key) + }) }) .collect_vec() }