From 7324a950125226298f50b6f6462bc70072b0712c Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Tue, 19 Sep 2023 22:52:31 +0800 Subject: [PATCH] fix(meta): fix backwards compat issues for `Table::stream_job_status` field (#12434) --- src/meta/src/manager/catalog/database.rs | 15 ++++++++++++--- src/meta/src/rpc/service/notification_service.rs | 5 ++++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index 9531e663a587..ad1928e0bdd5 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -149,18 +149,27 @@ impl DatabaseManager { self.schemas.values().cloned().collect_vec(), self.tables .values() - .filter(|t| t.stream_job_status == PbStreamJobStatus::Created as i32) + .filter(|t| { + t.stream_job_status == PbStreamJobStatus::Unspecified as i32 + || t.stream_job_status == PbStreamJobStatus::Created as i32 + }) .cloned() .collect_vec(), self.sources.values().cloned().collect_vec(), self.sinks .values() - .filter(|s| s.stream_job_status == PbStreamJobStatus::Created as i32) + .filter(|t| { + t.stream_job_status == PbStreamJobStatus::Unspecified as i32 + || t.stream_job_status == PbStreamJobStatus::Created as i32 + }) .cloned() .collect_vec(), self.indexes .values() - .filter(|i| i.stream_job_status == PbStreamJobStatus::Created as i32) + .filter(|t| { + t.stream_job_status == PbStreamJobStatus::Unspecified as i32 + || t.stream_job_status == PbStreamJobStatus::Created as i32 + }) .cloned() .collect_vec(), self.views.values().cloned().collect_vec(), diff --git a/src/meta/src/rpc/service/notification_service.rs b/src/meta/src/rpc/service/notification_service.rs index 06f87e6d194c..18bdc30c4165 100644 --- a/src/meta/src/rpc/service/notification_service.rs +++ b/src/meta/src/rpc/service/notification_service.rs @@ -124,7 +124,10 @@ impl NotificationServiceImpl { .database .list_tables() .into_iter() - .filter(|t| t.stream_job_status == PbStreamJobStatus::Created as i32) + .filter(|t| { + t.stream_job_status == PbStreamJobStatus::Unspecified as i32 + || t.stream_job_status == PbStreamJobStatus::Created as i32 + }) .collect_vec(); tables.extend(catalog_guard.database.list_creating_tables()); let notification_version = self.env.notification_manager().current_version().await;