From 32df9cb2b90cc68c0bfaaf888d7a2028522ae4eb Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 17 Apr 2024 10:23:27 +0800 Subject: [PATCH] feat(frontend): fearless recursion on deep plans (#16279) Signed-off-by: Bugen Zhao --- Cargo.lock | 14 + src/common/Cargo.toml | 1 + src/common/src/util/mod.rs | 1 + src/common/src/util/recursive.rs | 190 +++++++++++ src/frontend/src/optimizer/plan_node/mod.rs | 112 ++++--- src/frontend/src/stream_fragmenter/mod.rs | 346 ++++++++++---------- 6 files changed, 448 insertions(+), 216 deletions(-) create mode 100644 src/common/src/util/recursive.rs diff --git a/Cargo.lock b/Cargo.lock index aeee200fe918b..d864187752208 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9325,6 +9325,7 @@ dependencies = [ "serde_with", "smallbitset", "speedate", + "stacker", "static_assertions", "strum 0.26.1", "strum_macros 0.26.1", @@ -12342,6 +12343,19 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "stacker" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c886bd4480155fd3ef527d45e9ac8dd7118a898a46530b7b94c3e21866259fce" +dependencies = [ + "cc", + "cfg-if", + "libc", + "psm", + "winapi", +] + [[package]] name = "static_assertions" version = "1.1.0" diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 3108e06e789f2..d21e276089c2c 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -93,6 +93,7 @@ serde_json = "1" serde_with = "3" smallbitset = "0.7.1" speedate = "0.14.0" +stacker = "0.1" static_assertions = "1" strum = "0.26" strum_macros = "0.26" diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs index bb64f5a58c802..c8027ad46e381 100644 --- a/src/common/src/util/mod.rs +++ b/src/common/src/util/mod.rs @@ -30,6 +30,7 @@ pub mod pretty_bytes; pub mod prost; pub mod query_log; pub use rw_resource_util as resource_util; +pub mod recursive; pub mod row_id; pub mod row_serde; pub mod runtime; diff --git a/src/common/src/util/recursive.rs b/src/common/src/util/recursive.rs new file mode 100644 index 0000000000000..2869b3c496335 --- /dev/null +++ b/src/common/src/util/recursive.rs @@ -0,0 +1,190 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Track the recursion and grow the stack when necessary to enable fearless recursion. + +use std::cell::RefCell; + +// See documentation of `stacker` for the meaning of these constants. +// TODO: determine good values or make them configurable +const RED_ZONE: usize = 128 * 1024; // 128KiB +const STACK_SIZE: usize = 16 * RED_ZONE; // 2MiB + +/// Recursion depth. +struct Depth { + /// The current depth. + current: usize, + /// The max depth reached so far, not considering the current depth. 
+ last_max: usize,
+}
+
+impl Depth {
+ const fn new() -> Self {
+ Self {
+ current: 0,
+ last_max: 0,
+ }
+ }
+
+ fn reset(&mut self) {
+ *self = Self::new();
+ }
+}
+
+/// The tracker for a recursive function.
+pub struct Tracker {
+ depth: RefCell<Depth>,
+}
+
+impl Tracker {
+ /// Create a new tracker.
+ pub const fn new() -> Self {
+ Self {
+ depth: RefCell::new(Depth::new()),
+ }
+ }
+
+ /// Retrieve the current depth of the recursion. Starts from 1 once the
+ /// recursive function is called.
+ pub fn depth(&self) -> usize {
+ self.depth.borrow().current
+ }
+
+ /// Check if the current depth reaches the given depth **for the first time**.
+ ///
+ /// This is useful for logging without any duplication.
+ pub fn depth_reaches(&self, depth: usize) -> bool {
+ let d = self.depth.borrow();
+ d.current == depth && d.current > d.last_max
+ }
+
+ /// Run a recursive function. Grow the stack if necessary.
+ fn recurse<T>(&self, f: impl FnOnce() -> T) -> T {
+ struct DepthGuard<'a> {
+ depth: &'a RefCell<Depth>,
+ }
+
+ impl<'a> DepthGuard<'a> {
+ fn new(depth: &'a RefCell<Depth>) -> Self {
+ depth.borrow_mut().current += 1;
+ Self { depth }
+ }
+ }
+
+ impl<'a> Drop for DepthGuard<'a> {
+ fn drop(&mut self) {
+ let mut d = self.depth.borrow_mut();
+ d.last_max = d.last_max.max(d.current); // update the last max depth
+ d.current -= 1; // restore the current depth
+ if d.current == 0 {
+ d.reset(); // reset state if the recursion is finished
+ }
+ }
+ }
+
+ let _guard = DepthGuard::new(&self.depth);
+
+ if cfg!(madsim) {
+ f() // madsim does not support stack growth
+ } else {
+ stacker::maybe_grow(RED_ZONE, STACK_SIZE, f)
+ }
+ }
+}
+
+/// The extension trait for a thread-local tracker to run a recursive function.
+#[easy_ext::ext(Recurse)]
+impl std::thread::LocalKey<Tracker> {
+ /// Run the given recursive function. Grow the stack if necessary.
+ ///
+ /// # Fearless Recursion
+ ///
+ /// This enables fearless recursion in most cases as long as a single frame
+ /// does not exceed the [`RED_ZONE`] size. That is, the caller can recurse
+ /// as much as it wants without worrying about stack overflow.
+ ///
+ /// # Tracker
+ ///
+ /// The caller can retrieve the [`Tracker`] of the current recursion from
+ /// the closure argument. This can be useful for checking the depth of the
+ /// recursion, logging or throwing an error gracefully if it's too deep.
+ ///
+ /// Note that different trackers defined in different functions are
+ /// independent of each other. If there's a cross-function recursion, the
+ /// tracker retrieved from the closure argument only represents the current
+ /// function's state.
+ ///
+ /// # Example
+ ///
+ /// Define the tracker with [`tracker!`] and call this method on it to run
+ /// a recursive function.
+ ///
+ /// ```ignore
+ /// #[inline(never)]
+ /// fn sum(x: u64) -> u64 {
+ /// tracker!().recurse(|t| {
+ /// if t.depth() % 100000 == 0 {
+ /// eprintln!("too deep!");
+ /// }
+ /// if x == 0 {
+ /// return 0;
+ /// }
+ /// x + sum(x - 1)
+ /// })
+ /// }
+ /// ```
+ pub fn recurse<T>(&'static self, f: impl FnOnce(&Tracker) -> T) -> T {
+ self.with(|t| t.recurse(|| f(t)))
+ }
+}
+
+/// Define the tracker for recursion and return it.
+///
+/// Call [`Recurse::recurse`] on it to run a recursive function. See
+/// documentation there for usage.
+#[macro_export]
+macro_rules! __recursive_tracker {
+ () => {{
+ use $crate::util::recursive::Tracker;
+ std::thread_local!
{ + static __TRACKER: Tracker = const { Tracker::new() }; + } + __TRACKER + }}; +} +pub use __recursive_tracker as tracker; + +#[cfg(all(test, not(madsim)))] +mod tests { + use super::*; + + #[test] + fn test_fearless_recursion() { + const X: u64 = 1919810; + const EXPECTED: u64 = 1842836177955; + + #[inline(never)] + fn sum(x: u64) -> u64 { + tracker!().recurse(|t| { + if x == 0 { + assert_eq!(t.depth(), X as usize + 1); + return 0; + } + x + sum(x - 1) + }) + } + + assert_eq!(sum(X), EXPECTED); + } +} diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 780e1d2b39cdd..ff749781f9265 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -39,6 +39,7 @@ use itertools::Itertools; use paste::paste; use pretty_xmlish::{Pretty, PrettyConfig}; use risingwave_common::catalog::Schema; +use risingwave_common::util::recursive::{self, Recurse}; use risingwave_pb::batch_plan::PlanNode as BatchPlanPb; use risingwave_pb::stream_plan::StreamNode as StreamPlanPb; use serde::Serialize; @@ -51,6 +52,7 @@ use self::utils::Distill; use super::property::{Distribution, FunctionalDependencySet, Order}; use crate::error::{ErrorCode, Result}; use crate::optimizer::ExpressionSimplifyRewriter; +use crate::session::current::notice_to_user; /// A marker trait for different conventions, used for enforcing type safety. /// @@ -694,6 +696,10 @@ impl dyn PlanNode { } } +const PLAN_DEPTH_THRESHOLD: usize = 30; +const PLAN_TOO_DEEP_NOTICE: &str = "The plan is too deep. \ +Consider simplifying or splitting the query if you encounter any issues."; + impl dyn PlanNode { /// Serialize the plan node and its children to a stream plan proto. /// @@ -703,41 +709,47 @@ impl dyn PlanNode { &self, state: &mut BuildFragmentGraphState, ) -> SchedulerResult { - use stream::prelude::*; + recursive::tracker!().recurse(|t| { + if t.depth_reaches(PLAN_DEPTH_THRESHOLD) { + notice_to_user(PLAN_TOO_DEEP_NOTICE); + } - if let Some(stream_table_scan) = self.as_stream_table_scan() { - return stream_table_scan.adhoc_to_stream_prost(state); - } - if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() { - return stream_cdc_table_scan.adhoc_to_stream_prost(state); - } - if let Some(stream_source_scan) = self.as_stream_source_scan() { - return stream_source_scan.adhoc_to_stream_prost(state); - } - if let Some(stream_share) = self.as_stream_share() { - return stream_share.adhoc_to_stream_prost(state); - } + use stream::prelude::*; - let node = Some(self.try_to_stream_prost_body(state)?); - let input = self - .inputs() - .into_iter() - .map(|plan| plan.to_stream_prost(state)) - .try_collect()?; - // TODO: support pk_indices and operator_id - Ok(StreamPlanPb { - input, - identity: self.explain_myself_to_string(), - node_body: node, - operator_id: self.id().0 as _, - stream_key: self - .stream_key() - .unwrap_or_default() - .iter() - .map(|x| *x as u32) - .collect(), - fields: self.schema().to_prost(), - append_only: self.plan_base().append_only(), + if let Some(stream_table_scan) = self.as_stream_table_scan() { + return stream_table_scan.adhoc_to_stream_prost(state); + } + if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() { + return stream_cdc_table_scan.adhoc_to_stream_prost(state); + } + if let Some(stream_source_scan) = self.as_stream_source_scan() { + return stream_source_scan.adhoc_to_stream_prost(state); + } + if let Some(stream_share) = self.as_stream_share() { + return 
stream_share.adhoc_to_stream_prost(state); + } + + let node = Some(self.try_to_stream_prost_body(state)?); + let input = self + .inputs() + .into_iter() + .map(|plan| plan.to_stream_prost(state)) + .try_collect()?; + // TODO: support pk_indices and operator_id + Ok(StreamPlanPb { + input, + identity: self.explain_myself_to_string(), + node_body: node, + operator_id: self.id().0 as _, + stream_key: self + .stream_key() + .unwrap_or_default() + .iter() + .map(|x| *x as u32) + .collect(), + fields: self.schema().to_prost(), + append_only: self.plan_base().append_only(), + }) }) } @@ -749,20 +761,26 @@ impl dyn PlanNode { /// Serialize the plan node and its children to a batch plan proto without the identity field /// (for testing). pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult { - let node_body = Some(self.try_to_batch_prost_body()?); - let children = self - .inputs() - .into_iter() - .map(|plan| plan.to_batch_prost_identity(identity)) - .try_collect()?; - Ok(BatchPlanPb { - children, - identity: if identity { - self.explain_myself_to_string() - } else { - "".into() - }, - node_body, + recursive::tracker!().recurse(|t| { + if t.depth_reaches(PLAN_DEPTH_THRESHOLD) { + notice_to_user(PLAN_TOO_DEEP_NOTICE); + } + + let node_body = Some(self.try_to_batch_prost_body()?); + let children = self + .inputs() + .into_iter() + .map(|plan| plan.to_batch_prost_identity(identity)) + .try_collect()?; + Ok(BatchPlanPb { + children, + identity: if identity { + self.explain_myself_to_string() + } else { + "".into() + }, + node_body, + }) }) } diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 009449ec9228d..e7548ce5fa176 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -14,6 +14,7 @@ mod graph; use graph::*; +use risingwave_common::util::recursive::{self, Recurse as _}; use risingwave_pb::stream_plan::stream_node::NodeBody; mod rewrite; @@ -253,201 +254,208 @@ fn build_fragment( current_fragment: &mut StreamFragment, mut stream_node: StreamNode, ) -> Result { - // Update current fragment based on the node we're visiting. - match stream_node.get_node_body()? { - NodeBody::BarrierRecv(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32 - } + recursive::tracker!().recurse(|_t| { + // Update current fragment based on the node we're visiting. + match stream_node.get_node_body()? 
{ + NodeBody::BarrierRecv(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32 + } - NodeBody::Source(node) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32; + NodeBody::Source(node) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32; - if let Some(source) = node.source_inner.as_ref() - && let Some(source_info) = source.info.as_ref() - && source_info.is_shared() - && !source_info.is_distributed - { - current_fragment.requires_singleton = true; + if let Some(source) = node.source_inner.as_ref() + && let Some(source_info) = source.info.as_ref() + && source_info.is_shared() + && !source_info.is_distributed + { + current_fragment.requires_singleton = true; + } } - } - NodeBody::Dml(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::Dml as u32; - } + NodeBody::Dml(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::Dml as u32; + } - NodeBody::Materialize(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::Mview as u32; - } + NodeBody::Materialize(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::Mview as u32; + } - NodeBody::Sink(_) => current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32, + NodeBody::Sink(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32 + } - NodeBody::Subscription(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::Subscription as u32 - } + NodeBody::Subscription(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::Subscription as u32 + } - NodeBody::TopN(_) => current_fragment.requires_singleton = true, + NodeBody::TopN(_) => current_fragment.requires_singleton = true, - NodeBody::StreamScan(node) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32; - // memorize table id for later use - // The table id could be a upstream CDC source - state - .dependent_table_ids - .insert(TableId::new(node.table_id)); - current_fragment.upstream_table_ids.push(node.table_id); - } + NodeBody::StreamScan(node) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32; + // memorize table id for later use + // The table id could be a upstream CDC source + state + .dependent_table_ids + .insert(TableId::new(node.table_id)); + current_fragment.upstream_table_ids.push(node.table_id); + } - NodeBody::StreamCdcScan(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32; - // the backfill algorithm is not parallel safe - current_fragment.requires_singleton = true; - } + NodeBody::StreamCdcScan(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32; + // the backfill algorithm is not parallel safe + current_fragment.requires_singleton = true; + } - NodeBody::CdcFilter(node) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::CdcFilter as u32; - // memorize upstream source id for later use - state - .dependent_table_ids - .insert(TableId::new(node.upstream_source_id)); - current_fragment - .upstream_table_ids - .push(node.upstream_source_id); - } - NodeBody::SourceBackfill(node) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::SourceScan as u32; - // memorize upstream source id for later use - let source_id = node.upstream_source_id; - state.dependent_table_ids.insert(source_id.into()); - current_fragment.upstream_table_ids.push(source_id); - } + NodeBody::CdcFilter(node) => { + current_fragment.fragment_type_mask |= 
FragmentTypeFlag::CdcFilter as u32; + // memorize upstream source id for later use + state + .dependent_table_ids + .insert(TableId::new(node.upstream_source_id)); + current_fragment + .upstream_table_ids + .push(node.upstream_source_id); + } + NodeBody::SourceBackfill(node) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::SourceScan as u32; + // memorize upstream source id for later use + let source_id = node.upstream_source_id; + state.dependent_table_ids.insert(source_id.into()); + current_fragment.upstream_table_ids.push(source_id); + } - NodeBody::Now(_) => { - // TODO: Remove this and insert a `BarrierRecv` instead. - current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32; - current_fragment.requires_singleton = true; - } + NodeBody::Now(_) => { + // TODO: Remove this and insert a `BarrierRecv` instead. + current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32; + current_fragment.requires_singleton = true; + } - NodeBody::Values(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::Values as u32; - current_fragment.requires_singleton = true; - } + NodeBody::Values(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::Values as u32; + current_fragment.requires_singleton = true; + } - _ => {} - }; + _ => {} + }; - // handle join logic - if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap() { - if delta_index_join.get_join_type()? == JoinType::Inner - && delta_index_join.condition.is_none() + // handle join logic + if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap() { - return build_delta_join_without_arrange(state, current_fragment, stream_node); - } else { - panic!("only inner join without non-equal condition is supported for delta joins"); + if delta_index_join.get_join_type()? == JoinType::Inner + && delta_index_join.condition.is_none() + { + return build_delta_join_without_arrange(state, current_fragment, stream_node); + } else { + panic!("only inner join without non-equal condition is supported for delta joins"); + } } - } - // Usually we do not expect exchange node to be visited here, which should be handled by the - // following logic of "visit children" instead. If it does happen (for example, `Share` will be - // transformed to an `Exchange`), it means we have an empty fragment and we need to add a no-op - // node to it, so that the meta service can handle it correctly. - if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() { - stream_node = state.gen_no_op_stream_node(stream_node); - } + // Usually we do not expect exchange node to be visited here, which should be handled by the + // following logic of "visit children" instead. If it does happen (for example, `Share` will be + // transformed to an `Exchange`), it means we have an empty fragment and we need to add a no-op + // node to it, so that the meta service can handle it correctly. + if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() { + stream_node = state.gen_no_op_stream_node(stream_node); + } - // Visit plan children. - stream_node.input = stream_node - .input - .into_iter() - .map(|mut child_node| { - match child_node.get_node_body()? { - // When exchange node is generated when doing rewrites, it could be having - // zero input. In this case, we won't recursively visit its children. - NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node), - // Exchange node indicates a new child fragment. 
- NodeBody::Exchange(exchange_node) => { - let exchange_node_strategy = exchange_node.get_strategy()?.clone(); - - // Exchange node should have only one input. - let [input]: [_; 1] = std::mem::take(&mut child_node.input).try_into().unwrap(); - let child_fragment = build_and_add_fragment(state, input)?; - - let result = state.fragment_graph.try_add_edge( - child_fragment.fragment_id, - current_fragment.fragment_id, - StreamFragmentEdge { - dispatch_strategy: exchange_node_strategy.clone(), - // Always use the exchange operator id as the link id. - link_id: child_node.operator_id, - }, - ); - - // It's possible that there're multiple edges between two fragments, while the - // meta service and the compute node does not expect this. In this case, we - // manually insert a fragment of `NoOp` between the two fragments. - if result.is_err() { - // Assign a new operator id for the `Exchange`, so we can distinguish it - // from duplicate edges and break the sharing. - child_node.operator_id = state.gen_operator_id() as u64; - - // Take the upstream plan node as the reference for properties of `NoOp`. - let ref_fragment_node = child_fragment.node.as_ref().unwrap(); - let no_shuffle_strategy = DispatchStrategy { - r#type: DispatcherType::NoShuffle as i32, - dist_key_indices: vec![], - output_indices: (0..ref_fragment_node.fields.len() as u32).collect(), - }; - - let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64; - - let no_op_fragment = { - let node = state.gen_no_op_stream_node(StreamNode { - operator_id: no_shuffle_exchange_operator_id, - identity: "StreamNoShuffleExchange".into(), - node_body: Some(NodeBody::Exchange(ExchangeNode { - strategy: Some(no_shuffle_strategy.clone()), - })), - input: vec![], - - // Take reference's properties. - stream_key: ref_fragment_node.stream_key.clone(), - append_only: ref_fragment_node.append_only, - fields: ref_fragment_node.fields.clone(), - }); - - let mut fragment = state.new_stream_fragment(); - fragment.node = Some(node.into()); - Rc::new(fragment) - }; - - state.fragment_graph.add_fragment(no_op_fragment.clone()); - - state.fragment_graph.add_edge( + // Visit plan children. + stream_node.input = stream_node + .input + .into_iter() + .map(|mut child_node| { + match child_node.get_node_body()? { + // When exchange node is generated when doing rewrites, it could be having + // zero input. In this case, we won't recursively visit its children. + NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node), + // Exchange node indicates a new child fragment. + NodeBody::Exchange(exchange_node) => { + let exchange_node_strategy = exchange_node.get_strategy()?.clone(); + + // Exchange node should have only one input. + let [input]: [_; 1] = + std::mem::take(&mut child_node.input).try_into().unwrap(); + let child_fragment = build_and_add_fragment(state, input)?; + + let result = state.fragment_graph.try_add_edge( child_fragment.fragment_id, - no_op_fragment.fragment_id, - StreamFragmentEdge { - // Use `NoShuffle` exhcnage strategy for upstream edge. - dispatch_strategy: no_shuffle_strategy, - link_id: no_shuffle_exchange_operator_id, - }, - ); - state.fragment_graph.add_edge( - no_op_fragment.fragment_id, current_fragment.fragment_id, StreamFragmentEdge { - // Use the original exchange strategy for downstream edge. - dispatch_strategy: exchange_node_strategy, + dispatch_strategy: exchange_node_strategy.clone(), + // Always use the exchange operator id as the link id. 
link_id: child_node.operator_id, }, ); + + // It's possible that there're multiple edges between two fragments, while the + // meta service and the compute node does not expect this. In this case, we + // manually insert a fragment of `NoOp` between the two fragments. + if result.is_err() { + // Assign a new operator id for the `Exchange`, so we can distinguish it + // from duplicate edges and break the sharing. + child_node.operator_id = state.gen_operator_id() as u64; + + // Take the upstream plan node as the reference for properties of `NoOp`. + let ref_fragment_node = child_fragment.node.as_ref().unwrap(); + let no_shuffle_strategy = DispatchStrategy { + r#type: DispatcherType::NoShuffle as i32, + dist_key_indices: vec![], + output_indices: (0..ref_fragment_node.fields.len() as u32) + .collect(), + }; + + let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64; + + let no_op_fragment = { + let node = state.gen_no_op_stream_node(StreamNode { + operator_id: no_shuffle_exchange_operator_id, + identity: "StreamNoShuffleExchange".into(), + node_body: Some(NodeBody::Exchange(ExchangeNode { + strategy: Some(no_shuffle_strategy.clone()), + })), + input: vec![], + + // Take reference's properties. + stream_key: ref_fragment_node.stream_key.clone(), + append_only: ref_fragment_node.append_only, + fields: ref_fragment_node.fields.clone(), + }); + + let mut fragment = state.new_stream_fragment(); + fragment.node = Some(node.into()); + Rc::new(fragment) + }; + + state.fragment_graph.add_fragment(no_op_fragment.clone()); + + state.fragment_graph.add_edge( + child_fragment.fragment_id, + no_op_fragment.fragment_id, + StreamFragmentEdge { + // Use `NoShuffle` exhcnage strategy for upstream edge. + dispatch_strategy: no_shuffle_strategy, + link_id: no_shuffle_exchange_operator_id, + }, + ); + state.fragment_graph.add_edge( + no_op_fragment.fragment_id, + current_fragment.fragment_id, + StreamFragmentEdge { + // Use the original exchange strategy for downstream edge. + dispatch_strategy: exchange_node_strategy, + link_id: child_node.operator_id, + }, + ); + } + + Ok(child_node) } - Ok(child_node) + // For other children, visit recursively. + _ => build_fragment(state, current_fragment, child_node), } - - // For other children, visit recursively. - _ => build_fragment(state, current_fragment, child_node), - } - }) - .collect::>()?; - Ok(stream_node) + }) + .collect::>()?; + Ok(stream_node) + }) }
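
Background on the mechanism: `recursive::Tracker::recurse` in this patch delegates to `stacker::maybe_grow`, which moves execution onto a freshly allocated stack segment whenever the remaining stack drops below the red zone. The following is a minimal standalone sketch of that primitive; it is illustrative only and not part of the patch, assumes the `stacker` crate as the sole dependency, and reuses the patch's `RED_ZONE`/`STACK_SIZE` values with a toy `sum` function.

// Illustrative sketch (not part of the patch): the stack-growth primitive
// that `recursive::Tracker::recurse` delegates to.
const RED_ZONE: usize = 128 * 1024; // grow once less than 128KiB of stack remains
const STACK_SIZE: usize = 16 * RED_ZONE; // each new stack segment is 2MiB

#[inline(never)]
fn sum(x: u64) -> u64 {
    // Every call checks the remaining stack; if it is about to run out, the
    // closure continues on a newly allocated segment, so deep recursion does
    // not overflow as long as a single frame fits within the red zone.
    stacker::maybe_grow(RED_ZONE, STACK_SIZE, || {
        if x == 0 { 0 } else { x + sum(x - 1) }
    })
}

fn main() {
    // Deep enough that a plain recursive `sum` would typically overflow the
    // default 8MiB main-thread stack.
    assert_eq!(sum(1_000_000), 500_000_500_000);
}

Keeping the red zone comfortably larger than any single stack frame is what makes such recursion safe; the `Tracker` introduced by this patch layers depth accounting on top, so callers like `to_stream_prost` and `build_fragment` can emit a notice once `PLAN_DEPTH_THRESHOLD` is reached without limiting the recursion itself.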