Skip to content

Commit

Permalink
clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Sep 25, 2023
1 parent 284cfdb commit a059513
Show file tree
Hide file tree
Showing 21 changed files with 48 additions and 90 deletions.
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,8 @@ pub(crate) fn derive_pk(
// Note(congyi): avoid pk duplication
let stream_key = input
.stream_key()
.expect(&format!(
"should always have a stream key on the top of the stream plan but not, plan: {}",
input.explain_to_string()
))
.unwrap_or_else(|| panic!("should always have a stream key on the top of the stream plan but not, plan: {}",
input.explain_to_string()))
.iter()
.copied()
.unique()
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,8 @@ impl LogicalAgg {
if *input_dist == Distribution::SomeShard && self.core.must_try_two_phase_agg() {
RequiredDist::shard_by_key(
stream_input.schema().len(),
stream_input.stream_key().expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
stream_input.explain_to_string()
)),
stream_input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
stream_input.explain_to_string())),
)
.enforce_if_not_satisfies(stream_input, &Order::any())?
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl LogicalApply {
let schema = join_core.schema();
let stream_key = join_core.stream_key();
let functional_dependency = match &stream_key {
Some(stream_key) => FunctionalDependencySet::with_key(schema.len(), &stream_key),
Some(stream_key) => FunctionalDependencySet::with_key(schema.len(), stream_key),
None => FunctionalDependencySet::new(schema.len()),
};
let (left, right, on, join_type, _output_indices) = join_core.decompose();
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,8 @@ impl ToStream for LogicalHopWindow {
output_indices.extend(
input
.stream_key()
.expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string()
))
.unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string()))
.iter()
.cloned()
.filter(|i| i2o.try_map(*i).is_none()),
Expand Down
24 changes: 8 additions & 16 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1397,21 +1397,17 @@ impl ToStream for LogicalJoin {
// Add missing pk indices to the logical join
let mut left_to_add = left
.stream_key()
.expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
left.explain_to_string()
))
.unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
left.explain_to_string()))
.iter()
.cloned()
.filter(|i| l2o.try_map(*i).is_none())
.collect_vec();

let mut right_to_add = right
.stream_key()
.expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
right.explain_to_string()
))
.unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
right.explain_to_string()))
.iter()
.filter(|&&i| r2o.try_map(i).is_none())
.map(|&i| i + left_len)
Expand Down Expand Up @@ -1474,19 +1470,15 @@ impl ToStream for LogicalJoin {
let left_right_stream_keys = join_with_pk
.left()
.stream_key()
.expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
left.explain_to_string()
))
.unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
left.explain_to_string()))
.iter()
.map(|i| l2o.map(*i))
.chain(
join_with_pk
.right()
.stream_key() .expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
join_with_pk.right().explain_to_string()
))
.stream_key() .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
join_with_pk.right().explain_to_string()))
.iter()
.map(|i| r2o.map(*i)),
)
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,8 @@ impl ToStream for LogicalProject {
let (proj, out_col_change) = self.rewrite_with_input(input.clone(), input_col_change);

// Add missing columns of input_pk into the select list.
let input_pk = input.stream_key().expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string()
));
let input_pk = input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string()));
let i2o = proj.i2o_col_mapping();
let col_need_to_add = input_pk
.iter()
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,8 @@ impl ToStream for LogicalProjectSet {
self.rewrite_with_input(input.clone(), input_col_change);

// Add missing columns of input_pk into the select list.
let input_pk = input.stream_key().expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string()
));
let input_pk = input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string()));
let i2o = self.core.i2o_col_mapping();
let col_need_to_add = input_pk
.iter()
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,8 @@ impl ToBatch for LogicalUnion {
impl ToStream for LogicalUnion {
fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
// TODO: use round robin distribution instead of using hash distribution of all inputs.
let dist = RequiredDist::hash_shard(self.base.stream_key().expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
PlanRef::from(self.clone()).explain_to_string()
)));
let dist = RequiredDist::hash_shard(self.base.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
PlanRef::from(self.clone()).explain_to_string())));
let new_inputs: Result<Vec<_>> = self
.inputs()
.iter()
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/optimizer/plan_node/logical_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::vec;

use risingwave_common::catalog::{Field, Schema, TableVersionId};

use risingwave_common::catalog::{TableVersionId};
use risingwave_common::error::Result;
use risingwave_common::types::DataType;


use super::utils::impl_distill_by_unit;
use super::{
Expand All @@ -28,7 +28,7 @@ use crate::expr::{ExprImpl, ExprRewriter};
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
};
use crate::optimizer::property::FunctionalDependencySet;

use crate::utils::{ColIndexMapping, Condition};

