From fff3849830b66b2ea684b1ba3c669342b2e81454 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Sat, 29 Jun 2024 00:23:27 +0800 Subject: [PATCH] Revert "Revert "notify internal table catalog for v2"" This reverts commit 339f089286e82dddd126f58420a633b6eba7d652. --- src/meta/src/controller/streaming_job.rs | 19 +++++++++++++++++-- src/meta/src/manager/streaming_job.rs | 4 ++-- src/meta/src/rpc/ddl_controller_v2.rs | 6 +++++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 1df14094a0834..650bef23c0bbb 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -324,12 +324,13 @@ impl CatalogController { pub async fn create_internal_table_catalog( &self, job_id: ObjectId, + is_mv: bool, internal_tables: Vec, ) -> MetaResult> { let inner = self.inner.write().await; let txn = inner.db.begin().await?; let mut table_id_map = HashMap::new(); - for table in internal_tables { + for table in &internal_tables { let table_id = Self::create_object( &txn, ObjectType::Table, @@ -340,13 +341,27 @@ impl CatalogController { .await? .oid; table_id_map.insert(table.id, table_id as u32); - let mut table: table::ActiveModel = table.into(); + let mut table: table::ActiveModel = table.clone().into(); table.table_id = Set(table_id as _); table.belongs_to_job_id = Set(Some(job_id as _)); table.fragment_id = NotSet; Table::insert(table).exec(&txn).await?; } txn.commit().await?; + if is_mv { + let relations = internal_tables + .iter() + .map(|table| Relation { + relation_info: Some(RelationInfo::Table(table.to_owned())), + }) + .collect_vec(); + let _version = self + .notify_frontend( + Operation::Delete, + Info::RelationGroup(RelationGroup { relations }), + ) + .await; + } Ok(table_id_map) } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index a9a159e6a4535..4b43945b18886 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -17,13 +17,13 @@ use risingwave_common::current_cluster_version; use risingwave_common::util::epoch::Epoch; use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table}; use risingwave_pb::ddl_service::TableJobType; -use strum::EnumDiscriminants; +use strum::{EnumDiscriminants, EnumIs}; use crate::model::FragmentId; // This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and // Sink. -#[derive(Debug, Clone, EnumDiscriminants)] +#[derive(Debug, Clone, EnumDiscriminants, EnumIs)] pub enum StreamingJob { MaterializedView(Table), Sink(Sink, Option<(Table, Option)>), diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 0dabc9b19022d..35f1047318684 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -139,7 +139,11 @@ impl DdlController { let internal_tables = fragment_graph.internal_tables().into_values().collect_vec(); let table_id_map = mgr .catalog_controller - .create_internal_table_catalog(streaming_job.id() as _, internal_tables) + .create_internal_table_catalog( + streaming_job.id() as _, + streaming_job.is_materialized_view(), + internal_tables, + ) .await?; fragment_graph.refill_internal_table_ids(table_id_map);