Skip to content

Commit

Permalink
wrap to_prost
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Apr 12, 2024
1 parent 93f6ef9 commit 7d5fea1
Showing 1 changed file with 65 additions and 47 deletions.
112 changes: 65 additions & 47 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use itertools::Itertools;
use paste::paste;
use pretty_xmlish::{Pretty, PrettyConfig};
use risingwave_common::catalog::Schema;
use risingwave_common::util::recursive::{self, Recurse};
use risingwave_pb::batch_plan::PlanNode as BatchPlanPb;
use risingwave_pb::stream_plan::StreamNode as StreamPlanPb;
use serde::Serialize;
Expand All @@ -51,6 +52,7 @@ use self::utils::Distill;
use super::property::{Distribution, FunctionalDependencySet, Order};
use crate::error::{ErrorCode, Result};
use crate::optimizer::ExpressionSimplifyRewriter;
use crate::session::current::notice_to_user;

/// A marker trait for different conventions, used for enforcing type safety.
///
Expand Down Expand Up @@ -694,6 +696,10 @@ impl dyn PlanNode {
}
}

const PLAN_DEPTH_THRESHOLD: usize = 256;
const PLAN_TOO_DEEP_NOTICE: &str = "The plan is too deep. \
Consider rewriting the query to simplify it if you encounter any issues.";

impl dyn PlanNode {
/// Serialize the plan node and its children to a stream plan proto.
///
Expand All @@ -703,41 +709,47 @@ impl dyn PlanNode {
&self,
state: &mut BuildFragmentGraphState,
) -> SchedulerResult<StreamPlanPb> {
use stream::prelude::*;
recursive::tracker!().recurse(|t| {
if t.depth() > PLAN_DEPTH_THRESHOLD {
notice_to_user(PLAN_TOO_DEEP_NOTICE);
}

if let Some(stream_table_scan) = self.as_stream_table_scan() {
return stream_table_scan.adhoc_to_stream_prost(state);
}
if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() {
return stream_cdc_table_scan.adhoc_to_stream_prost(state);
}
if let Some(stream_source_scan) = self.as_stream_source_scan() {
return stream_source_scan.adhoc_to_stream_prost(state);
}
if let Some(stream_share) = self.as_stream_share() {
return stream_share.adhoc_to_stream_prost(state);
}
use stream::prelude::*;

let node = Some(self.try_to_stream_prost_body(state)?);
let input = self
.inputs()
.into_iter()
.map(|plan| plan.to_stream_prost(state))
.try_collect()?;
// TODO: support pk_indices and operator_id
Ok(StreamPlanPb {
input,
identity: self.explain_myself_to_string(),
node_body: node,
operator_id: self.id().0 as _,
stream_key: self
.stream_key()
.unwrap_or_default()
.iter()
.map(|x| *x as u32)
.collect(),
fields: self.schema().to_prost(),
append_only: self.plan_base().append_only(),
if let Some(stream_table_scan) = self.as_stream_table_scan() {
return stream_table_scan.adhoc_to_stream_prost(state);
}
if let Some(stream_cdc_table_scan) = self.as_stream_cdc_table_scan() {
return stream_cdc_table_scan.adhoc_to_stream_prost(state);
}
if let Some(stream_source_scan) = self.as_stream_source_scan() {
return stream_source_scan.adhoc_to_stream_prost(state);
}
if let Some(stream_share) = self.as_stream_share() {
return stream_share.adhoc_to_stream_prost(state);
}

let node = Some(self.try_to_stream_prost_body(state)?);
let input = self
.inputs()
.into_iter()
.map(|plan| plan.to_stream_prost(state))
.try_collect()?;
// TODO: support pk_indices and operator_id
Ok(StreamPlanPb {
input,
identity: self.explain_myself_to_string(),
node_body: node,
operator_id: self.id().0 as _,
stream_key: self
.stream_key()
.unwrap_or_default()
.iter()
.map(|x| *x as u32)
.collect(),
fields: self.schema().to_prost(),
append_only: self.plan_base().append_only(),
})
})
}

Expand All @@ -749,20 +761,26 @@ impl dyn PlanNode {
/// Serialize the plan node and its children to a batch plan proto without the identity field
/// (for testing).
pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult<BatchPlanPb> {
let node_body = Some(self.try_to_batch_prost_body()?);
let children = self
.inputs()
.into_iter()
.map(|plan| plan.to_batch_prost_identity(identity))
.try_collect()?;
Ok(BatchPlanPb {
children,
identity: if identity {
self.explain_myself_to_string()
} else {
"".into()
},
node_body,
recursive::tracker!().recurse(|t| {
if t.depth() > PLAN_DEPTH_THRESHOLD {
notice_to_user(PLAN_TOO_DEEP_NOTICE);
}

let node_body = Some(self.try_to_batch_prost_body()?);
let children = self
.inputs()
.into_iter()
.map(|plan| plan.to_batch_prost_identity(identity))
.try_collect()?;
Ok(BatchPlanPb {
children,
identity: if identity {
self.explain_myself_to_string()
} else {
"".into()
},
node_body,
})
})
}

Expand Down

0 comments on commit 7d5fea1

Please sign in to comment.