From 7d5fea1e857409c89ad94617792794da021d9b4f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 12 Apr 2024 15:28:55 +0800 Subject: [PATCH] wrap to_prost Signed-off-by: Bugen Zhao --- src/frontend/src/optimizer/plan_node/mod.rs | 112 ++++++++++++-------- 1 file changed, 65 insertions(+), 47 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 780e1d2b39cdd..98752877dfb80 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -39,6 +39,7 @@ use itertools::Itertools; use paste::paste; use pretty_xmlish::{Pretty, PrettyConfig}; use risingwave_common::catalog::Schema; +use risingwave_common::util::recursive::{self, Recurse}; use risingwave_pb::batch_plan::PlanNode as BatchPlanPb; use risingwave_pb::stream_plan::StreamNode as StreamPlanPb; use serde::Serialize; @@ -51,6 +52,7 @@ use self::utils::Distill; use super::property::{Distribution, FunctionalDependencySet, Order}; use crate::error::{ErrorCode, Result}; use crate::optimizer::ExpressionSimplifyRewriter; +use crate::session::current::notice_to_user; /// A marker trait for different conventions, used for enforcing type safety. /// @@ -694,6 +696,10 @@ impl dyn PlanNode { } } +const PLAN_DEPTH_THRESHOLD: usize = 256; +const PLAN_TOO_DEEP_NOTICE: &str = "The plan is too deep. \ +Consider rewriting the query to simplify it if you encounter any issues."; + impl dyn PlanNode { /// Serialize the plan node and its children to a stream plan proto. /// @@ -703,41 +709,47 @@ impl dyn PlanNode { &self, state: &mut BuildFragmentGraphState, ) -> SchedulerResult { - use stream::prelude::*; + recursive::tracker!().recurse(|t| { + if t.depth() > PLAN_DEPTH_THRESHOLD { + notice_to_user(PLAN_TOO_DEEP_NOTICE); + } - if let Some(stream_table_scan) = self.as_stream_table_scan() { - return stream_table_scan.adhoc_to_stream_prost(state); - } - if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() { - return stream_cdc_table_scan.adhoc_to_stream_prost(state); - } - if let Some(stream_source_scan) = self.as_stream_source_scan() { - return stream_source_scan.adhoc_to_stream_prost(state); - } - if let Some(stream_share) = self.as_stream_share() { - return stream_share.adhoc_to_stream_prost(state); - } + use stream::prelude::*; - let node = Some(self.try_to_stream_prost_body(state)?); - let input = self - .inputs() - .into_iter() - .map(|plan| plan.to_stream_prost(state)) - .try_collect()?; - // TODO: support pk_indices and operator_id - Ok(StreamPlanPb { - input, - identity: self.explain_myself_to_string(), - node_body: node, - operator_id: self.id().0 as _, - stream_key: self - .stream_key() - .unwrap_or_default() - .iter() - .map(|x| *x as u32) - .collect(), - fields: self.schema().to_prost(), - append_only: self.plan_base().append_only(), + if let Some(stream_table_scan) = self.as_stream_table_scan() { + return stream_table_scan.adhoc_to_stream_prost(state); + } + if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() { + return stream_cdc_table_scan.adhoc_to_stream_prost(state); + } + if let Some(stream_source_scan) = self.as_stream_source_scan() { + return stream_source_scan.adhoc_to_stream_prost(state); + } + if let Some(stream_share) = self.as_stream_share() { + return stream_share.adhoc_to_stream_prost(state); + } + + let node = Some(self.try_to_stream_prost_body(state)?); + let input = self + .inputs() + .into_iter() + .map(|plan| plan.to_stream_prost(state)) + .try_collect()?; + // TODO: support pk_indices and operator_id + Ok(StreamPlanPb { + input, + identity: self.explain_myself_to_string(), + node_body: node, + operator_id: self.id().0 as _, + stream_key: self + .stream_key() + .unwrap_or_default() + .iter() + .map(|x| *x as u32) + .collect(), + fields: self.schema().to_prost(), + append_only: self.plan_base().append_only(), + }) }) } @@ -749,20 +761,26 @@ impl dyn PlanNode { /// Serialize the plan node and its children to a batch plan proto without the identity field /// (for testing). pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult { - let node_body = Some(self.try_to_batch_prost_body()?); - let children = self - .inputs() - .into_iter() - .map(|plan| plan.to_batch_prost_identity(identity)) - .try_collect()?; - Ok(BatchPlanPb { - children, - identity: if identity { - self.explain_myself_to_string() - } else { - "".into() - }, - node_body, + recursive::tracker!().recurse(|t| { + if t.depth() > PLAN_DEPTH_THRESHOLD { + notice_to_user(PLAN_TOO_DEEP_NOTICE); + } + + let node_body = Some(self.try_to_batch_prost_body()?); + let children = self + .inputs() + .into_iter() + .map(|plan| plan.to_batch_prost_identity(identity)) + .try_collect()?; + Ok(BatchPlanPb { + children, + identity: if identity { + self.explain_myself_to_string() + } else { + "".into() + }, + node_body, + }) }) }