From 71d9b0bcb2fd804789f659ccc6dd15d9a556d664 Mon Sep 17 00:00:00 2001
From: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
Date: Mon, 18 Sep 2023 15:32:00 +0800
Subject: [PATCH] feat(meta): update StreamJob status on finish (#12342)
---
src/meta/src/manager/catalog/database.rs | 20 +++++++--
src/meta/src/manager/catalog/mod.rs | 41 ++++++++++++-------
.../src/rpc/service/notification_service.rs | 9 +++-
3 files changed, 49 insertions(+), 21 deletions(-)
diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs
index 705474dd27a6d..9531e663a587a 100644
--- a/src/meta/src/manager/catalog/database.rs
+++ b/src/meta/src/manager/catalog/database.rs
@@ -19,7 +19,7 @@ use itertools::Itertools;
use risingwave_common::catalog::TableOption;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{
- Connection, Database, Function, Index, Schema, Sink, Source, Table, View,
+ Connection, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, Table, View,
};
use super::{ConnectionId, DatabaseId, FunctionId, RelationId, SchemaId, SinkId, SourceId, ViewId};
@@ -147,10 +147,22 @@ impl DatabaseManager {
(
self.databases.values().cloned().collect_vec(),
self.schemas.values().cloned().collect_vec(),
- self.tables.values().cloned().collect_vec(),
+ self.tables
+ .values()
+ .filter(|t| t.stream_job_status == PbStreamJobStatus::Created as i32)
+ .cloned()
+ .collect_vec(),
self.sources.values().cloned().collect_vec(),
- self.sinks.values().cloned().collect_vec(),
- self.indexes.values().cloned().collect_vec(),
+ self.sinks
+ .values()
+ .filter(|s| s.stream_job_status == PbStreamJobStatus::Created as i32)
+ .cloned()
+ .collect_vec(),
+ self.indexes
+ .values()
+ .filter(|i| i.stream_job_status == PbStreamJobStatus::Created as i32)
+ .cloned()
+ .collect_vec(),
self.views.values().cloned().collect_vec(),
self.functions.values().cloned().collect_vec(),
self.connections.values().cloned().collect_vec(),
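
To illustrate the listing change above in isolation: the catalog snapshot now skips any table, sink, or index whose stream job has not finished. The sketch below is only a shape-level approximation of `DatabaseManager::get_catalog`; `PbTable` and `PbStreamJobStatus` are simplified stand-ins (with an assumed `Created = 2` discriminant) rather than the real prost-generated types from `risingwave_pb::catalog`.

    use std::collections::BTreeMap;

    // Simplified stand-ins for the prost-generated catalog types; the real
    // structs carry many more fields.
    #[allow(dead_code)]
    #[derive(Clone, Copy)]
    #[repr(i32)]
    enum PbStreamJobStatus {
        Unspecified = 0,
        Creating = 1,
        Created = 2, // assumed discriminant, matching the `as i32` comparisons above
    }

    #[derive(Clone, Debug)]
    struct PbTable {
        id: u32,
        // prost stores proto enum fields as raw i32, hence the cast below.
        stream_job_status: i32,
    }

    /// Keep only tables whose stream job has finished creating, mirroring the
    /// filter added in this hunk.
    fn list_created_tables(tables: &BTreeMap<u32, PbTable>) -> Vec<PbTable> {
        tables
            .values()
            .filter(|t| t.stream_job_status == PbStreamJobStatus::Created as i32)
            .cloned()
            .collect()
    }

Sinks and indexes receive the same filter in the hunk above, while views, sources, functions, and connections keep the unfiltered listing.
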
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index 6d512c1133d0d..1c8f0c2f397c0 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -34,7 +34,7 @@ use risingwave_common::catalog::{
use risingwave_common::{bail, ensure};
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
- Connection, Database, Function, Index, Schema, Sink, Source, Table, View,
+ Connection, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, Table, View,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, Object};
@@ -725,8 +725,8 @@ impl CatalogManager {
/// This is used for both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`.
pub async fn finish_create_table_procedure(
&self,
- internal_tables: Vec&lt;Table&gt;,
- table: Table,
+ mut internal_tables: Vec&lt;Table&gt;,
+ mut table: Table,
) -> MetaResult&lt;NotificationVersion&gt; {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
@@ -742,8 +742,10 @@ impl CatalogManager {
.in_progress_creation_streaming_job
.remove(&table.id);
+ table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
- for table in &internal_tables {
+ for table in &mut internal_tables {
+ table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, tables)?;
@@ -1730,8 +1732,8 @@ impl CatalogManager {
pub async fn finish_create_table_procedure_with_source(
&self,
source: Source,
- mview: Table,
- internal_tables: Vec&lt;Table&gt;,
+ mut mview: Table,
+ mut internal_tables: Vec&lt;Table&gt;,
) -> MetaResult&lt;NotificationVersion&gt; {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
@@ -1762,8 +1764,10 @@ impl CatalogManager {
.remove(&mview.id);
sources.insert(source.id, source.clone());
+ mview.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(mview.id, mview.clone());
- for table in &internal_tables {
+ for table in &mut internal_tables {
+ table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, sources, tables)?;
@@ -1870,9 +1874,9 @@ impl CatalogManager {
pub async fn finish_create_index_procedure(
&self,
- internal_tables: Vec&lt;Table&gt;,
- index: Index,
- table: Table,
+ mut internal_tables: Vec&lt;Table&gt;,
+ mut index: Index,
+ mut table: Table,
) -> MetaResult&lt;NotificationVersion&gt; {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
@@ -1891,10 +1895,13 @@ impl CatalogManager {
.in_progress_creation_streaming_job
.remove(&table.id);
+ index.stream_job_status = PbStreamJobStatus::Created.into();
indexes.insert(index.id, index.clone());
+ table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
- for table in &internal_tables {
+ for table in &mut internal_tables {
+ table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, indexes, tables)?;
@@ -1955,8 +1962,8 @@ impl CatalogManager {
pub async fn finish_create_sink_procedure(
&self,
- internal_tables: Vec&lt;Table&gt;,
- sink: Sink,
+ mut internal_tables: Vec&lt;Table&gt;,
+ mut sink: Sink,
) -> MetaResult&lt;NotificationVersion&gt; {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
@@ -1974,8 +1981,10 @@ impl CatalogManager {
.in_progress_creation_streaming_job
.remove(&sink.id);
+ sink.stream_job_status = PbStreamJobStatus::Created.into();
sinks.insert(sink.id, sink.clone());
- for table in &internal_tables {
+ for table in &mut internal_tables {
+ table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, sinks, tables)?;
@@ -2118,6 +2127,8 @@ impl CatalogManager {
// TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must
database_core.in_progress_creation_tracker.remove(&key);
+ let mut table = table.clone();
+ table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
commit_meta!(self, tables, indexes, sources)?;
@@ -2127,7 +2138,7 @@ impl CatalogManager {
Operation::Update,
Info::RelationGroup(RelationGroup {
relations: vec![Relation {
- relation_info: RelationInfo::Table(table.to_owned()).into(),
+ relation_info: RelationInfo::Table(table).into(),
}]
.into_iter()
.chain(source.iter().map(|source| Relation {
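
All of the finish procedures in this file follow the same shape: take the catalog objects by value (`mut`), stamp them `PbStreamJobStatus::Created`, insert them into the in-memory map, and only then commit and notify. Below is a minimal sketch of that shape, again with simplified stand-in types and an assumed `Created` discriminant of 2; the real method additionally runs the `commit_meta!` transaction macro and sends a `RelationGroup` notification to frontends.

    use std::collections::BTreeMap;

    #[derive(Clone)]
    struct PbTable {
        id: u32,
        stream_job_status: i32,
    }

    // Assumed discriminant of PbStreamJobStatus::Created.
    const CREATED: i32 = 2;

    /// Rough analogue of `finish_create_table_procedure`: mark the job and its
    /// internal state tables as created before inserting them into the catalog.
    fn finish_create_table(
        tables: &mut BTreeMap<u32, PbTable>,
        mut table: PbTable,
        mut internal_tables: Vec<PbTable>,
    ) {
        table.stream_job_status = CREATED;
        tables.insert(table.id, table.clone());
        for table in &mut internal_tables {
            table.stream_job_status = CREATED;
            tables.insert(table.id, table.clone());
        }
        // The real method follows with `commit_meta!(self, tables)?` and a
        // frontend notification carrying the now-visible relations.
    }

The table-with-source, index, and sink variants in the hunks above differ only in which extra objects (source, index, sink) get stamped and committed alongside the internal tables.
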
diff --git a/src/meta/src/rpc/service/notification_service.rs b/src/meta/src/rpc/service/notification_service.rs
index 0fcbfe4929ec6..06f87e6d194c1 100644
--- a/src/meta/src/rpc/service/notification_service.rs
+++ b/src/meta/src/rpc/service/notification_service.rs
@@ -14,7 +14,7 @@
use itertools::Itertools;
use risingwave_pb::backup_service::MetaBackupManifestId;
-use risingwave_pb::catalog::Table;
+use risingwave_pb::catalog::{PbStreamJobStatus, Table};
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::hummock::WriteLimits;
@@ -120,7 +120,12 @@ impl NotificationServiceImpl {
async fn get_tables_and_creating_tables_snapshot(&self) -> (Vec&lt;Table&gt;, NotificationVersion) {
let catalog_guard = self.catalog_manager.get_catalog_core_guard().await;
- let mut tables = catalog_guard.database.list_tables();
+ let mut tables = catalog_guard
+ .database
+ .list_tables()
+ .into_iter()
+ .filter(|t| t.stream_job_status == PbStreamJobStatus::Created as i32)
+ .collect_vec();
tables.extend(catalog_guard.database.list_creating_tables());
let notification_version = self.env.notification_manager().current_version().await;
(tables, notification_version)
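
Putting the two halves together on the notification side: a frontend snapshot is now the set of tables whose stream job has finished, plus the separately tracked creating tables, so half-created streaming jobs no longer surface in frontend catalogs. A self-contained sketch under the same assumptions (simplified `PbTable`, assumed `Created = 2`):

    use std::collections::BTreeMap;

    #[derive(Clone, Debug)]
    struct PbTable {
        id: u32,
        stream_job_status: i32,
    }

    const CREATED: i32 = 2; // assumed discriminant of PbStreamJobStatus::Created

    /// Rough analogue of `get_tables_and_creating_tables_snapshot`: filter out
    /// unfinished jobs, then append the tables tracked as still creating.
    fn tables_snapshot(all: &BTreeMap<u32, PbTable>, creating: &[PbTable]) -> Vec<PbTable> {
        let mut tables: Vec<PbTable> = all
            .values()
            .filter(|t| t.stream_job_status == CREATED)
            .cloned()
            .collect();
        tables.extend(creating.iter().cloned());
        tables
    }

    fn main() {
        let mut all = BTreeMap::new();
        all.insert(1, PbTable { id: 1, stream_job_status: CREATED });
        all.insert(2, PbTable { id: 2, stream_job_status: 1 }); // unfinished, dropped
        let creating = vec![PbTable { id: 3, stream_job_status: 1 }];
        // Prints tables 1 and 3: finished jobs plus the explicit creating list.
        println!("{:?}", tables_snapshot(&all, &creating));
    }
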