Skip to content

Commit

Permalink
add more comments
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 17, 2024
1 parent eb9552d commit 0c7b344
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ impl FrontendObserverNode {
let Some(info) = resp.info.as_ref() else {
return;
};
tracing::trace!(?info, "handle catalog notification");

let mut catalog_guard = self.catalog.write();
match info {
Expand Down
6 changes: 6 additions & 0 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ impl CatalogController {
Ok(())
}

/// Create catalogs for internal tables. Some of the fields in the given arguments are
/// placeholders will be updated later in `prepare_streaming_job`.
///
/// Returns a mapping from the temporary table id to the actual global table id.
pub async fn create_internal_table_catalog(
&self,
job: &StreamingJob,
Expand Down Expand Up @@ -367,6 +371,8 @@ impl CatalogController {
Ok(table_id_map)
}

/// Notify frontend about the given internal tables before the streaming job finishes creating.
/// Should only be called for materialized views.
pub async fn pre_notify_internal_tables(&self, internal_tables: &[PbTable]) -> MetaResult<()> {
self.notify_frontend(
Operation::Add,
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,8 @@ impl TableFragments {
}

/// Retrieve the internal tables map of the whole graph.
///
/// See also [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`].
pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
let mut tables = BTreeMap::new();
for fragment in self.fragments.values() {
Expand Down
11 changes: 11 additions & 0 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ impl StreamFragmentGraph {
) -> MetaResult<Self> {
let fragment_id_gen =
GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
// Note: in SQL backend, the ids generated here are fake and will be overwritten again
// with `refill_internal_table_ids` later.
// TODO: refactor the code to remove this step.
let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);

// Create nodes.
Expand Down Expand Up @@ -418,6 +421,12 @@ impl StreamFragmentGraph {
}

/// Retrieve the internal tables map of the whole graph.
///
/// Note that some fields in the table catalogs are not filled during the current phase, e.g.,
/// `fragment_id`, `vnode_count`. They will be all filled after a `TableFragments` is built.
/// Be careful when using the returned values.
///
/// See also [`crate::model::TableFragments::internal_tables`].
pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
let mut tables = BTreeMap::new();
for fragment in self.fragments.values() {
Expand All @@ -431,6 +440,8 @@ impl StreamFragmentGraph {
tables
}

/// Refill the internal tables' `table_id`s according to the given map, typically obtained from
/// `create_internal_table_catalog`.
pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<u32, u32>) {
for fragment in self.fragments.values_mut() {
stream_graph_visitor::visit_internal_tables(
Expand Down

0 comments on commit 0c7b344

Please sign in to comment.