diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index 3221f40aa1c28..0394da2a70f81 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -160,11 +160,7 @@ impl SchemaCatalog { } pub fn drop_table(&mut self, id: TableId) { - let Some(table_ref) = self.table_by_id.remove(&id) else { - tracing::warn!(?id, "table to drop not found, cleaning up?"); - return; - }; - + let table_ref = self.table_by_id.remove(&id).unwrap(); self.table_by_name.remove(&table_ref.name).unwrap(); self.indexes_by_table_id.remove(&table_ref.id); } @@ -194,11 +190,7 @@ impl SchemaCatalog { } pub fn drop_index(&mut self, id: IndexId) { - let Some(index_ref) = self.index_by_id.remove(&id) else { - tracing::warn!(?id, "index to drop not found, cleaning up?"); - return; - }; - + let index_ref = self.index_by_id.remove(&id).unwrap(); self.index_by_name.remove(&index_ref.name).unwrap(); match self.indexes_by_table_id.entry(index_ref.primary_table.id) { Occupied(mut entry) => { @@ -233,11 +225,7 @@ impl SchemaCatalog { } pub fn drop_source(&mut self, id: SourceId) { - let Some(source_ref) = self.source_by_id.remove(&id) else { - tracing::warn!(?id, "source to drop not found, cleaning up?"); - return; - }; - + let source_ref = self.source_by_id.remove(&id).unwrap(); self.source_by_name.remove(&source_ref.name).unwrap(); if let Some(connection_id) = source_ref.connection_id { if let Occupied(mut e) = self.connection_source_ref.entry(connection_id) { @@ -286,11 +274,7 @@ impl SchemaCatalog { } pub fn drop_sink(&mut self, id: SinkId) { - let Some(sink_ref) = self.sink_by_id.remove(&id) else { - tracing::warn!(?id, "sink to drop not found, cleaning up?"); - return; - }; - + let sink_ref = self.sink_by_id.remove(&id).unwrap(); self.sink_by_name.remove(&sink_ref.name).unwrap(); if let Some(connection_id) = sink_ref.connection_id { if let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0) { @@ -334,11 +318,7 @@ impl SchemaCatalog { } pub fn drop_subscription(&mut self, id: SubscriptionId) { - let Some(subscription_ref) = self.subscription_by_id.remove(&id) else { - tracing::warn!(?id, "subscription to drop not found, cleaning up?"); - return; - }; - + let subscription_ref = self.subscription_by_id.remove(&id).unwrap(); self.subscription_by_name .remove(&subscription_ref.name) .unwrap(); @@ -374,11 +354,7 @@ impl SchemaCatalog { } pub fn drop_view(&mut self, id: ViewId) { - let Some(view_ref) = self.view_by_id.remove(&id) else { - tracing::warn!(?id, "view to drop not found, cleaning up?"); - return; - }; - + let view_ref = self.view_by_id.remove(&id).unwrap(); self.view_by_name.remove(&view_ref.name).unwrap(); } @@ -435,10 +411,10 @@ impl SchemaCatalog { } pub fn drop_function(&mut self, id: FunctionId) { - let Some(function_ref) = self.function_by_id.remove(&id) else { - tracing::warn!(?id, "function to drop not found, cleaning up?"); - return; - }; + let function_ref = self + .function_by_id + .remove(&id) + .expect("function not found by id"); self.function_registry .remove(Self::get_func_sign(&function_ref)) @@ -507,11 +483,10 @@ impl SchemaCatalog { } pub fn drop_connection(&mut self, connection_id: ConnectionId) { - let Some(connection_ref) = self.connection_by_id.remove(&connection_id) else { - tracing::warn!(?connection_id, "connection to drop not found, cleaning up?"); - return; - }; - + let connection_ref = self + .connection_by_id + .remove(&connection_id) + .expect("connection not found by id"); self.connection_by_name .remove(&connection_ref.name) .expect("connection not found by name"); @@ -548,11 +523,10 @@ impl SchemaCatalog { } pub fn drop_secret(&mut self, secret_id: SecretId) { - let Some(secret_ref) = self.secret_by_id.remove(&secret_id) else { - tracing::warn!(?secret_id, "secret to drop not found, cleaning up?"); - return; - }; - + let secret_ref = self + .secret_by_id + .remove(&secret_id) + .expect("secret not found by id"); self.secret_by_name .remove(&secret_ref.name) .expect("secret not found by name"); diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 5325c3a0a9367..516adbf8b6c90 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -99,6 +99,11 @@ impl CatalogController { Ok(obj.oid) } + /// Create catalogs for the streaming job, then notify frontend about them if the job is a + /// materialized view. + /// + /// Some of the fields in the given streaming job are placeholders, which will + /// be updated later in `prepare_streaming_job` and notify again in `finish_streaming_job`. pub async fn create_job_catalog( &self, streaming_job: &mut StreamingJob, @@ -333,20 +338,23 @@ impl CatalogController { Ok(()) } - /// Create catalogs for internal tables. Some of the fields in the given arguments are - /// placeholders will be updated later in `prepare_streaming_job`. + /// Create catalogs for internal tables, then notify frontend about them if the job is a + /// materialized view. + /// + /// Some of the fields in the given "incomplete" internal tables are placeholders, which will + /// be updated later in `prepare_streaming_job` and notify again in `finish_streaming_job`. /// /// Returns a mapping from the temporary table id to the actual global table id. pub async fn create_internal_table_catalog( &self, job: &StreamingJob, - incomplete_internal_tables: Vec, + mut incomplete_internal_tables: Vec, ) -> MetaResult> { let job_id = job.id() as ObjectId; let inner = self.inner.write().await; let txn = inner.db.begin().await?; let mut table_id_map = HashMap::new(); - for table in incomplete_internal_tables { + for table in &mut incomplete_internal_tables { let table_id = Self::create_object( &txn, ObjectType::Table, @@ -357,37 +365,34 @@ impl CatalogController { .await? .oid; table_id_map.insert(table.id, table_id as u32); + table.id = table_id as _; let table_model = table::ActiveModel { table_id: Set(table_id as _), belongs_to_job_id: Set(Some(job_id)), fragment_id: NotSet, - ..table.into() + ..table.clone().into() }; Table::insert(table_model).exec(&txn).await?; } txn.commit().await?; - Ok(table_id_map) - } - - /// Notify frontend about the given internal tables before the streaming job finishes creating. - /// Should only be called for materialized views. - pub async fn pre_notify_internal_tables(&self, internal_tables: &[PbTable]) -> MetaResult<()> { - self.notify_frontend( - Operation::Add, - Info::RelationGroup(RelationGroup { - relations: internal_tables - .iter() - .map(|table| Relation { - relation_info: Some(RelationInfo::Table(table.clone())), - }) - .collect(), - }), - ) - .await; + if job.is_materialized_view() { + self.notify_frontend( + Operation::Add, + Info::RelationGroup(RelationGroup { + relations: incomplete_internal_tables + .iter() + .map(|table| Relation { + relation_info: Some(RelationInfo::Table(table.clone())), + }) + .collect(), + }), + ) + .await; + } - Ok(()) + Ok(table_id_map) } pub async fn prepare_streaming_job( @@ -575,10 +580,8 @@ impl CatalogController { txn.commit().await?; if !objs.is_empty() { - // We **may** also have notified the frontend about these objects, + // We also have notified the frontend about these objects, // so we need to notify the frontend to delete them here. - // The frontend will ignore the request if the object does not exist, - // so it's safe to always notify. self.notify_frontend(Operation::Delete, build_relation_group_for_delete(objs)) .await; } diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 7498442747546..2ad917eb8e050 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -566,9 +566,10 @@ impl TableFragments { } } - /// Retrieve the internal tables map of the whole graph. + /// Retrieve the **complete** internal tables map of the whole graph. /// - /// See also [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`]. + /// Compared to [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`], + /// the table catalogs returned here are complete, with all fields filled. pub fn internal_tables(&self) -> BTreeMap { let mut tables = BTreeMap::new(); for fragment in self.fragments.values() { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 5b2bff2c734b8..baffdbd72fba9 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1082,16 +1082,6 @@ impl DdlController { .await?; let streaming_job = &ctx.streaming_job; - let internal_tables = ctx.internal_tables(); - - // Now that all fields in `internal_tables` are initialized, - // we can notify frontend for these relations. - if streaming_job.is_materialized_view() { - self.metadata_manager - .catalog_controller - .pre_notify_internal_tables(&internal_tables) - .await?; - } match streaming_job { StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => { diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index d6bfab0e50109..211f609a13fe9 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -420,7 +420,7 @@ impl StreamFragmentGraph { }) } - /// Retrieve the internal tables map of the whole graph. + /// Retrieve the **incomplete** internal tables map of the whole graph. /// /// Note that some fields in the table catalogs are not filled during the current phase, e.g., /// `fragment_id`, `vnode_count`. They will be all filled after a `TableFragments` is built.