diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 72d0086ba2fac..ea53956e1b735 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1285,10 +1285,15 @@ impl DdlController { }) .collect(); + let uses_arrangement_backfill = match stream_job { + StreamingJob::MaterializedView(_) => fragment_graph.has_arrangement_backfill(), + _ => false, + }; let complete_graph = CompleteStreamFragmentGraph::with_upstreams( fragment_graph, upstream_root_fragments, stream_job.table_job_type(), + uses_arrangement_backfill, )?; // 2. Build the actor graph. @@ -1704,10 +1709,15 @@ impl DdlController { ) })?; + let uses_arrangement_backfill = match stream_job { + StreamingJob::MaterializedView(_) => fragment_graph.has_arrangement_backfill(), + _ => false, + }; let complete_graph = CompleteStreamFragmentGraph::with_downstreams( fragment_graph, original_table_fragment.fragment_id, downstream_fragments, + uses_arrangement_backfill, )?; // 2. Build the actor graph. diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index b19d7a57826b7..821a40173b379 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -35,7 +35,7 @@ use risingwave_pb::stream_plan::stream_fragment_graph::{ use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ DispatchStrategy, DispatcherType, FragmentTypeFlag, StreamActor, - StreamFragmentGraph as StreamFragmentGraphProto, + StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanType, }; use crate::manager::{MetaSrvEnv, StreamingJob}; @@ -468,6 +468,31 @@ impl StreamFragmentGraph { ) -> &HashMap { self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP) } + + pub fn has_arrangement_backfill(&self) -> bool { + fn has_arrangement_backfill_node(stream_node: &StreamNode) -> bool { + let is_backfill = if let Some(node) = &stream_node.node_body + && let Some(node) = node.as_stream_scan() + { + node.stream_scan_type == StreamScanType::ArrangementBackfill as i32 + } else { + false + }; + is_backfill + || stream_node + .get_input() + .iter() + .any(has_arrangement_backfill_node) + } + for (_id, fragment) in &self.fragments { + let fragment = &**fragment; + let node = fragment.node.as_ref().unwrap(); + if has_arrangement_backfill_node(node) { + return true; + } + } + false + } } static EMPTY_HASHMAP: LazyLock> = @@ -536,6 +561,7 @@ impl CompleteStreamFragmentGraph { graph: StreamFragmentGraph, upstream_root_fragments: HashMap, table_job_type: Option, + uses_arrangement_backfill: bool, ) -> MetaResult { Self::build_helper( graph, @@ -544,6 +570,7 @@ impl CompleteStreamFragmentGraph { }), None, table_job_type, + uses_arrangement_backfill, ) } @@ -553,6 +580,7 @@ impl CompleteStreamFragmentGraph { graph: StreamFragmentGraph, original_table_fragment_id: FragmentId, downstream_fragments: Vec<(DispatchStrategy, Fragment)>, + uses_arrangement_backfill: bool, ) -> MetaResult { Self::build_helper( graph, @@ -562,6 +590,7 @@ impl CompleteStreamFragmentGraph { downstream_fragments, }), None, + uses_arrangement_backfill, ) } @@ -570,6 +599,7 @@ impl CompleteStreamFragmentGraph { upstream_ctx: Option, downstream_ctx: Option, table_job_type: Option, + uses_arrangement_backfill: bool, ) -> MetaResult { let mut extra_downstreams = HashMap::new(); let mut extra_upstreams = HashMap::new(); @@ -651,19 +681,26 @@ impl CompleteStreamFragmentGraph { .collect::>>() .context("column not found in the upstream materialized view")? }; + let dispatch_strategy = if uses_arrangement_backfill { + DispatchStrategy { + r#type: DispatcherType::Hash as _, + dist_key_indices: vec![], // not used for `Exchange` + output_indices, + } + } else { + DispatchStrategy { + r#type: DispatcherType::NoShuffle as _, + dist_key_indices: vec![], // not used for `NoShuffle` + output_indices, + } + }; let edge = StreamFragmentEdge { id: EdgeId::UpstreamExternal { upstream_table_id, downstream_fragment_id: id, }, - // We always use `NoShuffle` for the exchange between the upstream - // `Materialize` and the downstream `StreamScan` of the - // new materialized view. - dispatch_strategy: DispatchStrategy { - r#type: DispatcherType::NoShuffle as _, - dist_key_indices: vec![], // not used for `NoShuffle` - output_indices, - }, + dispatch_strategy, + }; (mview_id, edge)