From 0c7b344f58336608454c301a2ed924853d1272a8 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 17 Oct 2024 15:47:35 +0800 Subject: [PATCH] add more comments Signed-off-by: Bugen Zhao --- src/frontend/src/observer/observer_manager.rs | 1 + src/meta/src/controller/streaming_job.rs | 6 ++++++ src/meta/src/model/stream.rs | 2 ++ src/meta/src/stream/stream_graph/fragment.rs | 11 +++++++++++ 4 files changed, 20 insertions(+) diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index b1ecf4182d1d0..4133f708688eb 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -245,6 +245,7 @@ impl FrontendObserverNode { let Some(info) = resp.info.as_ref() else { return; }; + tracing::trace!(?info, "handle catalog notification"); let mut catalog_guard = self.catalog.write(); match info { diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index d2c15aaca25bc..a189490a4b25f 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -333,6 +333,10 @@ impl CatalogController { Ok(()) } + /// Create catalogs for internal tables. Some of the fields in the given arguments are + /// placeholders will be updated later in `prepare_streaming_job`. + /// + /// Returns a mapping from the temporary table id to the actual global table id. pub async fn create_internal_table_catalog( &self, job: &StreamingJob, @@ -367,6 +371,8 @@ impl CatalogController { Ok(table_id_map) } + /// Notify frontend about the given internal tables before the streaming job finishes creating. + /// Should only be called for materialized views. pub async fn pre_notify_internal_tables(&self, internal_tables: &[PbTable]) -> MetaResult<()> { self.notify_frontend( Operation::Add, diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 8a6392e98f03c..b36fb5959cda5 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -567,6 +567,8 @@ impl TableFragments { } /// Retrieve the internal tables map of the whole graph. + /// + /// See also [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`]. pub fn internal_tables(&self) -> BTreeMap { let mut tables = BTreeMap::new(); for fragment in self.fragments.values() { diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 470af834a22b3..d4f52598df4e6 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -349,6 +349,9 @@ impl StreamFragmentGraph { ) -> MetaResult { let fragment_id_gen = GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64); + // Note: in SQL backend, the ids generated here are fake and will be overwritten again + // with `refill_internal_table_ids` later. + // TODO: refactor the code to remove this step. let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64); // Create nodes. @@ -418,6 +421,12 @@ impl StreamFragmentGraph { } /// Retrieve the internal tables map of the whole graph. + /// + /// Note that some fields in the table catalogs are not filled during the current phase, e.g., + /// `fragment_id`, `vnode_count`. They will be all filled after a `TableFragments` is built. + /// Be careful when using the returned values. + /// + /// See also [`crate::model::TableFragments::internal_tables`]. pub fn incomplete_internal_tables(&self) -> BTreeMap { let mut tables = BTreeMap::new(); for fragment in self.fragments.values() { @@ -431,6 +440,8 @@ impl StreamFragmentGraph { tables } + /// Refill the internal tables' `table_id`s according to the given map, typically obtained from + /// `create_internal_table_catalog`. pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap) { for fragment in self.fragments.values_mut() { stream_graph_visitor::visit_internal_tables(