diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 7d742f5aa64fc..6cb33f6fa5191 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -336,13 +336,13 @@ impl CatalogController {
     pub async fn create_internal_table_catalog(
         &self,
         job: &StreamingJob,
-        mut internal_tables: Vec<PbTable>,
+        incomplete_internal_tables: Vec<PbTable>,
     ) -> MetaResult<HashMap<u32, u32>> {
         let job_id = job.id() as ObjectId;
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
         let mut table_id_map = HashMap::new();
-        for table in &mut internal_tables {
+        for table in incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
@@ -353,33 +353,46 @@ impl CatalogController {
            .await?
            .oid;
            table_id_map.insert(table.id, table_id as u32);
-            table.id = table_id as _;
-            let mut table_model: table::ActiveModel = table.clone().into();
-            table_model.table_id = Set(table_id as _);
-            table_model.belongs_to_job_id = Set(Some(job_id));
-            table_model.fragment_id = NotSet;
+
+            let table_model = table::ActiveModel {
+                table_id: Set(table_id as _),
+                belongs_to_job_id: Set(Some(job_id)),
+                fragment_id: NotSet,
+                ..table.into()
+            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

-        if job.is_materialized_view() {
-            self.notify_frontend(
-                Operation::Add,
-                Info::RelationGroup(RelationGroup {
-                    relations: internal_tables
-                        .iter()
-                        .map(|table| Relation {
-                            relation_info: Some(RelationInfo::Table(table.clone())),
-                        })
-                        .collect(),
-                }),
-            )
-            .await;
-        }
-
        Ok(table_id_map)
    }

+    pub async fn pre_notify_relations_for_mv(
+        &self,
+        job: &StreamingJob,
+        internal_tables: &[PbTable],
+    ) -> MetaResult<()> {
+        let StreamingJob::MaterializedView(table) = job else {
+            return Ok(());
+        };
+
+        let tables = std::iter::once(table).chain(internal_tables);
+
+        self.notify_frontend(
+            Operation::Add,
+            Info::RelationGroup(RelationGroup {
+                relations: tables
+                    .map(|table| Relation {
+                        relation_info: Some(RelationInfo::Table(table.clone())),
+                    })
+                    .collect(),
+            }),
+        )
+        .await;
+
+        Ok(())
+    }
+
    pub async fn prepare_streaming_job(
        &self,
        table_fragment: PbTableFragments,
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index d773d2a5de935..8a6392e98f03c 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -18,8 +18,10 @@ use std::ops::AddAssign;
 use itertools::Itertools;
 use risingwave_common::catalog::TableId;
 use risingwave_common::hash::{VirtualNode, WorkerSlotId};
+use risingwave_common::util::stream_graph_visitor;
 use risingwave_connector::source::SplitImpl;
 use risingwave_meta_model_v2::{SourceId, WorkerId};
+use risingwave_pb::catalog::Table;
 use risingwave_pb::common::PbActorLocation;
 use risingwave_pb::meta::table_fragments::actor_status::ActorState;
 use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
@@ -564,6 +566,23 @@ impl TableFragments {
        }
    }

+    /// Retrieve the internal tables map of the whole graph.
+    pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
+        let mut tables = BTreeMap::new();
+        for fragment in self.fragments.values() {
+            stream_graph_visitor::visit_stream_node_internal_tables(
+                &mut fragment.actors[0].nodes.clone().unwrap(),
+                |table, _| {
+                    let table_id = table.id;
+                    tables
+                        .try_insert(table_id, table.clone())
+                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
+                },
+            );
+        }
+        tables
+    }
+
    /// Returns the internal table ids without the mview table.
    pub fn internal_table_ids(&self) -> Vec<u32> {
        self.fragments
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 0cc4e82969c22..72b20e7e8f903 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1013,7 +1013,6 @@ impl DdlController {
    ) -> MetaResult<(CreateStreamingJobContext, TableFragments)> {
        let id = stream_job.id();
        let specified_parallelism = fragment_graph.specified_parallelism();
-        let internal_tables = fragment_graph.internal_tables();
        let expr_context = stream_ctx.to_expr_context();
        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();

@@ -1096,6 +1095,7 @@ impl DdlController {
            table_parallelism,
            max_parallelism.get(),
        );
+        let internal_tables = table_fragments.internal_tables();

        if let Some(mview_fragment) = table_fragments.mview_fragment() {
            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs
index c2cd22d8dc9dc..bec119815b63f 100644
--- a/src/meta/src/rpc/ddl_controller_v2.rs
+++ b/src/meta/src/rpc/ddl_controller_v2.rs
@@ -146,11 +146,14 @@ impl DdlController {
        streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id());

        // create internal table catalogs and refill table id.
-        let internal_tables = fragment_graph.internal_tables().into_values().collect_vec();
+        let incomplete_internal_tables = fragment_graph
+            .incomplete_internal_tables()
+            .into_values()
+            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
-            .create_internal_table_catalog(&streaming_job, internal_tables)
+            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

@@ -185,6 +188,14 @@ impl DdlController {
            .await?;

        let streaming_job = &ctx.streaming_job;
+        let internal_tables = ctx.internal_tables();
+
+        // Now that all fields in `streaming_job` and `internal_tables` are initialized,
+        // we can notify frontend for these relations.
+        self.metadata_manager
+            .catalog_controller
+            .pre_notify_relations_for_mv(streaming_job, &internal_tables)
+            .await?;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index 6b78d67448d74..470af834a22b3 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -418,7 +418,7 @@ impl StreamFragmentGraph {
    }

    /// Retrieve the internal tables map of the whole graph.
-    pub fn internal_tables(&self) -> BTreeMap<u32, PbTable> {
+    pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, PbTable> {
        let mut tables = BTreeMap::new();
        for fragment in self.fragments.values() {
            for table in fragment.extract_internal_tables() {
diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs
index 9818af124537e..719323e66ffb0 100644
--- a/src/meta/src/stream/test_fragmenter.rs
+++ b/src/meta/src/stream/test_fragmenter.rs
@@ -448,7 +448,7 @@ async fn test_graph_builder() -> MetaResult<()> {
        time_zone: graph.ctx.as_ref().unwrap().timezone.clone(),
    };
    let fragment_graph = StreamFragmentGraph::new(&env, graph, &job)?;
-    let internal_tables = fragment_graph.internal_tables();
+    let internal_tables = fragment_graph.incomplete_internal_tables();
    let actor_graph_builder = ActorGraphBuilder::new(
        job.id(),