From 339f089286e82dddd126f58420a633b6eba7d652 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Sat, 29 Jun 2024 00:13:44 +0800 Subject: [PATCH] Revert "notify internal table catalog for v2" This reverts commit 33d94ce1e6a05c10faeb0cb06ad70bde285411fb. --- src/meta/src/controller/streaming_job.rs | 19 ++----------------- src/meta/src/manager/streaming_job.rs | 4 ++-- src/meta/src/rpc/ddl_controller_v2.rs | 6 +----- 3 files changed, 5 insertions(+), 24 deletions(-) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 156f8e7a984b..3e2c5db4776d 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -309,13 +309,12 @@ impl CatalogController { pub async fn create_internal_table_catalog( &self, job_id: ObjectId, - is_mv: bool, internal_tables: Vec, ) -> MetaResult> { let inner = self.inner.write().await; let txn = inner.db.begin().await?; let mut table_id_map = HashMap::new(); - for table in &internal_tables { + for table in internal_tables { let table_id = Self::create_object( &txn, ObjectType::Table, @@ -326,27 +325,13 @@ impl CatalogController { .await? .oid; table_id_map.insert(table.id, table_id as u32); - let mut table: table::ActiveModel = table.clone().into(); + let mut table: table::ActiveModel = table.into(); table.table_id = Set(table_id as _); table.belongs_to_job_id = Set(Some(job_id as _)); table.fragment_id = NotSet; Table::insert(table).exec(&txn).await?; } txn.commit().await?; - if is_mv { - let relations = internal_tables - .iter() - .map(|table| Relation { - relation_info: Some(RelationInfo::Table(table.to_owned())), - }) - .collect_vec(); - let _version = self - .notify_frontend( - Operation::Delete, - Info::RelationGroup(RelationGroup { relations }), - ) - .await; - } Ok(table_id_map) } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 4b43945b1888..a9a159e6a453 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -17,13 +17,13 @@ use risingwave_common::current_cluster_version; use risingwave_common::util::epoch::Epoch; use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table}; use risingwave_pb::ddl_service::TableJobType; -use strum::{EnumDiscriminants, EnumIs}; +use strum::EnumDiscriminants; use crate::model::FragmentId; // This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and // Sink. -#[derive(Debug, Clone, EnumDiscriminants, EnumIs)] +#[derive(Debug, Clone, EnumDiscriminants)] pub enum StreamingJob { MaterializedView(Table), Sink(Sink, Option<(Table, Option)>), diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 35f104731868..0dabc9b19022 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -139,11 +139,7 @@ impl DdlController { let internal_tables = fragment_graph.internal_tables().into_values().collect_vec(); let table_id_map = mgr .catalog_controller - .create_internal_table_catalog( - streaming_job.id() as _, - streaming_job.is_materialized_view(), - internal_tables, - ) + .create_internal_table_catalog(streaming_job.id() as _, internal_tables) .await?; fragment_graph.refill_internal_table_ids(table_id_map);