diff --git a/src/expr/core/src/aggregate/def.rs b/src/expr/core/src/aggregate/def.rs index b807d2b88b5e0..de5583e14e49e 100644 --- a/src/expr/core/src/aggregate/def.rs +++ b/src/expr/core/src/aggregate/def.rs @@ -471,8 +471,6 @@ pub mod agg_kinds { }; } pub use ordered_set; - - use crate::aggregate::AggKind; } impl AggKind { diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 690a648e2d068..0dc2c16ccd4e3 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -523,7 +523,7 @@ impl Binder { "direct arg in `{}` must between 0.0 and 1.0", kind )) - .into()); + .into()); } // note that the fraction can be NULL *percentile = Literal::new(percentile_datum, DataType::Float64).into(); @@ -550,7 +550,7 @@ impl Binder { "direct arg in `{}` must between 0.0 and 1.0", kind )) - .into()); + .into()); } // note that the fraction can be NULL *relative_error = Literal::new(relative_error_datum, DataType::Float64).into(); diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index e669b24ff9a23..bc7148af25a20 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -107,8 +107,10 @@ impl Agg { && !self.agg_calls.is_empty() && self.agg_calls.iter().all(|call| { let agg_kind_ok = !matches!(call.agg_kind, agg_kinds::simply_cannot_two_phase!()); - let order_ok = matches!(call.agg_kind, agg_kinds::result_unaffected_by_order_by!() | AggKind::ApproxPercentile) - || call.order_by.is_empty(); + let order_ok = matches!( + call.agg_kind, + agg_kinds::result_unaffected_by_order_by!() | AggKind::ApproxPercentile + ) || call.order_by.is_empty(); let distinct_ok = matches!(call.agg_kind, agg_kinds::result_unaffected_by_distinct!()) || !call.distinct; diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index 139b53c9f4181..f68cf35fb64a7 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::expr::OrderByExpr; use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::types::{DataType, Datum, ScalarImpl}; @@ -23,8 +22,8 @@ use risingwave_expr::aggregate::{agg_kinds, AggKind}; use super::generic::{self, Agg, GenericPlanRef, PlanAggCall, ProjectBuilder}; use super::utils::impl_distill_by_unit; use super::{ - BatchHashAgg, BatchSimpleAgg, ColPrunable, ExprRewritable, Logical, LogicalShare, PlanBase, - PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamHashAgg, StreamProject, StreamShare, + BatchHashAgg, BatchSimpleAgg, ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef, + PlanTreeNodeUnary, PredicatePushdown, StreamHashAgg, StreamProject, StreamShare, StreamSimpleAgg, StreamStatelessSimpleAgg, ToBatch, ToStream, }; use crate::error::{ErrorCode, Result, RwError}; @@ -43,7 +42,7 @@ use crate::optimizer::plan_node::{ }; use crate::optimizer::property::{Distribution, Order, RequiredDist}; use crate::utils::{ - ColIndexMapping, ColIndexMappingRewriteExt, Condition, DynEq, GroupBy, IndexSet, Substitute, + ColIndexMapping, ColIndexMappingRewriteExt, Condition, GroupBy, IndexSet, Substitute, }; pub struct SeparatedAggInfo { @@ -225,11 +224,7 @@ impl LogicalAgg { } /// Generates distributed stream plan. - fn gen_dist_stream_agg_plan( - &self, - stream_input: PlanRef, - ctx: &mut ToStreamContext, - ) -> Result { + fn gen_dist_stream_agg_plan(&self, stream_input: PlanRef) -> Result { use super::stream::prelude::*; let input_dist = stream_input.distribution(); @@ -340,24 +335,22 @@ impl LogicalAgg { approx_percentile_agg_call: &PlanAggCall, ) -> PlanRef { let local_approx_percentile = - StreamLocalApproxPercentile::new(input, &approx_percentile_agg_call); - let global_approx_percentile = StreamGlobalApproxPercentile::new( - local_approx_percentile.into(), - &approx_percentile_agg_call, - ); + StreamLocalApproxPercentile::new(input, approx_percentile_agg_call); + let global_approx_percentile = + StreamGlobalApproxPercentile::new(local_approx_percentile.into()); global_approx_percentile.into() } /// If only 1 approx percentile, just return it. - /// Otherwise build a tree of approx percentile with KeyedMerge. + /// Otherwise build a tree of approx percentile with `KeyedMerge`. /// e.g. /// ApproxPercentile(col1, 0.5) as x, /// ApproxPercentile(col2, 0.5) as y, /// ApproxPercentile(col3, 0.5) as z /// will be built as - /// KeyedMerge + /// `KeyedMerge` /// / \ - /// KeyedMerge z + /// `KeyedMerge` z /// / \ /// x y @@ -370,7 +363,7 @@ impl LogicalAgg { .iter() .map(|agg_call| self.build_approx_percentile_agg(input.clone(), agg_call)) .collect_vec(); - assert!(approx_percentile_plans.len() >= 1); + assert!(!approx_percentile_plans.is_empty()); let mut iter = approx_percentile_plans.into_iter(); let mut acc = iter.next().unwrap(); for (current_size, plan) in iter.enumerate().map(|(i, p)| (i + 1, p)) { @@ -656,17 +649,18 @@ impl LogicalAggBuilder { _ => unreachable!(), } } - AggKind::ApproxPercentile - => - { + AggKind::ApproxPercentile => { if agg_call.order_by.sort_exprs[0].order_type == OrderType::descending() { let prev_percentile = agg_call.direct_args[0].clone(); - let new_percentile = 1.0 - prev_percentile.get_data().as_ref().unwrap().as_float64().into_inner(); + let new_percentile = 1.0 + - prev_percentile + .get_data() + .as_ref() + .unwrap() + .as_float64() + .into_inner(); let new_percentile = Some(ScalarImpl::Float64(new_percentile.into())); - let new_percentile = Literal::new( - new_percentile, - DataType::Float64, - ); + let new_percentile = Literal::new(new_percentile, DataType::Float64); let new_direct_args = vec![new_percentile, agg_call.direct_args[1].clone()]; let new_agg_call = AggCall { @@ -1284,7 +1278,7 @@ impl ToStream for LogicalAgg { return logical_dedup.to_stream(ctx); } - let plan = self.gen_dist_stream_agg_plan(stream_input, ctx)?; + let plan = self.gen_dist_stream_agg_plan(stream_input)?; let (plan, n_final_agg_calls) = if let Some(final_agg) = plan.as_stream_simple_agg() { if eowc { @@ -1304,7 +1298,7 @@ impl ToStream for LogicalAgg { }, final_agg.agg_calls().len(), ) - } else if let Some(approx_percentile_agg) = plan.as_stream_global_approx_percentile() { + } else if let Some(_approx_percentile_agg) = plan.as_stream_global_approx_percentile() { if eowc { return Err(ErrorCode::InvalidInputSyntax( "`EMIT ON WINDOW CLOSE` cannot be used for aggregation without `GROUP BY`" diff --git a/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs b/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs index 11d2ccdcb40e3..caab6da90890c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs +++ b/src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs @@ -20,12 +20,11 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody; use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::stream::StreamPlanRef; -use crate::optimizer::plan_node::utils::{childless_record, watermark_pretty, Distill}; +use crate::optimizer::plan_node::utils::{childless_record, Distill}; use crate::optimizer::plan_node::{ - ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamHopWindow, - StreamKeyedMerge, StreamNode, + ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, }; use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -38,7 +37,7 @@ pub struct StreamGlobalApproxPercentile { } impl StreamGlobalApproxPercentile { - pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self { + pub fn new(input: PlanRef) -> Self { let schema = Schema::new(vec![Field::new("approx_percentile", DataType::Float64)]); let watermark_columns = FixedBitSet::with_capacity(1); let base = PlanBase::new_stream( @@ -94,5 +93,5 @@ impl ExprRewritable for StreamGlobalApproxPercentile { } impl ExprVisitable for StreamGlobalApproxPercentile { - fn visit_exprs(&self, v: &mut dyn ExprVisitor) {} + fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {} } diff --git a/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs b/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs index b2116ad2a21d2..6e6826ef6a44e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs +++ b/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs @@ -19,17 +19,15 @@ use risingwave_common::bail; use risingwave_common::catalog::Schema; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use risingwave_pb::stream_plan::HopWindowNode; use crate::error::Result; use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef}; use crate::optimizer::plan_node::stream::StreamPlanRef; -use crate::optimizer::plan_node::utils::{childless_record, Distill, IndicesDisplay}; +use crate::optimizer::plan_node::utils::{childless_record, Distill}; use crate::optimizer::plan_node::{ - ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamHopWindow, - StreamLocalApproxPercentile, StreamNode, + ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamNode, }; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::PlanRef; @@ -54,10 +52,10 @@ impl StreamKeyedMerge { ) -> Result { assert_eq!(lhs_mapping.target_size(), rhs_mapping.target_size()); let mut schema_fields = Vec::with_capacity(lhs_mapping.target_size()); - let mut o2i_lhs = lhs_mapping + let o2i_lhs = lhs_mapping .inverse() .ok_or_else(|| anyhow!("lhs_mapping should be invertible"))?; - let mut o2i_rhs = rhs_mapping + let o2i_rhs = rhs_mapping .inverse() .ok_or_else(|| anyhow!("rhs_mapping should be invertible"))?; for output_idx in 0..lhs_mapping.target_size() { @@ -152,5 +150,5 @@ impl ExprRewritable for StreamKeyedMerge { } impl ExprVisitable for StreamKeyedMerge { - fn visit_exprs(&self, v: &mut dyn ExprVisitor) {} + fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {} } diff --git a/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs b/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs index 21a44677d9360..4f312524ee409 100644 --- a/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs +++ b/src/frontend/src/optimizer/plan_node/stream_local_approx_percentile.rs @@ -16,19 +16,15 @@ use fixedbitset::FixedBitSet; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; -use risingwave_common::util::sort_util::OrderType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use crate::expr::{ExprRewriter, ExprVisitor, InputRef, InputRefDisplay, Literal}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef}; use crate::optimizer::plan_node::stream::StreamPlanRef; -use crate::optimizer::plan_node::utils::{ - childless_record, watermark_pretty, Distill, IndicesDisplay, -}; +use crate::optimizer::plan_node::utils::{childless_record, watermark_pretty, Distill}; use crate::optimizer::plan_node::{ - ExprRewritable, PlanAggCall, PlanBase, PlanNode, PlanTreeNodeUnary, Stream, - StreamGlobalApproxPercentile, StreamHopWindow, StreamNode, + ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode, }; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::PlanRef; @@ -127,5 +123,5 @@ impl ExprRewritable for StreamLocalApproxPercentile { } impl ExprVisitable for StreamLocalApproxPercentile { - fn visit_exprs(&self, v: &mut dyn ExprVisitor) {} + fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {} } diff --git a/src/frontend/src/optimizer/rule/distinct_agg_rule.rs b/src/frontend/src/optimizer/rule/distinct_agg_rule.rs index ca32ef09504d9..9b55bfa91695c 100644 --- a/src/frontend/src/optimizer/rule/distinct_agg_rule.rs +++ b/src/frontend/src/optimizer/rule/distinct_agg_rule.rs @@ -57,8 +57,10 @@ impl Rule for DistinctAggRule { c.agg_kind ); let agg_kind_ok = !matches!(c.agg_kind, agg_kinds::simply_cannot_two_phase!()); - let order_ok = matches!(c.agg_kind, agg_kinds::result_unaffected_by_order_by!() | AggKind::ApproxPercentile) - || c.order_by.is_empty(); + let order_ok = matches!( + c.agg_kind, + agg_kinds::result_unaffected_by_order_by!() | AggKind::ApproxPercentile + ) || c.order_by.is_empty(); agg_kind_ok && order_ok }) { tracing::warn!("DistinctAggRule: unsupported agg kind, fallback to backend impl");