From b3655662fd366845a99caaf2b3e95dbb5bbffbe1 Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Wed, 16 Oct 2024 17:43:14 +0800
Subject: [PATCH 1/5] refactor(meta): only notify complete catalogs when creating mv

Signed-off-by: Bugen Zhao

only notify internal tables

Signed-off-by: Bugen Zhao

add more comments

Signed-off-by: Bugen Zhao
---
 src/frontend/src/observer/observer_manager.rs |  1 +
 src/meta/src/controller/streaming_job.rs      | 54 +++++++++++--------
 src/meta/src/model/stream.rs                  | 21 ++++++++
 src/meta/src/rpc/ddl_controller.rs            | 19 +++++--
 src/meta/src/stream/stream_graph/fragment.rs  | 13 ++++-
 src/meta/src/stream/test_fragmenter.rs        |  2 +-
 6 files changed, 83 insertions(+), 27 deletions(-)

diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs
index b1ecf4182d1d0..4133f708688eb 100644
--- a/src/frontend/src/observer/observer_manager.rs
+++ b/src/frontend/src/observer/observer_manager.rs
@@ -245,6 +245,7 @@ impl FrontendObserverNode {
         let Some(info) = resp.info.as_ref() else {
             return;
         };
+        tracing::trace!(?info, "handle catalog notification");
 
         let mut catalog_guard = self.catalog.write();
         match info {
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index e8adc309d10e2..ace47af07905b 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -333,16 +333,20 @@ impl CatalogController {
         Ok(())
     }
 
+    /// Create catalogs for internal tables. Some of the fields in the given arguments are
+    /// placeholders, which will be updated later in `prepare_streaming_job`.
+    ///
+    /// Returns a mapping from the temporary table id to the actual global table id.
     pub async fn create_internal_table_catalog(
         &self,
         job: &StreamingJob,
-        mut internal_tables: Vec<PbTable>,
+        incomplete_internal_tables: Vec<PbTable>,
     ) -> MetaResult<HashMap<u32, u32>> {
         let job_id = job.id() as ObjectId;
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
         let mut table_id_map = HashMap::new();
-        for table in &mut internal_tables {
+        for table in incomplete_internal_tables {
             let table_id = Self::create_object(
                 &txn,
                 ObjectType::Table,
@@ -353,33 +357,39 @@ impl CatalogController {
             .await?
             .oid;
             table_id_map.insert(table.id, table_id as u32);
-            table.id = table_id as _;
-            let mut table_model: table::ActiveModel = table.clone().into();
-            table_model.table_id = Set(table_id as _);
-            table_model.belongs_to_job_id = Set(Some(job_id));
-            table_model.fragment_id = NotSet;
+
+            let table_model = table::ActiveModel {
+                table_id: Set(table_id as _),
+                belongs_to_job_id: Set(Some(job_id)),
+                fragment_id: NotSet,
+                ..table.into()
+            };
             Table::insert(table_model).exec(&txn).await?;
         }
         txn.commit().await?;
 
-        if job.is_materialized_view() {
-            self.notify_frontend(
-                Operation::Add,
-                Info::RelationGroup(RelationGroup {
-                    relations: internal_tables
-                        .iter()
-                        .map(|table| Relation {
-                            relation_info: Some(RelationInfo::Table(table.clone())),
-                        })
-                        .collect(),
-                }),
-            )
-            .await;
-        }
-
         Ok(table_id_map)
     }
 
+    /// Notify frontend about the given internal tables before the streaming job finishes creating.
+    /// Should only be called for materialized views.
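+    ///
+    /// A sketch of the intended usage, mirroring the call site added to
+    /// `ddl_controller.rs` in this patch (abbreviated for illustration):
+    ///
+    /// ```ignore
+    /// // Internal tables are pre-notified only for materialized views.
+    /// if streaming_job.is_materialized_view() {
+    ///     catalog_controller
+    ///         .pre_notify_internal_tables(&internal_tables)
+    ///         .await?;
+    /// }
+    /// ```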
+    pub async fn pre_notify_internal_tables(&self, internal_tables: &[PbTable]) -> MetaResult<()> {
+        self.notify_frontend(
+            Operation::Add,
+            Info::RelationGroup(RelationGroup {
+                relations: internal_tables
+                    .iter()
+                    .map(|table| Relation {
+                        relation_info: Some(RelationInfo::Table(table.clone())),
+                    })
+                    .collect(),
+            }),
+        )
+        .await;
+
+        Ok(())
+    }
+
     pub async fn prepare_streaming_job(
         &self,
         table_fragment: PbTableFragments,
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index 7eb921e0befa4..7498442747546 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -18,8 +18,10 @@ use std::ops::AddAssign;
 use itertools::Itertools;
 use risingwave_common::catalog::TableId;
 use risingwave_common::hash::{VirtualNode, WorkerSlotId};
+use risingwave_common::util::stream_graph_visitor;
 use risingwave_connector::source::SplitImpl;
 use risingwave_meta_model::{SourceId, WorkerId};
+use risingwave_pb::catalog::Table;
 use risingwave_pb::common::PbActorLocation;
 use risingwave_pb::meta::table_fragments::actor_status::ActorState;
 use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
@@ -564,6 +566,25 @@ impl TableFragments {
         }
     }
 
+    /// Retrieve the internal tables map of the whole graph.
+    ///
+    /// See also [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`].
+    pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
+        let mut tables = BTreeMap::new();
+        for fragment in self.fragments.values() {
+            stream_graph_visitor::visit_stream_node_internal_tables(
+                &mut fragment.actors[0].nodes.clone().unwrap(),
+                |table, _| {
+                    let table_id = table.id;
+                    tables
+                        .try_insert(table_id, table.clone())
+                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
+                },
+            );
+        }
+        tables
+    }
+
     /// Returns the internal table ids without the mview table.
     pub fn internal_table_ids(&self) -> Vec<u32> {
         self.fragments
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 995643215d317..5b2bff2c734b8 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1040,11 +1040,14 @@ impl DdlController {
         streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id());
 
         // create internal table catalogs and refill table id.
-        let internal_tables = fragment_graph.internal_tables().into_values().collect_vec();
+        let incomplete_internal_tables = fragment_graph
+            .incomplete_internal_tables()
+            .into_values()
+            .collect_vec();
         let table_id_map = self
             .metadata_manager
             .catalog_controller
-            .create_internal_table_catalog(&streaming_job, internal_tables)
+            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
             .await?;
         fragment_graph.refill_internal_table_ids(table_id_map);
 
@@ -1079,6 +1082,16 @@ impl DdlController {
             .await?;
 
         let streaming_job = &ctx.streaming_job;
+        let internal_tables = ctx.internal_tables();
+
+        // Now that all fields in `internal_tables` are initialized,
+        // we can notify frontend for these relations.
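+        // For other job types, the frontend is only notified once creation
+        // completes (see `finish_streaming_job`).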
+        if streaming_job.is_materialized_view() {
+            self.metadata_manager
+                .catalog_controller
+                .pre_notify_internal_tables(&internal_tables)
+                .await?;
+        }
 
         match streaming_job {
             StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
@@ -1560,7 +1573,6 @@ impl DdlController {
     ) -> MetaResult<(CreateStreamingJobContext, TableFragments)> {
         let id = stream_job.id();
         let specified_parallelism = fragment_graph.specified_parallelism();
-        let internal_tables = fragment_graph.internal_tables();
         let expr_context = stream_ctx.to_expr_context();
         let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
 
@@ -1643,6 +1655,7 @@ impl DdlController {
             table_parallelism,
             max_parallelism.get(),
         );
+        let internal_tables = table_fragments.internal_tables();
 
         if let Some(mview_fragment) = table_fragments.mview_fragment() {
             stream_job.set_table_vnode_count(mview_fragment.vnode_count());
diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index aaa30ee0d57c5..d6bfab0e50109 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -349,6 +349,9 @@ impl StreamFragmentGraph {
    ) -> MetaResult<Self> {
         let fragment_id_gen =
             GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
+        // Note: in SQL backend, the ids generated here are fake and will be overwritten again
+        // with `refill_internal_table_ids` later.
+        // TODO: refactor the code to remove this step.
         let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);
 
         // Create nodes.
@@ -418,7 +421,13 @@ impl StreamFragmentGraph {
     }
 
     /// Retrieve the internal tables map of the whole graph.
-    pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
+    ///
+    /// Note that some fields in the table catalogs are not filled during the current phase, e.g.,
+    /// `fragment_id`, `vnode_count`. They will all be filled after a `TableFragments` is built.
+    /// Be careful when using the returned values.
+    ///
+    /// See also [`crate::model::TableFragments::internal_tables`].
+    pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
         let mut tables = BTreeMap::new();
         for fragment in self.fragments.values() {
             for table in fragment.extract_internal_tables() {
@@ -431,6 +440,8 @@ impl StreamFragmentGraph {
         tables
     }
 
+    /// Refill the internal tables' `table_id`s according to the given map, typically obtained from
+    /// `create_internal_table_catalog`.
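+    ///
+    /// A sketch of the overall flow, mirroring the call site in `ddl_controller.rs`
+    /// (error handling elided):
+    ///
+    /// ```ignore
+    /// // Collect placeholder catalogs, persist them to obtain real ids, then refill.
+    /// let incomplete_internal_tables = fragment_graph
+    ///     .incomplete_internal_tables()
+    ///     .into_values()
+    ///     .collect_vec();
+    /// let table_id_map = catalog_controller
+    ///     .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
+    ///     .await?;
+    /// fragment_graph.refill_internal_table_ids(table_id_map);
+    /// ```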
     pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<u32, u32>) {
         for fragment in self.fragments.values_mut() {
             stream_graph_visitor::visit_internal_tables(
diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs
index 9818af124537e..719323e66ffb0 100644
--- a/src/meta/src/stream/test_fragmenter.rs
+++ b/src/meta/src/stream/test_fragmenter.rs
@@ -448,7 +448,7 @@ async fn test_graph_builder() -> MetaResult<()> {
         time_zone: graph.ctx.as_ref().unwrap().timezone.clone(),
     };
     let fragment_graph = StreamFragmentGraph::new(&env, graph, &job)?;
-    let internal_tables = fragment_graph.internal_tables();
+    let internal_tables = fragment_graph.incomplete_internal_tables();
 
     let actor_graph_builder = ActorGraphBuilder::new(
         job.id(),

From 2e753f38e235fd8066185fb217ed44e195fb845f Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Fri, 18 Oct 2024 17:32:41 +0800
Subject: [PATCH 2/5] minor rename

Signed-off-by: Bugen Zhao
---
 src/frontend/src/observer/observer_manager.rs | 2 +-
 src/meta/src/controller/catalog.rs            | 6 +++---
 src/meta/src/controller/streaming_job.rs      | 4 ++--
 src/meta/src/controller/utils.rs              | 8 +++++++-
 4 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs
index 4133f708688eb..92408c2b03885 100644
--- a/src/frontend/src/observer/observer_manager.rs
+++ b/src/frontend/src/observer/observer_manager.rs
@@ -245,7 +245,7 @@ impl FrontendObserverNode {
         let Some(info) = resp.info.as_ref() else {
             return;
         };
-        tracing::trace!(?info, "handle catalog notification");
+        tracing::trace!(op = ?resp.operation(), ?info, "handle catalog notification");
 
         let mut catalog_guard = self.catalog.write();
         match info {
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index 08824459e916e..8c74698278f9d 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -66,7 +66,7 @@ use tracing::info;
 use super::utils::{check_subscription_name_duplicate, get_fragment_ids_by_jobs};
 use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs};
 use crate::controller::utils::{
-    build_relation_group, check_connection_name_duplicate, check_database_name_duplicate,
+    build_relation_group_for_delete, check_connection_name_duplicate, check_database_name_duplicate,
     check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate,
     check_secret_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty,
     ensure_user_id, extract_external_table_name_from_definition, get_referring_objects,
@@ -891,7 +891,7 @@ impl CatalogController {
 
         txn.commit().await?;
 
-        let relation_group = build_relation_group(
+        let relation_group = build_relation_group_for_delete(
             dirty_mview_objs
                 .into_iter()
                 .chain(dirty_mview_internal_table_objs.into_iter())
@@ -2305,7 +2305,7 @@ impl CatalogController {
         // notify about them.
         self.notify_users_update(user_infos).await;
 
-        let relation_group = build_relation_group(to_drop_objects);
+        let relation_group = build_relation_group_for_delete(to_drop_objects);
 
         let version = self
             .notify_frontend(NotificationOperation::Delete, relation_group)
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index ace47af07905b..85953a2af95b5 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -63,7 +63,7 @@ use crate::barrier::{ReplaceTablePlan, Reschedule};
 use crate::controller::catalog::CatalogController;
 use crate::controller::rename::ReplaceTableExprRewriter;
 use crate::controller::utils::{
-    build_relation_group, check_relation_name_duplicate, check_sink_into_table_cycle,
+    build_relation_group_for_delete, check_relation_name_duplicate, check_sink_into_table_cycle,
     ensure_object_id, ensure_user_id, get_fragment_actor_ids, get_fragment_mappings,
     rebuild_fragment_mapping_from_actors, PartialObject,
 };
@@ -575,7 +575,7 @@ impl CatalogController {
         txn.commit().await?;
 
         if !objs.is_empty() {
-            self.notify_frontend(Operation::Delete, build_relation_group(objs))
+            self.notify_frontend(Operation::Delete, build_relation_group_for_delete(objs))
                 .await;
         }
         Ok(true)
diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs
index a9650388b8db8..6cf09e754c24f 100644
--- a/src/meta/src/controller/utils.rs
+++ b/src/meta/src/controller/utils.rs
@@ -1082,7 +1082,13 @@ where
     ))
 }
 
-pub(crate) fn build_relation_group(relation_objects: Vec<PartialObject>) -> NotificationInfo {
+/// Build a relation group for notifying the deletion of the given objects.
+///
+/// Note that only id fields are filled in the relation info, as the arguments are partial objects.
+/// As a result, the returned notification info should only be used for deletion.
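+///
+/// A sketch of typical usage, mirroring the call sites in `catalog.rs` and
+/// `streaming_job.rs`:
+///
+/// ```ignore
+/// // `objs` are the `PartialObject`s of the relations being dropped.
+/// let relation_group = build_relation_group_for_delete(objs);
+/// self.notify_frontend(NotificationOperation::Delete, relation_group)
+///     .await;
+/// ```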
+pub(crate) fn build_relation_group_for_delete(
+    relation_objects: Vec<PartialObject>,
+) -> NotificationInfo {
     let mut relations = vec![];
     for obj in relation_objects {
         match obj.obj_type {

From 54b30c54c39ed10901af3aff35de6619737fc284 Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Fri, 18 Oct 2024 17:48:34 +0800
Subject: [PATCH 3/5] ignore drop notification if not exists

Signed-off-by: Bugen Zhao
---
 src/frontend/src/catalog/schema_catalog.rs | 62 +++++++++++++++-------
 src/meta/src/controller/streaming_job.rs   |  4 ++
 2 files changed, 48 insertions(+), 18 deletions(-)

diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs
index 0394da2a70f81..3221f40aa1c28 100644
--- a/src/frontend/src/catalog/schema_catalog.rs
+++ b/src/frontend/src/catalog/schema_catalog.rs
@@ -160,7 +160,11 @@ impl SchemaCatalog {
     }
 
     pub fn drop_table(&mut self, id: TableId) {
-        let table_ref = self.table_by_id.remove(&id).unwrap();
+        let Some(table_ref) = self.table_by_id.remove(&id) else {
+            tracing::warn!(?id, "table to drop not found, cleaning up?");
+            return;
+        };
+
         self.table_by_name.remove(&table_ref.name).unwrap();
         self.indexes_by_table_id.remove(&table_ref.id);
     }
@@ -190,7 +194,11 @@ impl SchemaCatalog {
     }
 
     pub fn drop_index(&mut self, id: IndexId) {
-        let index_ref = self.index_by_id.remove(&id).unwrap();
+        let Some(index_ref) = self.index_by_id.remove(&id) else {
+            tracing::warn!(?id, "index to drop not found, cleaning up?");
+            return;
+        };
+
         self.index_by_name.remove(&index_ref.name).unwrap();
         match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
             Occupied(mut entry) => {
@@ -225,7 +233,11 @@ impl SchemaCatalog {
     }
 
     pub fn drop_source(&mut self, id: SourceId) {
-        let source_ref = self.source_by_id.remove(&id).unwrap();
+        let Some(source_ref) = self.source_by_id.remove(&id) else {
+            tracing::warn!(?id, "source to drop not found, cleaning up?");
+            return;
+        };
+
         self.source_by_name.remove(&source_ref.name).unwrap();
         if let Some(connection_id) = source_ref.connection_id {
             if let Occupied(mut e) = self.connection_source_ref.entry(connection_id) {
@@ -274,7 +286,11 @@ impl SchemaCatalog {
     }
 
     pub fn drop_sink(&mut self, id: SinkId) {
-        let sink_ref = self.sink_by_id.remove(&id).unwrap();
+        let Some(sink_ref) = self.sink_by_id.remove(&id) else {
+            tracing::warn!(?id, "sink to drop not found, cleaning up?");
+            return;
+        };
+
         self.sink_by_name.remove(&sink_ref.name).unwrap();
         if let Some(connection_id) = sink_ref.connection_id {
             if let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0) {
@@ -318,7 +334,11 @@ impl SchemaCatalog {
     }
 
     pub fn drop_subscription(&mut self, id: SubscriptionId) {
-        let subscription_ref = self.subscription_by_id.remove(&id).unwrap();
+        let Some(subscription_ref) = self.subscription_by_id.remove(&id) else {
+            tracing::warn!(?id, "subscription to drop not found, cleaning up?");
+            return;
+        };
+
         self.subscription_by_name
             .remove(&subscription_ref.name)
             .unwrap();
@@ -354,7 +374,11 @@ impl SchemaCatalog {
     }
 
     pub fn drop_view(&mut self, id: ViewId) {
-        let view_ref = self.view_by_id.remove(&id).unwrap();
+        let Some(view_ref) = self.view_by_id.remove(&id) else {
+            tracing::warn!(?id, "view to drop not found, cleaning up?");
+            return;
+        };
+
         self.view_by_name.remove(&view_ref.name).unwrap();
     }
 
@@ -411,10 +435,10 @@ impl SchemaCatalog {
     }
 
     pub fn drop_function(&mut self, id: FunctionId) {
-        let function_ref = self
-            .function_by_id
-            .remove(&id)
-            .expect("function not found by id");
+        let Some(function_ref) = self.function_by_id.remove(&id) else {
+            tracing::warn!(?id, "function to drop not found, cleaning up?");
+            return;
+        };
 
         self.function_registry
             .remove(Self::get_func_sign(&function_ref))
@@ -483,10 +507,11 @@ impl SchemaCatalog {
     }
 
     pub fn drop_connection(&mut self, connection_id: ConnectionId) {
-        let connection_ref = self
-            .connection_by_id
-            .remove(&connection_id)
-            .expect("connection not found by id");
+        let Some(connection_ref) = self.connection_by_id.remove(&connection_id) else {
+            tracing::warn!(?connection_id, "connection to drop not found, cleaning up?");
+            return;
+        };
+
         self.connection_by_name
             .remove(&connection_ref.name)
             .expect("connection not found by name");
@@ -523,10 +548,11 @@ impl SchemaCatalog {
     }
 
     pub fn drop_secret(&mut self, secret_id: SecretId) {
-        let secret_ref = self
-            .secret_by_id
-            .remove(&secret_id)
-            .expect("secret not found by id");
+        let Some(secret_ref) = self.secret_by_id.remove(&secret_id) else {
+            tracing::warn!(?secret_id, "secret to drop not found, cleaning up?");
+            return;
+        };
+
         self.secret_by_name
             .remove(&secret_ref.name)
             .expect("secret not found by name");
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 85953a2af95b5..5325c3a0a9367 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -575,6 +575,10 @@ impl CatalogController {
         txn.commit().await?;
 
         if !objs.is_empty() {
+            // We **may** also have notified the frontend about these objects,
+            // so we need to notify the frontend to delete them here.
+            // The frontend will ignore the request if the object does not exist,
+            // so it's safe to always notify.
             self.notify_frontend(Operation::Delete, build_relation_group_for_delete(objs))
                 .await;
         }

From 1ab43fddfae4f0b6dde829ef287a75aa36317eb6 Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Sat, 19 Oct 2024 00:56:30 +0800
Subject: [PATCH 4/5] format

Signed-off-by: Bugen Zhao
---
 src/meta/src/controller/catalog.rs | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index 8c74698278f9d..5d215344ac0ec 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -66,10 +66,11 @@ use tracing::info;
 use super::utils::{check_subscription_name_duplicate, get_fragment_ids_by_jobs};
 use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs};
 use crate::controller::utils::{
-    build_relation_group_for_delete, check_connection_name_duplicate, check_database_name_duplicate,
-    check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate,
-    check_secret_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty,
-    ensure_user_id, extract_external_table_name_from_definition, get_referring_objects,
+    build_relation_group_for_delete, check_connection_name_duplicate,
+    check_database_name_duplicate, check_function_signature_duplicate,
+    check_relation_name_duplicate, check_schema_name_duplicate, check_secret_name_duplicate,
+    ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id,
+    extract_external_table_name_from_definition, get_referring_objects,
     get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids,
     resolve_source_register_info_for_jobs, PartialObject,
 };

From 5adb7fedc69e6086c28e0b706771881e2384f042 Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Mon, 21 Oct 2024 14:55:46 +0800
Subject: [PATCH 5/5] revert logical changes

Signed-off-by: Bugen Zhao
---
 src/frontend/src/catalog/schema_catalog.rs   | 62 ++++++--------------
 src/meta/src/controller/streaming_job.rs     | 57 +++++++++---------
 src/meta/src/model/stream.rs                 |  5 +-
 src/meta/src/rpc/ddl_controller.rs           | 10 ----
 src/meta/src/stream/stream_graph/fragment.rs |  2 +-
 5 files changed, 52 insertions(+), 84 deletions(-)

diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs
index 3221f40aa1c28..0394da2a70f81 100644
--- a/src/frontend/src/catalog/schema_catalog.rs
+++ b/src/frontend/src/catalog/schema_catalog.rs
@@ -160,11 +160,7 @@ impl SchemaCatalog {
     }
 
     pub fn drop_table(&mut self, id: TableId) {
-        let Some(table_ref) = self.table_by_id.remove(&id) else {
-            tracing::warn!(?id, "table to drop not found, cleaning up?");
-            return;
-        };
-
+        let table_ref = self.table_by_id.remove(&id).unwrap();
         self.table_by_name.remove(&table_ref.name).unwrap();
         self.indexes_by_table_id.remove(&table_ref.id);
     }
@@ -194,11 +190,7 @@ impl SchemaCatalog {
     }
 
     pub fn drop_index(&mut self, id: IndexId) {
-        let Some(index_ref) = self.index_by_id.remove(&id) else {
-            tracing::warn!(?id, "index to drop not found, cleaning up?");
-            return;
-        };
-
+        let index_ref = self.index_by_id.remove(&id).unwrap();
         self.index_by_name.remove(&index_ref.name).unwrap();
         match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
             Occupied(mut entry) => {
@@ -233,11 +225,7 @@ impl SchemaCatalog {
     }
 
     pub fn drop_source(&mut self, id: SourceId) {
-        let Some(source_ref) = self.source_by_id.remove(&id) else {
-            tracing::warn!(?id, "source to drop not found, cleaning up?");
-            return;
-        };
-
+        let source_ref = self.source_by_id.remove(&id).unwrap();
         self.source_by_name.remove(&source_ref.name).unwrap();
         if let Some(connection_id) = source_ref.connection_id {
             if let Occupied(mut e) = self.connection_source_ref.entry(connection_id) {
@@ -286,11 +274,7 @@ impl SchemaCatalog {
     }
 
     pub fn drop_sink(&mut self, id: SinkId) {
-        let Some(sink_ref) = self.sink_by_id.remove(&id) else {
-            tracing::warn!(?id, "sink to drop not found, cleaning up?");
-            return;
-        };
-
+        let sink_ref = self.sink_by_id.remove(&id).unwrap();
         self.sink_by_name.remove(&sink_ref.name).unwrap();
         if let Some(connection_id) = sink_ref.connection_id {
             if let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0) {
@@ -334,11 +318,7 @@ impl SchemaCatalog {
     }
 
     pub fn drop_subscription(&mut self, id: SubscriptionId) {
-        let Some(subscription_ref) = self.subscription_by_id.remove(&id) else {
-            tracing::warn!(?id, "subscription to drop not found, cleaning up?");
-            return;
-        };
-
+        let subscription_ref = self.subscription_by_id.remove(&id).unwrap();
         self.subscription_by_name
             .remove(&subscription_ref.name)
             .unwrap();
@@ -374,11 +354,7 @@ impl SchemaCatalog {
     }
 
     pub fn drop_view(&mut self, id: ViewId) {
-        let Some(view_ref) = self.view_by_id.remove(&id) else {
-            tracing::warn!(?id, "view to drop not found, cleaning up?");
-            return;
-        };
-
+        let view_ref = self.view_by_id.remove(&id).unwrap();
         self.view_by_name.remove(&view_ref.name).unwrap();
     }
 
@@ -435,10 +411,10 @@ impl SchemaCatalog {
     }
 
     pub fn drop_function(&mut self, id: FunctionId) {
-        let Some(function_ref) = self.function_by_id.remove(&id) else {
-            tracing::warn!(?id, "function to drop not found, cleaning up?");
-            return;
-        };
+        let function_ref = self
+            .function_by_id
+            .remove(&id)
+            .expect("function not found by id");
 
         self.function_registry
             .remove(Self::get_func_sign(&function_ref))
@@ -507,11 +483,10 @@ impl SchemaCatalog {
     }
 
     pub fn drop_connection(&mut self, connection_id: ConnectionId) {
-        let Some(connection_ref) = self.connection_by_id.remove(&connection_id) else {
-            tracing::warn!(?connection_id, "connection to drop not found, cleaning up?");
-            return;
-        };
-
+        let connection_ref = self
+            .connection_by_id
+            .remove(&connection_id)
+            .expect("connection not found by id");
         self.connection_by_name
             .remove(&connection_ref.name)
             .expect("connection not found by name");
@@ -548,11 +523,10 @@ impl SchemaCatalog {
     }
 
     pub fn drop_secret(&mut self, secret_id: SecretId) {
-        let Some(secret_ref) = self.secret_by_id.remove(&secret_id) else {
-            tracing::warn!(?secret_id, "secret to drop not found, cleaning up?");
-            return;
-        };
-
+        let secret_ref = self
+            .secret_by_id
+            .remove(&secret_id)
+            .expect("secret not found by id");
         self.secret_by_name
             .remove(&secret_ref.name)
             .expect("secret not found by name");
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 5325c3a0a9367..516adbf8b6c90 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -99,6 +99,11 @@ impl CatalogController {
         Ok(obj.oid)
     }
 
+    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
+    /// materialized view.
+    ///
+    /// Some of the fields in the given streaming job are placeholders, which will be updated
+    /// later in `prepare_streaming_job`, and the frontend will be notified again in
+    /// `finish_streaming_job`.
     pub async fn create_job_catalog(
         &self,
         streaming_job: &mut StreamingJob,
@@ -333,20 +338,23 @@ impl CatalogController {
         Ok(())
     }
 
-    /// Create catalogs for internal tables. Some of the fields in the given arguments are
-    /// placeholders, which will be updated later in `prepare_streaming_job`.
+    /// Create catalogs for internal tables, then notify frontend about them if the job is a
+    /// materialized view.
+    ///
+    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
+    /// be updated later in `prepare_streaming_job`, and the frontend will be notified again in
+    /// `finish_streaming_job`.
     ///
     /// Returns a mapping from the temporary table id to the actual global table id.
     pub async fn create_internal_table_catalog(
         &self,
         job: &StreamingJob,
-        incomplete_internal_tables: Vec<PbTable>,
+        mut incomplete_internal_tables: Vec<PbTable>,
     ) -> MetaResult<HashMap<u32, u32>> {
         let job_id = job.id() as ObjectId;
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
         let mut table_id_map = HashMap::new();
-        for table in incomplete_internal_tables {
+        for table in &mut incomplete_internal_tables {
             let table_id = Self::create_object(
                 &txn,
                 ObjectType::Table,
@@ -357,37 +365,34 @@ impl CatalogController {
             .await?
             .oid;
             table_id_map.insert(table.id, table_id as u32);
+            table.id = table_id as _;
 
             let table_model = table::ActiveModel {
                 table_id: Set(table_id as _),
                 belongs_to_job_id: Set(Some(job_id)),
                 fragment_id: NotSet,
-                ..table.into()
+                ..table.clone().into()
             };
             Table::insert(table_model).exec(&txn).await?;
         }
         txn.commit().await?;
 
-        Ok(table_id_map)
-    }
-
-    /// Notify frontend about the given internal tables before the streaming job finishes creating.
-    /// Should only be called for materialized views.
-    pub async fn pre_notify_internal_tables(&self, internal_tables: &[PbTable]) -> MetaResult<()> {
-        self.notify_frontend(
-            Operation::Add,
-            Info::RelationGroup(RelationGroup {
-                relations: internal_tables
-                    .iter()
-                    .map(|table| Relation {
-                        relation_info: Some(RelationInfo::Table(table.clone())),
-                    })
-                    .collect(),
-            }),
-        )
-        .await;
-
-        Ok(())
+        if job.is_materialized_view() {
+            self.notify_frontend(
+                Operation::Add,
+                Info::RelationGroup(RelationGroup {
+                    relations: incomplete_internal_tables
+                        .iter()
+                        .map(|table| Relation {
+                            relation_info: Some(RelationInfo::Table(table.clone())),
+                        })
+                        .collect(),
+                }),
+            )
+            .await;
+        }
+
+        Ok(table_id_map)
     }
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index 7498442747546..2ad917eb8e050 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -566,9 +566,10 @@ impl TableFragments {
         }
     }
 
-    /// Retrieve the internal tables map of the whole graph.
+    /// Retrieve the **complete** internal tables map of the whole graph.
     ///
-    /// See also [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`].
+    /// Compared to [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`],
+    /// the table catalogs returned here are complete, with all fields filled.
     pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
         let mut tables = BTreeMap::new();
         for fragment in self.fragments.values() {
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 5b2bff2c734b8..baffdbd72fba9 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1082,16 +1082,6 @@ impl DdlController {
             .await?;
 
         let streaming_job = &ctx.streaming_job;
-        let internal_tables = ctx.internal_tables();
-
-        // Now that all fields in `internal_tables` are initialized,
-        // we can notify frontend for these relations.
-        if streaming_job.is_materialized_view() {
-            self.metadata_manager
-                .catalog_controller
-                .pre_notify_internal_tables(&internal_tables)
-                .await?;
-        }
 
         match streaming_job {
             StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index d6bfab0e50109..211f609a13fe9 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -420,7 +420,7 @@ impl StreamFragmentGraph {
         })
     }
 
-    /// Retrieve the internal tables map of the whole graph.
+    /// Retrieve the **incomplete** internal tables map of the whole graph.
     ///
     /// Note that some fields in the table catalogs are not filled during the current phase, e.g.,
     /// `fragment_id`, `vnode_count`. They will all be filled after a `TableFragments` is built.