diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 87333535209cd..14e275ad34f02 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -19,14 +19,12 @@ use std::sync::Arc; use std::time::Duration; use itertools::Itertools; -use risingwave_common::bail; -use risingwave_common::catalog; use risingwave_common::config::DefaultParallelism; use risingwave_common::hash::{ParallelUnitMapping, VirtualNode}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::epoch::Epoch; -use risingwave_pb::catalog; use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont}; +use risingwave_common::{bail, catalog}; use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider; use risingwave_pb::catalog::{ connection, Comment, Connection, CreateType, Database, Function, Schema, Sink, Source, Table, @@ -573,11 +571,7 @@ impl DdlController { } } - async fn check_cycle_for_sink( - &self, - sink: &catalog::Sink, - table_id: TableId, - ) -> MetaResult { + async fn check_cycle_for_sink(&self, sink: &Sink, table_id: TableId) -> MetaResult { let reader = self.catalog_manager.get_catalog_core_guard().await; let mut q: VecDeque = VecDeque::new(); @@ -625,7 +619,7 @@ impl DdlController { // Here we modify the union node of the downstream table by the TableFragments of the to-be-created sink upstream. // The merge in the union has already been set up in the frontend and will be filled with specific upstream actors in this function. // Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here. - async fn inject_replace_table_job( + async fn inject_replace_table_job_for_table_sink( &self, env: StreamEnvironment, sink: Option<&Sink>, @@ -686,6 +680,7 @@ impl DdlController { let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap(); Self::inject_replace_table_plan_for_sink( + sink.map(|sink| sink.id), &sink_fragment, table, &mut replace_table_ctx, @@ -719,6 +714,7 @@ impl DdlController { let sink_fragment = sink_table_fragments.sink_fragment().unwrap(); Self::inject_replace_table_plan_for_sink( + Some(*sink_id), &sink_fragment, table, &mut replace_table_ctx, @@ -728,6 +724,19 @@ impl DdlController { } } + // check if the union fragment is fully assigned. + for fragment in table_fragments.fragments.values_mut() { + for actor in &mut fragment.actors { + if let Some(node) = &mut actor.nodes { + visit_stream_node(node, |node| { + if let NodeBody::Merge(merge_node) = node { + assert!(!merge_node.upstream_actor_id.is_empty(), "All the mergers for the union should have been fully assigned beforehand."); + } + }); + } + } + } + Ok(ReplaceTableJob { streaming_job, context: Some(replace_table_ctx), @@ -737,6 +746,7 @@ impl DdlController { } fn inject_replace_table_plan_for_sink( + sink_id: Option, sink_fragment: &PbFragment, table: &Table, replace_table_ctx: &mut ReplaceTableContext, @@ -808,6 +818,10 @@ impl DdlController { if let Some(NodeBody::Union(_)) = &mut node.node_body { for input in &mut node.input { if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body && merge_node.upstream_actor_id.is_empty() { + if let Some(sink_id) = sink_id { + input.identity = format!("MergeExecutor(from sink {})", sink_id); + } + *merge_node = MergeNode { upstream_actor_id: sink_actor_ids.clone(), upstream_fragment_id, @@ -959,7 +973,13 @@ impl DdlController { table_fragments, col_index_mapping, } = self - .inject_replace_table_job(env, None, None, Some(sink_id), replace_table_info) + .inject_replace_table_job_for_table_sink( + env, + None, + None, + Some(sink_id), + replace_table_info, + ) .await?; self.stream_manager @@ -1147,9 +1167,9 @@ impl DdlController { }; Some( - self.inject_replace_table_job( + self.inject_replace_table_job_for_table_sink( env, - sink, + Some(sink), Some(&table_fragments), None, replace_table_info,