/// [`LogicalUpdate`] iterates on input relation, set some columns, and inject update records into
Expand All @@ -43,7 +43,7 @@ pub struct LogicalUpdate {

impl From<generic::Update<PlanRef>> for LogicalUpdate {
fn from(core: generic::Update<PlanRef>) -> Self {
let ctx = core.input.ctx();
let _ctx = core.input.ctx();
let base = PlanBase::new_logical_with_core(&core);
Self { base, core }
}
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,10 +568,8 @@ impl dyn PlanNode {
operator_id: self.id().0 as _,
stream_key: self
.stream_key()
.expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
self.explain_myself_to_string()
))
.unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
self.explain_myself_to_string()))
.iter()
.map(|x| *x as u32)
.collect(),
Expand Down
4 changes: 1 addition & 3 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,9 +765,7 @@ pub fn to_stream_prost_body(
me.infer_internal_table_catalog(
input.schema(),
input.ctx(),
input.stream_key().expect(&format!(
"should always have a stream key in the stream plan but not"
)),
input.stream_key().expect("should always have a stream key in the stream plan but not"),
None,
)
.with_id(state.gen_table_id_wrapped())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,8 @@ impl StreamEowcOverWindow {
tbl_builder.add_order_column(order_key_index, OrderType::ascending());
order_cols.insert(order_key_index);
}
for idx in self.logical.input.stream_key().expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
self.logical.input.explain_to_string()
)) {
for idx in self.logical.input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
self.logical.input.explain_to_string())) {
if !order_cols.contains(idx) {
tbl_builder.add_order_column(*idx, OrderType::ascending());
order_cols.insert(*idx);
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,8 @@ impl StreamNode for StreamGroupTopN {
.infer_internal_table_catalog(
input.schema(),
input.ctx(),
input.stream_key().expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string()
)),
input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string())),
self.vnode_col_idx,
)
.with_id(state.gen_table_id_wrapped());
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,8 @@ impl StreamMaterialize {
// ensure the same pk will not shuffle to different node
RequiredDist::shard_by_key(
input.schema().len(),
input.stream_key().expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string()
)),
input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string())),
)
}
TableType::Index => {
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_over_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@ impl StreamOverWindow {
tbl_builder.add_order_column(o.column_index, o.order_type);
}
}
for &idx in self.logical.input.stream_key().expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
self.logical.input.explain_to_string()
)) {
for &idx in self.logical.input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
self.logical.input.explain_to_string())) {
if order_cols.insert(idx) {
tbl_builder.add_order_column(idx, OrderType::ascending());
}
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,8 @@ impl StreamShare {
operator_id: self.id().0 as _,
stream_key: self
.stream_key()
.expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
PlanRef::from(self.clone()).explain_to_string()
))
.unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
PlanRef::from(self.clone()).explain_to_string()))
.iter()
.map(|x| *x as u32)
.collect(),
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,8 @@ impl StreamSink {
}
_ => {
assert_matches!(user_distributed_by, RequiredDist::Any);
RequiredDist::shard_by_key(input.schema().len(), input.stream_key().expect(&format!(
"should always have a stream key on the top of the stream plan but not, plan: {}",
input.explain_to_string()
)))
RequiredDist::shard_by_key(input.schema().len(), input.stream_key().unwrap_or_else(|| panic!("should always have a stream key on the top of the stream plan but not, plan: {}",
input.explain_to_string())))
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,8 @@ impl StreamEowcSort {
}
}

for idx in self.input.stream_key().expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
self.input.explain_to_string()
)) {
for idx in self.input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
self.input.explain_to_string())) {
if !order_cols.contains(idx) {
tbl_builder.add_order_column(*idx, OrderType::ascending());
order_cols.insert(*idx);
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,8 @@ impl StreamTableScan {

let stream_key = self
.stream_key()
.expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
PlanRef::from(self.clone()).explain_to_string()
))
.unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
PlanRef::from(self.clone()).explain_to_string()))
.iter()
.map(|x| *x as u32)
.collect_vec();
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,8 @@ impl StreamNode for StreamTopN {
.infer_internal_table_catalog(
input.schema(),
input.ctx(),
input.stream_key().expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string()
)),
input.stream_key().unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string())),
None,
)
.with_id(state.gen_table_id_wrapped())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl CardinalityVisitor {
input_card: Cardinality,
eq_set: HashSet<usize>,
) -> Cardinality {
let mut unique_keys: Vec<HashSet<_>> = if let Some(stream_key) = input.stream_key() {
let mut unique_keys: Vec<HashSet<_>> = if let Some(_stream_key) = input.stream_key() {
vec![input.stream_key().unwrap().iter().copied().collect()]
} else {
vec![]
Expand Down

0 comments on commit a059513

Please sign in to comment.