Skip to content

Commit

Permalink
fix(meta): fix backwards compat issues for Table::stream_job_status
Browse files Browse the repository at this point in the history
… field (#12434)
  • Loading branch information
kwannoel authored Sep 19, 2023
1 parent 5cd4082 commit 7324a95
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
15 changes: 12 additions & 3 deletions src/meta/src/manager/catalog/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,18 +149,27 @@ impl DatabaseManager {
self.schemas.values().cloned().collect_vec(),
self.tables
.values()
.filter(|t| t.stream_job_status == PbStreamJobStatus::Created as i32)
.filter(|t| {
t.stream_job_status == PbStreamJobStatus::Unspecified as i32
|| t.stream_job_status == PbStreamJobStatus::Created as i32
})
.cloned()
.collect_vec(),
self.sources.values().cloned().collect_vec(),
self.sinks
.values()
.filter(|s| s.stream_job_status == PbStreamJobStatus::Created as i32)
.filter(|t| {
t.stream_job_status == PbStreamJobStatus::Unspecified as i32
|| t.stream_job_status == PbStreamJobStatus::Created as i32
})
.cloned()
.collect_vec(),
self.indexes
.values()
.filter(|i| i.stream_job_status == PbStreamJobStatus::Created as i32)
.filter(|t| {
t.stream_job_status == PbStreamJobStatus::Unspecified as i32
|| t.stream_job_status == PbStreamJobStatus::Created as i32
})
.cloned()
.collect_vec(),
self.views.values().cloned().collect_vec(),
Expand Down
5 changes: 4 additions & 1 deletion src/meta/src/rpc/service/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ impl NotificationServiceImpl {
.database
.list_tables()
.into_iter()
.filter(|t| t.stream_job_status == PbStreamJobStatus::Created as i32)
.filter(|t| {
t.stream_job_status == PbStreamJobStatus::Unspecified as i32
|| t.stream_job_status == PbStreamJobStatus::Created as i32
})
.collect_vec();
tables.extend(catalog_guard.database.list_creating_tables());
let notification_version = self.env.notification_manager().current_version().await;
Expand Down

0 comments on commit 7324a95

Please sign in to comment.