Skip to content

Commit

Permalink
feat(meta): update StreamJob status on finish (#12342)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Sep 18, 2023
1 parent 784fe56 commit 71d9b0b
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 21 deletions.
20 changes: 16 additions & 4 deletions src/meta/src/manager/catalog/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use itertools::Itertools;
use risingwave_common::catalog::TableOption;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{
Connection, Database, Function, Index, Schema, Sink, Source, Table, View,
Connection, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, Table, View,
};

use super::{ConnectionId, DatabaseId, FunctionId, RelationId, SchemaId, SinkId, SourceId, ViewId};
Expand Down Expand Up @@ -147,10 +147,22 @@ impl DatabaseManager {
(
self.databases.values().cloned().collect_vec(),
self.schemas.values().cloned().collect_vec(),
self.tables.values().cloned().collect_vec(),
self.tables
.values()
.filter(|t| t.stream_job_status == PbStreamJobStatus::Created as i32)
.cloned()
.collect_vec(),
self.sources.values().cloned().collect_vec(),
self.sinks.values().cloned().collect_vec(),
self.indexes.values().cloned().collect_vec(),
self.sinks
.values()
.filter(|s| s.stream_job_status == PbStreamJobStatus::Created as i32)
.cloned()
.collect_vec(),
self.indexes
.values()
.filter(|i| i.stream_job_status == PbStreamJobStatus::Created as i32)
.cloned()
.collect_vec(),
self.views.values().cloned().collect_vec(),
self.functions.values().cloned().collect_vec(),
self.connections.values().cloned().collect_vec(),
Expand Down
41 changes: 26 additions & 15 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use risingwave_common::catalog::{
use risingwave_common::{bail, ensure};
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
Connection, Database, Function, Index, Schema, Sink, Source, Table, View,
Connection, Database, Function, Index, PbStreamJobStatus, Schema, Sink, Source, Table, View,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, Object};
Expand Down Expand Up @@ -725,8 +725,8 @@ impl CatalogManager {
/// This is used for both `CREATE TABLE` and `CREATE MATERIALIZED VIEW`.
pub async fn finish_create_table_procedure(
&self,
internal_tables: Vec<Table>,
table: Table,
mut internal_tables: Vec<Table>,
mut table: Table,
) -> MetaResult<NotificationVersion> {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
Expand All @@ -742,8 +742,10 @@ impl CatalogManager {
.in_progress_creation_streaming_job
.remove(&table.id);

table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
for table in &internal_tables {
for table in &mut internal_tables {
table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, tables)?;
Expand Down Expand Up @@ -1730,8 +1732,8 @@ impl CatalogManager {
pub async fn finish_create_table_procedure_with_source(
&self,
source: Source,
mview: Table,
internal_tables: Vec<Table>,
mut mview: Table,
mut internal_tables: Vec<Table>,
) -> MetaResult<NotificationVersion> {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
Expand Down Expand Up @@ -1762,8 +1764,10 @@ impl CatalogManager {
.remove(&mview.id);

sources.insert(source.id, source.clone());
mview.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(mview.id, mview.clone());
for table in &internal_tables {
for table in &mut internal_tables {
table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, sources, tables)?;
Expand Down Expand Up @@ -1870,9 +1874,9 @@ impl CatalogManager {

pub async fn finish_create_index_procedure(
&self,
internal_tables: Vec<Table>,
index: Index,
table: Table,
mut internal_tables: Vec<Table>,
mut index: Index,
mut table: Table,
) -> MetaResult<NotificationVersion> {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
Expand All @@ -1891,10 +1895,13 @@ impl CatalogManager {
.in_progress_creation_streaming_job
.remove(&table.id);

index.stream_job_status = PbStreamJobStatus::Created.into();
indexes.insert(index.id, index.clone());

table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
for table in &internal_tables {
for table in &mut internal_tables {
table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, indexes, tables)?;
Expand Down Expand Up @@ -1955,8 +1962,8 @@ impl CatalogManager {

pub async fn finish_create_sink_procedure(
&self,
internal_tables: Vec<Table>,
sink: Sink,
mut internal_tables: Vec<Table>,
mut sink: Sink,
) -> MetaResult<NotificationVersion> {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
Expand All @@ -1974,8 +1981,10 @@ impl CatalogManager {
.in_progress_creation_streaming_job
.remove(&sink.id);

sink.stream_job_status = PbStreamJobStatus::Created.into();
sinks.insert(sink.id, sink.clone());
for table in &internal_tables {
for table in &mut internal_tables {
table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
}
commit_meta!(self, sinks, tables)?;
Expand Down Expand Up @@ -2118,6 +2127,8 @@ impl CatalogManager {
// TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must
database_core.in_progress_creation_tracker.remove(&key);

let mut table = table.clone();
table.stream_job_status = PbStreamJobStatus::Created.into();
tables.insert(table.id, table.clone());
commit_meta!(self, tables, indexes, sources)?;

Expand All @@ -2127,7 +2138,7 @@ impl CatalogManager {
Operation::Update,
Info::RelationGroup(RelationGroup {
relations: vec![Relation {
relation_info: RelationInfo::Table(table.to_owned()).into(),
relation_info: RelationInfo::Table(table).into(),
}]
.into_iter()
.chain(source.iter().map(|source| Relation {
Expand Down
9 changes: 7 additions & 2 deletions src/meta/src/rpc/service/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use itertools::Itertools;
use risingwave_pb::backup_service::MetaBackupManifestId;
use risingwave_pb::catalog::Table;
use risingwave_pb::catalog::{PbStreamJobStatus, Table};
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::hummock::WriteLimits;
Expand Down Expand Up @@ -120,7 +120,12 @@ impl NotificationServiceImpl {

async fn get_tables_and_creating_tables_snapshot(&self) -> (Vec<Table>, NotificationVersion) {
let catalog_guard = self.catalog_manager.get_catalog_core_guard().await;
let mut tables = catalog_guard.database.list_tables();
let mut tables = catalog_guard
.database
.list_tables()
.into_iter()
.filter(|t| t.stream_job_status == PbStreamJobStatus::Created as i32)
.collect_vec();
tables.extend(catalog_guard.database.list_creating_tables());
let notification_version = self.env.notification_manager().current_version().await;
(tables, notification_version)
Expand Down

0 comments on commit 71d9b0b

Please sign in to comment.