From 309564d12696bc8d79549e659fbcfabd2c5b2df6 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 15 Sep 2023 16:14:06 +0800
Subject: [PATCH 1/3] update stream_job_status on finish

---
 src/meta/src/manager/catalog/mod.rs | 41 ++++++++++++++++++-----------
 1 file changed, 26 insertions(+), 15 deletions(-)

diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index 6d512c1133d0d..1c8f0c2f397c0 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -34,7 +34,7 @@ use risingwave_common::catalog::{
 use risingwave_common::{bail, ensure};
 use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
 use risingwave_pb::catalog::{
-    Connection, Database, Function, Index, Schema, Sink, Source, Table, View,
+    Connection, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, Table, View,
 };
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, Object};
@@ -725,8 +725,8 @@ impl CatalogManager {
     /// This is used for both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`.
     pub async fn finish_create_table_procedure(
         &self,
-        internal_tables: Vec<Table>,
-        table: Table,
+        mut internal_tables: Vec<Table>,
+        mut table: Table,
     ) -> MetaResult<NotificationVersion> {
         let core = &mut *self.core.lock().await;
         let database_core = &mut core.database;
@@ -742,8 +742,10 @@ impl CatalogManager {
             .in_progress_creation_streaming_job
             .remove(&table.id);
 
+        table.stream_job_status = PbStreamJobStatus::Created.into();
         tables.insert(table.id, table.clone());
-        for table in &internal_tables {
+        for table in &mut internal_tables {
+            table.stream_job_status = PbStreamJobStatus::Created.into();
             tables.insert(table.id, table.clone());
         }
         commit_meta!(self, tables)?;
@@ -1730,8 +1732,8 @@ impl CatalogManager {
     pub async fn finish_create_table_procedure_with_source(
         &self,
         source: Source,
-        mview: Table,
-        internal_tables: Vec<Table>,
+        mut mview: Table,
+        mut internal_tables: Vec<Table>,
     ) -> MetaResult<NotificationVersion> {
         let core = &mut *self.core.lock().await;
         let database_core = &mut core.database;
@@ -1762,8 +1764,10 @@ impl CatalogManager {
             .remove(&mview.id);
 
         sources.insert(source.id, source.clone());
+        mview.stream_job_status = PbStreamJobStatus::Created.into();
         tables.insert(mview.id, mview.clone());
-        for table in &internal_tables {
+        for table in &mut internal_tables {
+            table.stream_job_status = PbStreamJobStatus::Created.into();
             tables.insert(table.id, table.clone());
         }
         commit_meta!(self, sources, tables)?;
@@ -1870,9 +1874,9 @@ impl CatalogManager {
 
     pub async fn finish_create_index_procedure(
         &self,
-        internal_tables: Vec<Table>,
-        index: Index,
-        table: Table,
+        mut internal_tables: Vec<Table>,
+        mut index: Index,
+        mut table: Table,
     ) -> MetaResult<NotificationVersion> {
         let core = &mut *self.core.lock().await;
         let database_core = &mut core.database;
@@ -1891,10 +1895,13 @@ impl CatalogManager {
             .in_progress_creation_streaming_job
             .remove(&table.id);
 
+        index.stream_job_status = PbStreamJobStatus::Created.into();
         indexes.insert(index.id, index.clone());
 
+        table.stream_job_status = PbStreamJobStatus::Created.into();
         tables.insert(table.id, table.clone());
-        for table in &internal_tables {
+        for table in &mut internal_tables {
+            table.stream_job_status = PbStreamJobStatus::Created.into();
             tables.insert(table.id, table.clone());
         }
         commit_meta!(self, indexes, tables)?;
@@ -1955,8 +1962,8 @@ impl CatalogManager {
 
     pub async fn finish_create_sink_procedure(
         &self,
-        internal_tables: Vec<Table>,
-        sink: Sink,
+        mut internal_tables: Vec<Table>,
+        mut sink: Sink,
     ) -> MetaResult<NotificationVersion> {
         let core = &mut *self.core.lock().await;
         let database_core = &mut core.database;
@@ -1974,8 +1981,10 @@ impl CatalogManager {
             .in_progress_creation_streaming_job
             .remove(&sink.id);
 
+        sink.stream_job_status = PbStreamJobStatus::Created.into();
         sinks.insert(sink.id, sink.clone());
-        for table in &internal_tables {
+        for table in &mut internal_tables {
+            table.stream_job_status = PbStreamJobStatus::Created.into();
             tables.insert(table.id, table.clone());
         }
         commit_meta!(self, sinks, tables)?;
@@ -2118,6 +2127,8 @@ impl CatalogManager {
         // TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must
         database_core.in_progress_creation_tracker.remove(&key);
 
+        let mut table = table.clone();
+        table.stream_job_status = PbStreamJobStatus::Created.into();
         tables.insert(table.id, table.clone());
         commit_meta!(self, tables, indexes, sources)?;
 
@@ -2127,7 +2138,7 @@ impl CatalogManager {
             Operation::Update,
             Info::RelationGroup(RelationGroup {
                 relations: vec![Relation {
-                    relation_info: RelationInfo::Table(table.to_owned()).into(),
+                    relation_info: RelationInfo::Table(table).into(),
                 }]
                 .into_iter()
                 .chain(source.iter().map(|source| Relation {
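The pattern above is the whole of this first patch: each `finish_*_procedure` now takes its catalog objects by value, stamps them `PbStreamJobStatus::Created`, and only then inserts them into the catalog maps and commits. Below is a minimal, self-contained sketch of that flow; `Table` and `StreamJobStatus` here are toy stand-ins for the prost-generated `risingwave_pb` types, not the real meta-service API.

    // Toy model of the "stamp as Created on finish" pattern from patch 1.
    use std::collections::BTreeMap;

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum StreamJobStatus {
        Creating,
        Created,
    }

    #[derive(Clone, Debug)]
    struct Table {
        id: u32,
        stream_job_status: StreamJobStatus,
    }

    // Analogue of `finish_create_table_procedure`: the job and its internal
    // tables are taken by value (hence the `mut` parameters in the diff),
    // stamped `Created`, and only then inserted into the committed map.
    fn finish_create_table(
        catalog: &mut BTreeMap<u32, Table>,
        mut internal_tables: Vec<Table>,
        mut table: Table,
    ) {
        table.stream_job_status = StreamJobStatus::Created;
        catalog.insert(table.id, table.clone());
        for table in &mut internal_tables {
            table.stream_job_status = StreamJobStatus::Created;
            catalog.insert(table.id, table.clone());
        }
    }

    fn main() {
        let mut catalog = BTreeMap::new();
        let mview = Table { id: 1, stream_job_status: StreamJobStatus::Creating };
        let internal = Table { id: 2, stream_job_status: StreamJobStatus::Creating };
        finish_create_table(&mut catalog, vec![internal], mview);
        assert!(catalog
            .values()
            .all(|t| t.stream_job_status == StreamJobStatus::Created));
    }
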
From 2e5a40b29c38142589fc804f33f08cc252a07c1c Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 15 Sep 2023 16:24:53 +0800
Subject: [PATCH 2/3] only notify frontend of created tables

---
 src/meta/src/manager/catalog/database.rs | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs
index 705474dd27a6d..e28bdbab693e8 100644
--- a/src/meta/src/manager/catalog/database.rs
+++ b/src/meta/src/manager/catalog/database.rs
@@ -18,9 +18,7 @@ use std::collections::{BTreeMap, HashMap, HashSet};
 use itertools::Itertools;
 use risingwave_common::catalog::TableOption;
 use risingwave_pb::catalog::table::TableType;
-use risingwave_pb::catalog::{
-    Connection, Database, Function, Index, Schema, Sink, Source, Table, View,
-};
+use risingwave_pb::catalog::{Connection, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, Table, View};
 
 use super::{ConnectionId, DatabaseId, FunctionId, RelationId, SchemaId, SinkId, SourceId, ViewId};
 use crate::manager::{IndexId, MetaSrvEnv, TableId};
@@ -147,10 +145,16 @@ impl DatabaseManager {
         (
             self.databases.values().cloned().collect_vec(),
             self.schemas.values().cloned().collect_vec(),
-            self.tables.values().cloned().collect_vec(),
+            self.tables.values()
+                .filter(|t| t.stream_job_status == PbStreamJobStatus::Created.into())
+                .cloned().collect_vec(),
             self.sources.values().cloned().collect_vec(),
-            self.sinks.values().cloned().collect_vec(),
-            self.indexes.values().cloned().collect_vec(),
+            self.sinks.values()
+                .filter(|s| s.stream_job_status == PbStreamJobStatus::Created.into())
+                .cloned().collect_vec(),
+            self.indexes.values()
+                .filter(|i| i.stream_job_status == PbStreamJobStatus::Created.into())
+                .cloned().collect_vec(),
             self.views.values().cloned().collect_vec(),
             self.functions.values().cloned().collect_vec(),
             self.connections.values().cloned().collect_vec(),
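Patch 2 above makes the frontend-facing catalog snapshot status-aware: `get_catalog` now withholds any table, sink, or index whose stream job has not finished. In the real code `stream_job_status` is a prost-generated `i32` field, which is why the diff compares against `PbStreamJobStatus::Created.into()` (tidied to `as i32` in patch 3). A sketch of the filter with the same toy types as above:

    // Toy model of the snapshot filter from patch 2.
    use std::collections::BTreeMap;

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum StreamJobStatus {
        Creating,
        Created,
    }

    #[derive(Clone, Debug)]
    struct Table {
        id: u32,
        stream_job_status: StreamJobStatus,
    }

    // Analogue of the `get_catalog` change: jobs still `Creating` are
    // filtered out of the snapshot sent to the frontend.
    fn created_tables(tables: &BTreeMap<u32, Table>) -> Vec<Table> {
        tables
            .values()
            .filter(|t| t.stream_job_status == StreamJobStatus::Created)
            .cloned()
            .collect()
    }

    fn main() {
        let mut tables = BTreeMap::new();
        tables.insert(1, Table { id: 1, stream_job_status: StreamJobStatus::Created });
        tables.insert(2, Table { id: 2, stream_job_status: StreamJobStatus::Creating });
        let snapshot = created_tables(&tables);
        assert_eq!(snapshot.len(), 1);
        assert_eq!(snapshot[0].id, 1);
    }
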
From 664d8cb16e5a1207a5dc65e6b1542a24778e76d2 Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Fri, 15 Sep 2023 16:38:34 +0800
Subject: [PATCH 3/3] filter for compactor

---
 src/meta/src/manager/catalog/database.rs      | 28 ++++++++++++-------
 .../src/rpc/service/notification_service.rs   |  9 ++++--
 2 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs
index e28bdbab693e8..9531e663a587a 100644
--- a/src/meta/src/manager/catalog/database.rs
+++ b/src/meta/src/manager/catalog/database.rs
@@ -18,7 +18,9 @@ use std::collections::{BTreeMap, HashMap, HashSet};
 use itertools::Itertools;
 use risingwave_common::catalog::TableOption;
 use risingwave_pb::catalog::table::TableType;
-use risingwave_pb::catalog::{Connection, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, Table, View};
+use risingwave_pb::catalog::{
+    Connection, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, Table, View,
+};
 
 use super::{ConnectionId, DatabaseId, FunctionId, RelationId, SchemaId, SinkId, SourceId, ViewId};
 use crate::manager::{IndexId, MetaSrvEnv, TableId};
@@ -145,16 +147,22 @@ impl DatabaseManager {
         (
             self.databases.values().cloned().collect_vec(),
             self.schemas.values().cloned().collect_vec(),
-            self.tables.values()
-                .filter(|t| t.stream_job_status == PbStreamJobStatus::Created.into())
-                .cloned().collect_vec(),
+            self.tables
+                .values()
+                .filter(|t| t.stream_job_status == PbStreamJobStatus::Created as i32)
+                .cloned()
+                .collect_vec(),
             self.sources.values().cloned().collect_vec(),
-            self.sinks.values()
-                .filter(|s| s.stream_job_status == PbStreamJobStatus::Created.into())
-                .cloned().collect_vec(),
-            self.indexes.values()
-                .filter(|i| i.stream_job_status == PbStreamJobStatus::Created.into())
-                .cloned().collect_vec(),
+            self.sinks
+                .values()
+                .filter(|s| s.stream_job_status == PbStreamJobStatus::Created as i32)
+                .cloned()
+                .collect_vec(),
+            self.indexes
+                .values()
+                .filter(|i| i.stream_job_status == PbStreamJobStatus::Created as i32)
+                .cloned()
+                .collect_vec(),
             self.views.values().cloned().collect_vec(),
             self.functions.values().cloned().collect_vec(),
             self.connections.values().cloned().collect_vec(),
diff --git a/src/meta/src/rpc/service/notification_service.rs b/src/meta/src/rpc/service/notification_service.rs
index 0fcbfe4929ec6..06f87e6d194c1 100644
--- a/src/meta/src/rpc/service/notification_service.rs
+++ b/src/meta/src/rpc/service/notification_service.rs
@@ -14,7 +14,7 @@
 
 use itertools::Itertools;
 use risingwave_pb::backup_service::MetaBackupManifestId;
-use risingwave_pb::catalog::Table;
+use risingwave_pb::catalog::{PbStreamJobStatus, Table};
 use risingwave_pb::common::worker_node::State::Running;
 use risingwave_pb::common::{WorkerNode, WorkerType};
 use risingwave_pb::hummock::WriteLimits;
@@ -120,7 +120,12 @@ impl NotificationServiceImpl {
 
     async fn get_tables_and_creating_tables_snapshot(&self) -> (Vec<Table>, NotificationVersion) {
         let catalog_guard = self.catalog_manager.get_catalog_core_guard().await;
-        let mut tables = catalog_guard.database.list_tables();
+        let mut tables = catalog_guard
+            .database
+            .list_tables()
+            .into_iter()
+            .filter(|t| t.stream_job_status == PbStreamJobStatus::Created as i32)
+            .collect_vec();
         tables.extend(catalog_guard.database.list_creating_tables());
         let notification_version = self.env.notification_manager().current_version().await;
         (tables, notification_version)
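Patch 3 above applies the same status filter where the meta node builds the table snapshot for compactors: committed tables are narrowed to `Created` ones before the separately tracked creating tables are appended, presumably so a half-finished job persisted in the committed map is not reported alongside its entry in the creating list. A final sketch with the same toy types:

    // Toy model of `get_tables_and_creating_tables_snapshot` after patch 3.
    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum StreamJobStatus {
        Creating,
        Created,
    }

    #[derive(Clone, Debug)]
    struct Table {
        id: u32,
        stream_job_status: StreamJobStatus,
    }

    // Committed tables are filtered down to `Created`; the creating set is
    // appended afterwards, mirroring `tables.extend(list_creating_tables())`.
    fn tables_for_compactor(committed: Vec<Table>, creating: Vec<Table>) -> Vec<Table> {
        let mut tables: Vec<Table> = committed
            .into_iter()
            .filter(|t| t.stream_job_status == StreamJobStatus::Created)
            .collect();
        tables.extend(creating);
        tables
    }

    fn main() {
        let committed = vec![
            Table { id: 1, stream_job_status: StreamJobStatus::Created },
            Table { id: 2, stream_job_status: StreamJobStatus::Creating },
        ];
        let creating = vec![Table { id: 2, stream_job_status: StreamJobStatus::Creating }];
        let snapshot = tables_for_compactor(committed, creating);
        assert_eq!(snapshot.len(), 2); // one created + one still creating
    }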