diff --git a/src/frontend/src/optimizer/plan_node/derive.rs b/src/frontend/src/optimizer/plan_node/derive.rs index 27f67ad243bce..3e40930d78f88 100644 --- a/src/frontend/src/optimizer/plan_node/derive.rs +++ b/src/frontend/src/optimizer/plan_node/derive.rs @@ -85,10 +85,8 @@ pub(crate) fn derive_pk( // Note(congyi): avoid pk duplication let stream_key = input .stream_key() - .expect(&format!( - "should always have a stream key on the top of the stream plan but not, plan: {}", - input.explain_to_string() - )) + .unwrap_or_else(|| panic!("should always have a stream key on the top of the stream plan but not, plan: {}", + input.explain_to_string())) .iter() .copied() .unique() diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index 8de03eb3a19c5..294735bfbdf26 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -225,10 +225,8 @@ impl LogicalAgg { if *input_dist == Distribution::SomeShard && self.core.must_try_two_phase_agg() { RequiredDist::shard_by_key( stream_input.schema().len(), - stream_input.stream_key().expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - stream_input.explain_to_string() - )), + stream_input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + stream_input.explain_to_string())), ) .enforce_if_not_satisfies(stream_input, &Order::any())? } else { diff --git a/src/frontend/src/optimizer/plan_node/logical_apply.rs b/src/frontend/src/optimizer/plan_node/logical_apply.rs index 62559d14d2a8b..7640f093fc933 100644 --- a/src/frontend/src/optimizer/plan_node/logical_apply.rs +++ b/src/frontend/src/optimizer/plan_node/logical_apply.rs @@ -88,7 +88,7 @@ impl LogicalApply { let schema = join_core.schema(); let stream_key = join_core.stream_key(); let functional_dependency = match &stream_key { - Some(stream_key) => FunctionalDependencySet::with_key(schema.len(), &stream_key), + Some(stream_key) => FunctionalDependencySet::with_key(schema.len(), stream_key), None => FunctionalDependencySet::new(schema.len()), }; let (left, right, on, join_type, _output_indices) = join_core.decompose(); diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index 99113371f0224..2d4113c4288e9 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -335,10 +335,8 @@ impl ToStream for LogicalHopWindow { output_indices.extend( input .stream_key() - .expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - input.explain_to_string() - )) + .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + input.explain_to_string())) .iter() .cloned() .filter(|i| i2o.try_map(*i).is_none()), diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index b2939ce666cb0..be9f739b920d4 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -1397,10 +1397,8 @@ impl ToStream for LogicalJoin { // Add missing pk indices to the logical join let mut left_to_add = left .stream_key() - .expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - left.explain_to_string() - )) + .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + left.explain_to_string())) .iter() .cloned() .filter(|i| l2o.try_map(*i).is_none()) @@ -1408,10 +1406,8 @@ impl ToStream for LogicalJoin { let mut right_to_add = right .stream_key() - .expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - right.explain_to_string() - )) + .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + right.explain_to_string())) .iter() .filter(|&&i| r2o.try_map(i).is_none()) .map(|&i| i + left_len) @@ -1474,19 +1470,15 @@ impl ToStream for LogicalJoin { let left_right_stream_keys = join_with_pk .left() .stream_key() - .expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - left.explain_to_string() - )) + .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + left.explain_to_string())) .iter() .map(|i| l2o.map(*i)) .chain( join_with_pk .right() - .stream_key() .expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - join_with_pk.right().explain_to_string() - )) + .stream_key() .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + join_with_pk.right().explain_to_string())) .iter() .map(|i| r2o.map(*i)), ) diff --git a/src/frontend/src/optimizer/plan_node/logical_project.rs b/src/frontend/src/optimizer/plan_node/logical_project.rs index fbba7bc837e87..8005f8916c950 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project.rs @@ -265,10 +265,8 @@ impl ToStream for LogicalProject { let (proj, out_col_change) = self.rewrite_with_input(input.clone(), input_col_change); // Add missing columns of input_pk into the select list. - let input_pk = input.stream_key().expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - input.explain_to_string() - )); + let input_pk = input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + input.explain_to_string())); let i2o = proj.i2o_col_mapping(); let col_need_to_add = input_pk .iter() diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index 93347026c6364..ebb7f7453fbf1 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -364,10 +364,8 @@ impl ToStream for LogicalProjectSet { self.rewrite_with_input(input.clone(), input_col_change); // Add missing columns of input_pk into the select list. - let input_pk = input.stream_key().expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - input.explain_to_string() - )); + let input_pk = input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + input.explain_to_string())); let i2o = self.core.i2o_col_mapping(); let col_need_to_add = input_pk .iter() diff --git a/src/frontend/src/optimizer/plan_node/logical_union.rs b/src/frontend/src/optimizer/plan_node/logical_union.rs index 22b5292e05c91..b6223148ffad7 100644 --- a/src/frontend/src/optimizer/plan_node/logical_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_union.rs @@ -140,10 +140,8 @@ impl ToBatch for LogicalUnion { impl ToStream for LogicalUnion { fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { // TODO: use round robin distribution instead of using hash distribution of all inputs. - let dist = RequiredDist::hash_shard(self.base.stream_key().expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - PlanRef::from(self.clone()).explain_to_string() - ))); + let dist = RequiredDist::hash_shard(self.base.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + PlanRef::from(self.clone()).explain_to_string()))); let new_inputs: Result> = self .inputs() .iter() diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs index 752a2c3b997c5..0436b3bb199ba 100644 --- a/src/frontend/src/optimizer/plan_node/logical_update.rs +++ b/src/frontend/src/optimizer/plan_node/logical_update.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::vec; -use risingwave_common::catalog::{Field, Schema, TableVersionId}; + +use risingwave_common::catalog::{TableVersionId}; use risingwave_common::error::Result; -use risingwave_common::types::DataType; + use super::utils::impl_distill_by_unit; use super::{ @@ -28,7 +28,7 @@ use crate::expr::{ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; -use crate::optimizer::property::FunctionalDependencySet; + use crate::utils::{ColIndexMapping, Condition}; /// [`LogicalUpdate`] iterates on input relation, set some columns, and inject update records into @@ -43,7 +43,7 @@ pub struct LogicalUpdate { impl From> for LogicalUpdate { fn from(core: generic::Update) -> Self { - let ctx = core.input.ctx(); + let _ctx = core.input.ctx(); let base = PlanBase::new_logical_with_core(&core); Self { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 6cedfea3c0579..b0f14ceeac29c 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -568,10 +568,8 @@ impl dyn PlanNode { operator_id: self.id().0 as _, stream_key: self .stream_key() - .expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - self.explain_myself_to_string() - )) + .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + self.explain_myself_to_string())) .iter() .map(|x| *x as u32) .collect(), diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 5a8db8cd07994..9a32a0bf01d3b 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -765,9 +765,7 @@ pub fn to_stream_prost_body( me.infer_internal_table_catalog( input.schema(), input.ctx(), - input.stream_key().expect(&format!( - "should always have a stream key in the stream plan but not" - )), + input.stream_key().expect("should always have a stream key in the stream plan but not"), None, ) .with_id(state.gen_table_id_wrapped()) diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index 9840b0a633f76..444054d4ba900 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -101,10 +101,8 @@ impl StreamEowcOverWindow { tbl_builder.add_order_column(order_key_index, OrderType::ascending()); order_cols.insert(order_key_index); } - for idx in self.logical.input.stream_key().expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - self.logical.input.explain_to_string() - )) { + for idx in self.logical.input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + self.logical.input.explain_to_string())) { if !order_cols.contains(idx) { tbl_builder.add_order_column(*idx, OrderType::ascending()); order_cols.insert(*idx); diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index bb2027b419d91..e8d5d4a81e827 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -93,10 +93,8 @@ impl StreamNode for StreamGroupTopN { .infer_internal_table_catalog( input.schema(), input.ctx(), - input.stream_key().expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - input.explain_to_string() - )), + input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + input.explain_to_string())), self.vnode_col_idx, ) .with_id(state.gen_table_id_wrapped()); diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 29bdb66c9066d..8504ded46cdae 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -143,10 +143,8 @@ impl StreamMaterialize { // ensure the same pk will not shuffle to different node RequiredDist::shard_by_key( input.schema().len(), - input.stream_key().expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - input.explain_to_string() - )), + input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + input.explain_to_string())), ) } TableType::Index => { diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_over_window.rs index d7328d51cf626..d555883abfd63 100644 --- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs @@ -68,10 +68,8 @@ impl StreamOverWindow { tbl_builder.add_order_column(o.column_index, o.order_type); } } - for &idx in self.logical.input.stream_key().expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - self.logical.input.explain_to_string() - )) { + for &idx in self.logical.input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + self.logical.input.explain_to_string())) { if order_cols.insert(idx) { tbl_builder.add_order_column(idx, OrderType::ascending()); } diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index 12f57b69cf3ea..969527fa69702 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -99,10 +99,8 @@ impl StreamShare { operator_id: self.id().0 as _, stream_key: self .stream_key() - .expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - PlanRef::from(self.clone()).explain_to_string() - )) + .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + PlanRef::from(self.clone()).explain_to_string())) .iter() .map(|x| *x as u32) .collect(), diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index f37d0de4cd156..5cda66ddd3d72 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -138,10 +138,8 @@ impl StreamSink { } _ => { assert_matches!(user_distributed_by, RequiredDist::Any); - RequiredDist::shard_by_key(input.schema().len(), input.stream_key().expect(&format!( - "should always have a stream key on the top of the stream plan but not, plan: {}", - input.explain_to_string() - ))) + RequiredDist::shard_by_key(input.schema().len(), input.stream_key().unwrap_or_else(|| panic!("should always have a stream key on the top of the stream plan but not, plan: {}", + input.explain_to_string()))) } } } diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs index cad51e482adbb..2ec9a5fb9f7b1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sort.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs @@ -92,10 +92,8 @@ impl StreamEowcSort { } } - for idx in self.input.stream_key().expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - self.input.explain_to_string() - )) { + for idx in self.input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + self.input.explain_to_string())) { if !order_cols.contains(idx) { tbl_builder.add_order_column(*idx, OrderType::ascending()); order_cols.insert(*idx); diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 2fdcc76377367..031d5d6f88bb2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -229,10 +229,8 @@ impl StreamTableScan { let stream_key = self .stream_key() - .expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - PlanRef::from(self.clone()).explain_to_string() - )) + .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + PlanRef::from(self.clone()).explain_to_string())) .iter() .map(|x| *x as u32) .collect_vec(); diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index 807483b75bd85..c2d2382c4071f 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -110,10 +110,8 @@ impl StreamNode for StreamTopN { .infer_internal_table_catalog( input.schema(), input.ctx(), - input.stream_key().expect(&format!( - "should always have a stream key in the stream plan but not, sub plan: {}", - input.explain_to_string() - )), + input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + input.explain_to_string())), None, ) .with_id(state.gen_table_id_wrapped()) diff --git a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs index 76d0debf1fb64..5505d0fe680fc 100644 --- a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs +++ b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs @@ -36,7 +36,7 @@ impl CardinalityVisitor { input_card: Cardinality, eq_set: HashSet, ) -> Cardinality { - let mut unique_keys: Vec> = if let Some(stream_key) = input.stream_key() { + let mut unique_keys: Vec> = if let Some(_stream_key) = input.stream_key() { vec![input.stream_key().unwrap().iter().copied().collect()] } else { vec![]