diff --git a/src/common/src/catalog/physical_table.rs b/src/common/src/catalog/physical_table.rs index 1386cc7db1cec..48ead96874e75 100644 --- a/src/common/src/catalog/physical_table.rs +++ b/src/common/src/catalog/physical_table.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use anyhow::anyhow; use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_pb::common::PbColumnOrder; @@ -79,7 +80,7 @@ impl TableDesc { .collect() } - pub fn to_protobuf(&self) -> StorageTableDesc { + pub fn try_to_protobuf(&self) -> anyhow::Result { let dist_key_indices: Vec = self.distribution_key.iter().map(|&k| k as u32).collect(); let pk_indices: Vec = self .pk @@ -102,19 +103,20 @@ impl TableDesc { pk_indices .iter() .position(|&pi| di == pi) - .unwrap_or_else(|| { - panic!( + .ok_or_else(|| { + anyhow!( "distribution key {:?} must be a subset of primary key {:?}", - dist_key_indices, pk_indices + dist_key_indices, + pk_indices ) }) + .map(|d| d as u32) }) - .map(|d| d as u32) - .collect_vec() + .try_collect()? } else { Vec::new() }; - StorageTableDesc { + Ok(StorageTableDesc { table_id: self.table_id.into(), columns: self.columns.iter().map(Into::into).collect(), pk: self.pk.iter().map(|v| v.to_protobuf()).collect(), @@ -125,7 +127,7 @@ impl TableDesc { versioned: self.versioned, stream_key: self.stream_key.iter().map(|&x| x as u32).collect(), vnode_col_idx_in_pk, - } + }) } /// Helper function to create a mapping from `column id` to `column index` diff --git a/src/ctl/src/cmd_impl/table/scan.rs b/src/ctl/src/cmd_impl/table/scan.rs index af268e7d33193..8c21d975009fe 100644 --- a/src/ctl/src/cmd_impl/table/scan.rs +++ b/src/ctl/src/cmd_impl/table/scan.rs @@ -69,18 +69,21 @@ pub async fn make_state_table(hummock: S, table: &TableCatalog) - .await } -pub fn make_storage_table(hummock: S, table: &TableCatalog) -> StorageTable { +pub fn make_storage_table( + hummock: S, + table: &TableCatalog, +) -> Result> { let output_columns_ids = table .columns() .iter() .map(|x| x.column_desc.column_id) .collect(); - StorageTable::new_partial( + Ok(StorageTable::new_partial( hummock, output_columns_ids, Some(TableDistribution::all_vnodes()), - &table.table_desc().to_protobuf(), - ) + &table.table_desc().try_to_protobuf()?, + )) } pub async fn scan(context: &CtlContext, mv_name: String, data_dir: Option) -> Result<()> { @@ -106,7 +109,7 @@ async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore { - let graph = build_graph(plan.clone()); + let graph = build_graph(plan.clone())?; blocks.push(explain_stream_graph(&graph, explain_verbose)); } } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 8cf2eb0e331e1..f78591a4d8c40 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -33,6 +33,7 @@ #![feature(result_flattening)] #![feature(error_generic_member_access)] #![feature(round_ties_even)] +#![feature(iterator_try_collect)] #![recursion_limit = "256"] #[cfg(test)] diff --git a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs index 52113880ac528..1567e1ffd0959 100644 --- a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs @@ -26,11 +26,12 @@ use crate::expr::{Expr, ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{ - EqJoinPredicate, EqJoinPredicateDisplay, LogicalScan, PlanBase, PlanTreeNodeUnary, ToBatchPb, - ToDistributedBatch, ToLocalBatch, + EqJoinPredicate, EqJoinPredicateDisplay, LogicalScan, PlanBase, PlanTreeNodeUnary, + ToDistributedBatch, ToLocalBatch, TryToBatchPb, }; use crate::optimizer::property::{Distribution, Order, RequiredDist}; use crate::optimizer::PlanRef; +use crate::scheduler::SchedulerResult; use crate::utils::ColIndexMappingRewriteExt; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -200,9 +201,9 @@ impl ToDistributedBatch for BatchLookupJoin { } } -impl ToBatchPb for BatchLookupJoin { - fn to_batch_prost_body(&self) -> NodeBody { - if self.distributed_lookup { +impl TryToBatchPb for BatchLookupJoin { + fn try_to_batch_prost_body(&self) -> SchedulerResult { + Ok(if self.distributed_lookup { NodeBody::DistributedLookupJoin(DistributedLookupJoinNode { join_type: self.core.join_type as i32, condition: self @@ -222,7 +223,7 @@ impl ToBatchPb for BatchLookupJoin { .into_iter() .map(|a| a as _) .collect(), - inner_side_table_desc: Some(self.right_table_desc.to_protobuf()), + inner_side_table_desc: Some(self.right_table_desc.try_to_protobuf()?), inner_side_column_ids: self .right_output_column_ids .iter() @@ -252,7 +253,7 @@ impl ToBatchPb for BatchLookupJoin { .into_iter() .map(|a| a as _) .collect(), - inner_side_table_desc: Some(self.right_table_desc.to_protobuf()), + inner_side_table_desc: Some(self.right_table_desc.try_to_protobuf()?), inner_side_vnode_mapping: vec![], // To be filled in at local.rs inner_side_column_ids: self .right_output_column_ids @@ -264,7 +265,7 @@ impl ToBatchPb for BatchLookupJoin { null_safe: self.eq_join_predicate.null_safes(), lookup_prefix_len: self.lookup_prefix_len as u32, }) - } + }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs index 5aaeb1d13b60c..245a2465631f0 100644 --- a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs @@ -24,12 +24,13 @@ use risingwave_pb::batch_plan::RowSeqScanNode; use super::batch::prelude::*; use super::utils::{childless_record, Distill}; -use super::{generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch}; +use super::{generic, ExprRewritable, PlanBase, PlanRef, ToDistributedBatch}; use crate::catalog::ColumnId; use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::plan_node::ToLocalBatch; +use crate::optimizer::plan_node::{ToLocalBatch, TryToBatchPb}; use crate::optimizer::property::{Distribution, DistributionDisplay, Order}; +use crate::scheduler::SchedulerResult; /// `BatchSeqScan` implements [`super::LogicalScan`] to scan from a row-oriented table #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -232,10 +233,10 @@ impl ToDistributedBatch for BatchSeqScan { } } -impl ToBatchPb for BatchSeqScan { - fn to_batch_prost_body(&self) -> NodeBody { - NodeBody::RowSeqScan(RowSeqScanNode { - table_desc: Some(self.core.table_desc.to_protobuf()), +impl TryToBatchPb for BatchSeqScan { + fn try_to_batch_prost_body(&self) -> SchedulerResult { + Ok(NodeBody::RowSeqScan(RowSeqScanNode { + table_desc: Some(self.core.table_desc.try_to_protobuf()?), column_ids: self .core .output_column_ids() @@ -247,7 +248,7 @@ impl ToBatchPb for BatchSeqScan { vnode_bitmap: None, ordered: !self.order().is_any(), limit: *self.limit(), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index fd05a4c7ecc9a..a0475c4ae092e 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -678,7 +678,10 @@ impl dyn PlanNode { /// /// Note that [`StreamTableScan`] has its own implementation of `to_stream_prost`. We have a /// hook inside to do some ad-hoc thing for [`StreamTableScan`]. - pub fn to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> StreamPlanPb { + pub fn to_stream_prost( + &self, + state: &mut BuildFragmentGraphState, + ) -> SchedulerResult { use stream::prelude::*; if let Some(stream_table_scan) = self.as_stream_table_scan() { @@ -691,14 +694,14 @@ impl dyn PlanNode { return stream_share.adhoc_to_stream_prost(state); } - let node = Some(self.to_stream_prost_body(state)); + let node = Some(self.try_to_stream_prost_body(state)?); let input = self .inputs() .into_iter() .map(|plan| plan.to_stream_prost(state)) - .collect(); + .try_collect()?; // TODO: support pk_indices and operator_id - StreamPlanPb { + Ok(StreamPlanPb { input, identity: self.explain_myself_to_string(), node_body: node, @@ -711,24 +714,24 @@ impl dyn PlanNode { .collect(), fields: self.schema().to_prost(), append_only: self.plan_base().append_only(), - } + }) } /// Serialize the plan node and its children to a batch plan proto. - pub fn to_batch_prost(&self) -> BatchPlanPb { + pub fn to_batch_prost(&self) -> SchedulerResult { self.to_batch_prost_identity(true) } /// Serialize the plan node and its children to a batch plan proto without the identity field /// (for testing). - pub fn to_batch_prost_identity(&self, identity: bool) -> BatchPlanPb { - let node_body = Some(self.to_batch_prost_body()); + pub fn to_batch_prost_identity(&self, identity: bool) -> SchedulerResult { + let node_body = Some(self.try_to_batch_prost_body()?); let children = self .inputs() .into_iter() .map(|plan| plan.to_batch_prost_identity(identity)) - .collect(); - BatchPlanPb { + .try_collect()?; + Ok(BatchPlanPb { children, identity: if identity { self.explain_myself_to_string() @@ -736,7 +739,7 @@ impl dyn PlanNode { "".into() }, node_body, - } + }) } pub fn explain_myself_to_string(&self) -> String { @@ -956,6 +959,7 @@ use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_rewriter::PlanCloner; use crate::optimizer::plan_visitor::ExprCorrelatedIdFinder; +use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::{ColIndexMapping, Condition, DynEq, DynHash, Endo, Layer, Visit}; diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index 24bc2dd5f0b60..49b39ef627e73 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -29,6 +29,7 @@ use crate::handler::create_source::debezium_cdc_source_schema; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; use crate::optimizer::property::{Distribution, DistributionDisplay}; +use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::{Explain, TableCatalog}; @@ -131,7 +132,10 @@ impl StreamNode for StreamCdcTableScan { } impl StreamCdcTableScan { - pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> PbStreamNode { + pub fn adhoc_to_stream_prost( + &self, + state: &mut BuildFragmentGraphState, + ) -> SchedulerResult { use risingwave_pb::stream_plan::*; let stream_key = self @@ -254,7 +258,7 @@ impl StreamCdcTableScan { }); // plan: merge -> filter -> exchange(simple) -> stream_scan - PbStreamNode { + Ok(PbStreamNode { fields: self.schema().to_prost(), input: vec![exchange_stream_node], node_body: Some(stream_scan_body), @@ -262,7 +266,7 @@ impl StreamCdcTableScan { operator_id: self.base.id().0 as u64, identity: self.distill_to_string(), append_only: self.append_only(), - } + }) } pub fn build_cdc_filter_expr(cdc_table_name: &str) -> ExprImpl { diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index 25b45ac24c73a..89b1402cba919 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -23,12 +23,13 @@ use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode}; use super::generic::{self, GenericPlanRef}; use super::stream::prelude::*; use super::utils::{childless_record, Distill}; -use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode}; +use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary}; use crate::expr::{Expr, ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::IndicesDisplay; -use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay}; +use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay, TryToStreamPb}; use crate::optimizer::property::Distribution; +use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; @@ -133,8 +134,11 @@ impl PlanTreeNodeBinary for StreamDeltaJoin { impl_plan_tree_node_for_binary! { StreamDeltaJoin } -impl StreamNode for StreamDeltaJoin { - fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody { +impl TryToStreamPb for StreamDeltaJoin { + fn try_to_stream_prost_body( + &self, + _state: &mut BuildFragmentGraphState, + ) -> SchedulerResult { let left = self.left(); let right = self.right(); @@ -154,7 +158,7 @@ impl StreamNode for StreamDeltaJoin { // TODO: add a separate delta join node in proto, or move fragmenter to frontend so that we // don't need an intermediate representation. let eq_join_predicate = &self.eq_join_predicate; - NodeBody::DeltaIndexJoin(DeltaIndexJoinNode { + Ok(NodeBody::DeltaIndexJoin(DeltaIndexJoinNode { join_type: self.core.join_type as i32, left_key: eq_join_predicate .left_eq_indexes() @@ -181,7 +185,7 @@ impl StreamNode for StreamDeltaJoin { .iter() .map(ColumnDesc::to_protobuf) .collect(), - table_desc: Some(left_table_desc.to_protobuf()), + table_desc: Some(left_table_desc.try_to_protobuf()?), output_col_idx: left_table .output_col_idx .iter() @@ -197,7 +201,7 @@ impl StreamNode for StreamDeltaJoin { .iter() .map(ColumnDesc::to_protobuf) .collect(), - table_desc: Some(right_table_desc.to_protobuf()), + table_desc: Some(right_table_desc.try_to_protobuf()?), output_col_idx: right_table .output_col_idx .iter() @@ -205,7 +209,7 @@ impl StreamNode for StreamDeltaJoin { .collect(), }), output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index 01694ddcaa492..395a32bc9be8a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -22,6 +22,7 @@ use super::utils::Distill; use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamExchange, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode}; +use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::Explain; @@ -84,7 +85,10 @@ impl StreamNode for StreamShare { } impl StreamShare { - pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> PbStreamNode { + pub fn adhoc_to_stream_prost( + &self, + state: &mut BuildFragmentGraphState, + ) -> SchedulerResult { let operator_id = self.base.id().0 as u32; match state.get_share_stream_node(operator_id) { @@ -96,7 +100,7 @@ impl StreamShare { .inputs() .into_iter() .map(|plan| plan.to_stream_prost(state)) - .collect(); + .try_collect()?; let stream_node = PbStreamNode { input, @@ -115,10 +119,10 @@ impl StreamShare { }; state.add_share_stream_node(operator_id, stream_node.clone()); - stream_node + Ok(stream_node) } - Some(stream_node) => stream_node.clone(), + Some(stream_node) => Ok(stream_node.clone()), } } } diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index ef224c5f96d00..8e7ae089284cb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -32,6 +32,7 @@ use crate::expr::{ExprRewriter, ExprVisitor, FunctionCall}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; use crate::optimizer::property::{Distribution, DistributionDisplay}; +use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::{Explain, TableCatalog}; @@ -227,7 +228,10 @@ impl StreamNode for StreamTableScan { } impl StreamTableScan { - pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> PbStreamNode { + pub fn adhoc_to_stream_prost( + &self, + state: &mut BuildFragmentGraphState, + ) -> SchedulerResult { use risingwave_pb::stream_plan::*; let stream_key = self @@ -275,7 +279,7 @@ impl StreamTableScan { // TODO: snapshot read of upstream mview let batch_plan_node = BatchPlanNode { - table_desc: Some(self.core.table_desc.to_protobuf()), + table_desc: Some(self.core.table_desc.try_to_protobuf()?), column_ids: upstream_column_ids.clone(), }; @@ -312,14 +316,14 @@ impl StreamTableScan { output_indices, upstream_column_ids, // The table desc used by backfill executor - table_desc: Some(self.core.table_desc.to_protobuf()), + table_desc: Some(self.core.table_desc.try_to_protobuf()?), state_table: Some(catalog), arrangement_table, rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit, ..Default::default() }); - PbStreamNode { + Ok(PbStreamNode { fields: self.schema().to_prost(), input: vec![ // Upstream updates @@ -347,7 +351,7 @@ impl StreamTableScan { operator_id: self.base.id().0 as u64, identity: self.distill_to_string(), append_only: self.append_only(), - } + }) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index de0350e166e3e..d919f3a968419 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -22,14 +22,15 @@ use super::generic::GenericPlanRef; use super::stream::prelude::*; use super::stream::StreamPlanRef; use super::utils::{childless_record, watermark_pretty, Distill}; -use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode}; +use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary}; use crate::expr::{Expr, ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::plan_tree_node::PlanTreeNodeUnary; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{ - EqJoinPredicate, EqJoinPredicateDisplay, StreamExchange, StreamTableScan, + EqJoinPredicate, EqJoinPredicateDisplay, StreamExchange, StreamTableScan, TryToStreamPb, }; +use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; @@ -137,8 +138,11 @@ impl PlanTreeNodeBinary for StreamTemporalJoin { impl_plan_tree_node_for_binary! { StreamTemporalJoin } -impl StreamNode for StreamTemporalJoin { - fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody { +impl TryToStreamPb for StreamTemporalJoin { + fn try_to_stream_prost_body( + &self, + _state: &mut BuildFragmentGraphState, + ) -> SchedulerResult { let left_jk_indices = self.eq_join_predicate.left_eq_indexes(); let right_jk_indices = self.eq_join_predicate.right_eq_indexes(); let left_jk_indices_prost = left_jk_indices.iter().map(|idx| *idx as i32).collect_vec(); @@ -156,7 +160,7 @@ impl StreamNode for StreamTemporalJoin { .as_stream_table_scan() .expect("should be a stream table scan"); - NodeBody::TemporalJoin(TemporalJoinNode { + Ok(NodeBody::TemporalJoin(TemporalJoinNode { join_type: self.core.join_type as i32, left_key: left_jk_indices_prost, right_key: right_jk_indices_prost, @@ -167,9 +171,9 @@ impl StreamNode for StreamTemporalJoin { .as_expr_unless_true() .map(|x| x.to_expr_proto()), output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), - table_desc: Some(scan.core().table_desc.to_protobuf()), + table_desc: Some(scan.core().table_desc.try_to_protobuf()?), table_output_indices: scan.core().output_col_idx.iter().map(|&i| i as _).collect(), - }) + })) } } diff --git a/src/frontend/src/optimizer/plan_node/to_prost.rs b/src/frontend/src/optimizer/plan_node/to_prost.rs index 9a940022f9daf..ff86eef43b172 100644 --- a/src/frontend/src/optimizer/plan_node/to_prost.rs +++ b/src/frontend/src/optimizer/plan_node/to_prost.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use anyhow::anyhow; use paste::paste; use risingwave_pb::batch_plan::plan_node as pb_batch_node; use risingwave_pb::stream_plan::stream_node as pb_stream_node; @@ -21,23 +22,59 @@ use crate::{ for_all_plan_nodes, for_batch_plan_nodes, for_logical_plan_nodes, for_stream_plan_nodes, }; -pub trait ToPb: ToBatchPb + StreamNode {} +pub trait ToPb: TryToBatchPb + TryToStreamPb {} + +pub trait TryToBatchPb { + fn try_to_batch_prost_body(&self) -> SchedulerResult { + // Originally we panic in the following way + // panic!("convert into distributed is only allowed on batch plan") + Err(anyhow!( + "Node {} cannot be convert to batch node", + std::any::type_name::() + ) + .into()) + } +} pub trait ToBatchPb { - fn to_batch_prost_body(&self) -> pb_batch_node::NodeBody { - unimplemented!() + fn to_batch_prost_body(&self) -> pb_batch_node::NodeBody; +} + +impl TryToBatchPb for T { + fn try_to_batch_prost_body(&self) -> SchedulerResult { + Ok(self.to_batch_prost_body()) } } -pub trait StreamNode { - fn to_stream_prost_body( +pub trait TryToStreamPb { + fn try_to_stream_prost_body( &self, _state: &mut BuildFragmentGraphState, - ) -> pb_stream_node::NodeBody { - unimplemented!() + ) -> SchedulerResult { + // Originally we panic in the following way + // panic!("convert into distributed is only allowed on stream plan") + Err(anyhow!( + "Node {} cannot be convert to stream node", + std::any::type_name::() + ) + .into()) } } +impl TryToStreamPb for T { + fn try_to_stream_prost_body( + &self, + state: &mut BuildFragmentGraphState, + ) -> SchedulerResult { + Ok(self.to_stream_prost_body(state)) + } +} + +pub trait StreamNode { + fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) + -> pb_stream_node::NodeBody; +} + /// impl `ToPb` nodes which have impl `ToBatchPb` and `ToStreamPb`. macro_rules! impl_to_prost { ($( { $convention:ident, $name:ident }),*) => { @@ -51,11 +88,7 @@ for_all_plan_nodes! { impl_to_prost } macro_rules! ban_to_batch_prost { ($( { $convention:ident, $name:ident }),*) => { paste!{ - $(impl ToBatchPb for [<$convention $name>] { - fn to_batch_prost_body(&self) -> pb_batch_node::NodeBody { - panic!("convert into distributed is only allowed on batch plan") - } - })* + $(impl TryToBatchPb for [<$convention $name>] {})* } } } @@ -65,11 +98,7 @@ for_stream_plan_nodes! { ban_to_batch_prost } macro_rules! ban_to_stream_prost { ($( { $convention:ident, $name:ident }),*) => { paste!{ - $(impl StreamNode for [<$convention $name>] { - fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> pb_stream_node::NodeBody { - panic!("convert into distributed is only allowed on stream plan") - } - })* + $(impl TryToStreamPb for [<$convention $name>] {})* } } } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index e40282cbacf86..3ae326f55dfdd 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -101,16 +101,18 @@ impl Serialize for ExecutionPlanNode { } } -impl From for ExecutionPlanNode { - fn from(plan_node: PlanRef) -> Self { - Self { +impl TryFrom for ExecutionPlanNode { + type Error = SchedulerError; + + fn try_from(plan_node: PlanRef) -> Result { + Ok(Self { plan_node_id: plan_node.plan_base().id(), plan_node_type: plan_node.node_type(), - node: plan_node.to_batch_prost_body(), + node: plan_node.try_to_batch_prost_body()?, children: vec![], schema: plan_node.schema().to_prost(), source_stage_id: None, - } + }) } } @@ -836,7 +838,7 @@ impl BatchPlanFragmenter { self.visit_exchange(node.clone(), builder, parent_exec_node)?; } _ => { - let mut execution_plan_node = ExecutionPlanNode::from(node.clone()); + let mut execution_plan_node = ExecutionPlanNode::try_from(node.clone())?; for child in node.inputs() { self.visit_node(child, builder, Some(&mut execution_plan_node))?; @@ -858,7 +860,7 @@ impl BatchPlanFragmenter { builder: &mut QueryStageBuilder, parent_exec_node: Option<&mut ExecutionPlanNode>, ) -> SchedulerResult<()> { - let mut execution_plan_node = ExecutionPlanNode::from(node.clone()); + let mut execution_plan_node = ExecutionPlanNode::try_from(node.clone())?; let child_exchange_info = if let Some(parallelism) = builder.parallelism { Some(node.distribution().to_prost( parallelism, diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 291d72b6a8f22..344aa103deeb4 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -32,6 +32,7 @@ use risingwave_pb::stream_plan::{ use self::rewrite::build_delta_join_without_arrange; use crate::optimizer::plan_node::reorganize_elements_id; use crate::optimizer::PlanRef; +use crate::scheduler::SchedulerResult; /// The mutable state when building fragment graph. #[derive(Educe)] @@ -112,11 +113,11 @@ impl BuildFragmentGraphState { } } -pub fn build_graph(plan_node: PlanRef) -> StreamFragmentGraphProto { +pub fn build_graph(plan_node: PlanRef) -> SchedulerResult { let plan_node = reorganize_elements_id(plan_node); let mut state = BuildFragmentGraphState::default(); - let stream_node = plan_node.to_stream_prost(&mut state); + let stream_node = plan_node.to_stream_prost(&mut state)?; generate_fragment_graph(&mut state, stream_node).unwrap(); let mut fragment_graph = state.fragment_graph.to_protobuf(); fragment_graph.dependent_table_ids = state @@ -125,7 +126,7 @@ pub fn build_graph(plan_node: PlanRef) -> StreamFragmentGraphProto { .map(|id| id.table_id) .collect(); fragment_graph.table_ids_cnt = state.next_table_id; - fragment_graph + Ok(fragment_graph) } #[cfg(any())]