diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 98752877dfb80..2b786c6ecc4cf 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -696,9 +696,9 @@ impl dyn PlanNode { } } -const PLAN_DEPTH_THRESHOLD: usize = 256; +const PLAN_DEPTH_THRESHOLD: usize = 30; const PLAN_TOO_DEEP_NOTICE: &str = "The plan is too deep. \ -Consider rewriting the query to simplify it if you encounter any issues."; +Consider simplifying or splitting the query if you encounter any issues."; impl dyn PlanNode { /// Serialize the plan node and its children to a stream plan proto. @@ -710,7 +710,7 @@ impl dyn PlanNode { state: &mut BuildFragmentGraphState, ) -> SchedulerResult { recursive::tracker!().recurse(|t| { - if t.depth() > PLAN_DEPTH_THRESHOLD { + if t.depth() == PLAN_DEPTH_THRESHOLD { notice_to_user(PLAN_TOO_DEEP_NOTICE); } @@ -762,7 +762,7 @@ impl dyn PlanNode { /// (for testing). pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult { recursive::tracker!().recurse(|t| { - if t.depth() > PLAN_DEPTH_THRESHOLD { + if t.depth() == PLAN_DEPTH_THRESHOLD { notice_to_user(PLAN_TOO_DEEP_NOTICE); } diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 009449ec9228d..e7548ce5fa176 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -14,6 +14,7 @@ mod graph; use graph::*; +use risingwave_common::util::recursive::{self, Recurse as _}; use risingwave_pb::stream_plan::stream_node::NodeBody; mod rewrite; @@ -253,201 +254,208 @@ fn build_fragment( current_fragment: &mut StreamFragment, mut stream_node: StreamNode, ) -> Result { - // Update current fragment based on the node we're visiting. - match stream_node.get_node_body()? 
{ - NodeBody::BarrierRecv(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32 - } + recursive::tracker!().recurse(|_t| { + // Update current fragment based on the node we're visiting. + match stream_node.get_node_body()? { + NodeBody::BarrierRecv(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32 + } - NodeBody::Source(node) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32; + NodeBody::Source(node) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32; - if let Some(source) = node.source_inner.as_ref() - && let Some(source_info) = source.info.as_ref() - && source_info.is_shared() - && !source_info.is_distributed - { - current_fragment.requires_singleton = true; + if let Some(source) = node.source_inner.as_ref() + && let Some(source_info) = source.info.as_ref() + && source_info.is_shared() + && !source_info.is_distributed + { + current_fragment.requires_singleton = true; + } } - } - NodeBody::Dml(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::Dml as u32; - } + NodeBody::Dml(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::Dml as u32; + } - NodeBody::Materialize(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::Mview as u32; - } + NodeBody::Materialize(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::Mview as u32; + } - NodeBody::Sink(_) => current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32, + NodeBody::Sink(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32 + } - NodeBody::Subscription(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::Subscription as u32 - } + NodeBody::Subscription(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::Subscription as u32 + } - NodeBody::TopN(_) => current_fragment.requires_singleton = true, + NodeBody::TopN(_) => current_fragment.requires_singleton = true, - 
NodeBody::StreamScan(node) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32; - // memorize table id for later use - // The table id could be a upstream CDC source - state - .dependent_table_ids - .insert(TableId::new(node.table_id)); - current_fragment.upstream_table_ids.push(node.table_id); - } + NodeBody::StreamScan(node) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32; + // memorize table id for later use + // The table id could be an upstream CDC source + state + .dependent_table_ids + .insert(TableId::new(node.table_id)); + current_fragment.upstream_table_ids.push(node.table_id); + } - NodeBody::StreamCdcScan(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32; - // the backfill algorithm is not parallel safe - current_fragment.requires_singleton = true; - } + NodeBody::StreamCdcScan(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32; + // the backfill algorithm is not parallel safe + current_fragment.requires_singleton = true; + } - NodeBody::CdcFilter(node) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::CdcFilter as u32; - // memorize upstream source id for later use - state - .dependent_table_ids - .insert(TableId::new(node.upstream_source_id)); - current_fragment - .upstream_table_ids - .push(node.upstream_source_id); - } - NodeBody::SourceBackfill(node) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::SourceScan as u32; - // memorize upstream source id for later use - let source_id = node.upstream_source_id; - state.dependent_table_ids.insert(source_id.into()); - current_fragment.upstream_table_ids.push(source_id); - } + NodeBody::CdcFilter(node) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::CdcFilter as u32; + // memorize upstream source id for later use + state + .dependent_table_ids + .insert(TableId::new(node.upstream_source_id)); + current_fragment + .upstream_table_ids 
+ .push(node.upstream_source_id); + } + NodeBody::SourceBackfill(node) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::SourceScan as u32; + // memorize upstream source id for later use + let source_id = node.upstream_source_id; + state.dependent_table_ids.insert(source_id.into()); + current_fragment.upstream_table_ids.push(source_id); + } - NodeBody::Now(_) => { - // TODO: Remove this and insert a `BarrierRecv` instead. - current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32; - current_fragment.requires_singleton = true; - } + NodeBody::Now(_) => { + // TODO: Remove this and insert a `BarrierRecv` instead. + current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32; + current_fragment.requires_singleton = true; + } - NodeBody::Values(_) => { - current_fragment.fragment_type_mask |= FragmentTypeFlag::Values as u32; - current_fragment.requires_singleton = true; - } + NodeBody::Values(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::Values as u32; + current_fragment.requires_singleton = true; + } - _ => {} - }; + _ => {} + }; - // handle join logic - if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap() { - if delta_index_join.get_join_type()? == JoinType::Inner - && delta_index_join.condition.is_none() + // handle join logic + if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap() { - return build_delta_join_without_arrange(state, current_fragment, stream_node); - } else { - panic!("only inner join without non-equal condition is supported for delta joins"); + if delta_index_join.get_join_type()? 
== JoinType::Inner + && delta_index_join.condition.is_none() + { + return build_delta_join_without_arrange(state, current_fragment, stream_node); + } else { + panic!("only inner join without non-equal condition is supported for delta joins"); + } } - } - // Usually we do not expect exchange node to be visited here, which should be handled by the - // following logic of "visit children" instead. If it does happen (for example, `Share` will be - // transformed to an `Exchange`), it means we have an empty fragment and we need to add a no-op - // node to it, so that the meta service can handle it correctly. - if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() { - stream_node = state.gen_no_op_stream_node(stream_node); - } + // Usually we do not expect exchange node to be visited here, which should be handled by the + // following logic of "visit children" instead. If it does happen (for example, `Share` will be + // transformed to an `Exchange`), it means we have an empty fragment and we need to add a no-op + // node to it, so that the meta service can handle it correctly. + if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() { + stream_node = state.gen_no_op_stream_node(stream_node); + } - // Visit plan children. - stream_node.input = stream_node - .input - .into_iter() - .map(|mut child_node| { - match child_node.get_node_body()? { - // When exchange node is generated when doing rewrites, it could be having - // zero input. In this case, we won't recursively visit its children. - NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node), - // Exchange node indicates a new child fragment. - NodeBody::Exchange(exchange_node) => { - let exchange_node_strategy = exchange_node.get_strategy()?.clone(); - - // Exchange node should have only one input. 
- let [input]: [_; 1] = std::mem::take(&mut child_node.input).try_into().unwrap(); - let child_fragment = build_and_add_fragment(state, input)?; - - let result = state.fragment_graph.try_add_edge( - child_fragment.fragment_id, - current_fragment.fragment_id, - StreamFragmentEdge { - dispatch_strategy: exchange_node_strategy.clone(), - // Always use the exchange operator id as the link id. - link_id: child_node.operator_id, - }, - ); - - // It's possible that there're multiple edges between two fragments, while the - // meta service and the compute node does not expect this. In this case, we - // manually insert a fragment of `NoOp` between the two fragments. - if result.is_err() { - // Assign a new operator id for the `Exchange`, so we can distinguish it - // from duplicate edges and break the sharing. - child_node.operator_id = state.gen_operator_id() as u64; - - // Take the upstream plan node as the reference for properties of `NoOp`. - let ref_fragment_node = child_fragment.node.as_ref().unwrap(); - let no_shuffle_strategy = DispatchStrategy { - r#type: DispatcherType::NoShuffle as i32, - dist_key_indices: vec![], - output_indices: (0..ref_fragment_node.fields.len() as u32).collect(), - }; - - let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64; - - let no_op_fragment = { - let node = state.gen_no_op_stream_node(StreamNode { - operator_id: no_shuffle_exchange_operator_id, - identity: "StreamNoShuffleExchange".into(), - node_body: Some(NodeBody::Exchange(ExchangeNode { - strategy: Some(no_shuffle_strategy.clone()), - })), - input: vec![], - - // Take reference's properties. 
- stream_key: ref_fragment_node.stream_key.clone(), - append_only: ref_fragment_node.append_only, - fields: ref_fragment_node.fields.clone(), - }); - - let mut fragment = state.new_stream_fragment(); - fragment.node = Some(node.into()); - Rc::new(fragment) - }; - - state.fragment_graph.add_fragment(no_op_fragment.clone()); - - state.fragment_graph.add_edge( + // Visit plan children. + stream_node.input = stream_node + .input + .into_iter() + .map(|mut child_node| { + match child_node.get_node_body()? { + // When exchange node is generated when doing rewrites, it could be having + // zero input. In this case, we won't recursively visit its children. + NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node), + // Exchange node indicates a new child fragment. + NodeBody::Exchange(exchange_node) => { + let exchange_node_strategy = exchange_node.get_strategy()?.clone(); + + // Exchange node should have only one input. + let [input]: [_; 1] = + std::mem::take(&mut child_node.input).try_into().unwrap(); + let child_fragment = build_and_add_fragment(state, input)?; + + let result = state.fragment_graph.try_add_edge( child_fragment.fragment_id, - no_op_fragment.fragment_id, - StreamFragmentEdge { - // Use `NoShuffle` exhcnage strategy for upstream edge. - dispatch_strategy: no_shuffle_strategy, - link_id: no_shuffle_exchange_operator_id, - }, - ); - state.fragment_graph.add_edge( - no_op_fragment.fragment_id, current_fragment.fragment_id, StreamFragmentEdge { - // Use the original exchange strategy for downstream edge. - dispatch_strategy: exchange_node_strategy, + dispatch_strategy: exchange_node_strategy.clone(), + // Always use the exchange operator id as the link id. link_id: child_node.operator_id, }, ); + + // It's possible that there're multiple edges between two fragments, while the + // meta service and the compute node does not expect this. In this case, we + // manually insert a fragment of `NoOp` between the two fragments. 
+ if result.is_err() { + // Assign a new operator id for the `Exchange`, so we can distinguish it + // from duplicate edges and break the sharing. + child_node.operator_id = state.gen_operator_id() as u64; + + // Take the upstream plan node as the reference for properties of `NoOp`. + let ref_fragment_node = child_fragment.node.as_ref().unwrap(); + let no_shuffle_strategy = DispatchStrategy { + r#type: DispatcherType::NoShuffle as i32, + dist_key_indices: vec![], + output_indices: (0..ref_fragment_node.fields.len() as u32) + .collect(), + }; + + let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64; + + let no_op_fragment = { + let node = state.gen_no_op_stream_node(StreamNode { + operator_id: no_shuffle_exchange_operator_id, + identity: "StreamNoShuffleExchange".into(), + node_body: Some(NodeBody::Exchange(ExchangeNode { + strategy: Some(no_shuffle_strategy.clone()), + })), + input: vec![], + + // Take reference's properties. + stream_key: ref_fragment_node.stream_key.clone(), + append_only: ref_fragment_node.append_only, + fields: ref_fragment_node.fields.clone(), + }); + + let mut fragment = state.new_stream_fragment(); + fragment.node = Some(node.into()); + Rc::new(fragment) + }; + + state.fragment_graph.add_fragment(no_op_fragment.clone()); + + state.fragment_graph.add_edge( + child_fragment.fragment_id, + no_op_fragment.fragment_id, + StreamFragmentEdge { + // Use `NoShuffle` exchange strategy for upstream edge. + dispatch_strategy: no_shuffle_strategy, + link_id: no_shuffle_exchange_operator_id, + }, + ); + state.fragment_graph.add_edge( + no_op_fragment.fragment_id, + current_fragment.fragment_id, + StreamFragmentEdge { + // Use the original exchange strategy for downstream edge. + dispatch_strategy: exchange_node_strategy, + link_id: child_node.operator_id, + }, + ); + } + + Ok(child_node) } - Ok(child_node) + // For other children, visit recursively. 
+ _ => build_fragment(state, current_fragment, child_node), } - - // For other children, visit recursively. - _ => build_fragment(state, current_fragment, child_node), - } - }) - .collect::>()?; - Ok(stream_node) + }) + .collect::>()?; + Ok(stream_node) + }) }