Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 22, 2024
1 parent 5a099ea commit 32b718e
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 56 deletions.
2 changes: 0 additions & 2 deletions src/expr/core/src/aggregate/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,6 @@ pub mod agg_kinds {
};
}
pub use ordered_set;

use crate::aggregate::AggKind;
}

impl AggKind {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ impl Binder {
"direct arg in `{}` must between 0.0 and 1.0",
kind
))
.into());
.into());
}
// note that the fraction can be NULL
*percentile = Literal::new(percentile_datum, DataType::Float64).into();
Expand All @@ -550,7 +550,7 @@ impl Binder {
"direct arg in `{}` must between 0.0 and 1.0",
kind
))
.into());
.into());
}
// note that the fraction can be NULL
*relative_error = Literal::new(relative_error_datum, DataType::Float64).into();
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ impl<PlanRef: GenericPlanRef> Agg<PlanRef> {
&& !self.agg_calls.is_empty()
&& self.agg_calls.iter().all(|call| {
let agg_kind_ok = !matches!(call.agg_kind, agg_kinds::simply_cannot_two_phase!());
let order_ok = matches!(call.agg_kind, agg_kinds::result_unaffected_by_order_by!() | AggKind::ApproxPercentile)
|| call.order_by.is_empty();
let order_ok = matches!(
call.agg_kind,
agg_kinds::result_unaffected_by_order_by!() | AggKind::ApproxPercentile
) || call.order_by.is_empty();
let distinct_ok =
matches!(call.agg_kind, agg_kinds::result_unaffected_by_distinct!())
|| !call.distinct;
Expand Down
50 changes: 22 additions & 28 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::expr::OrderByExpr;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::types::{DataType, Datum, ScalarImpl};
Expand All @@ -23,8 +22,8 @@ use risingwave_expr::aggregate::{agg_kinds, AggKind};
use super::generic::{self, Agg, GenericPlanRef, PlanAggCall, ProjectBuilder};
use super::utils::impl_distill_by_unit;
use super::{
BatchHashAgg, BatchSimpleAgg, ColPrunable, ExprRewritable, Logical, LogicalShare, PlanBase,
PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamHashAgg, StreamProject, StreamShare,
BatchHashAgg, BatchSimpleAgg, ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef,
PlanTreeNodeUnary, PredicatePushdown, StreamHashAgg, StreamProject, StreamShare,
StreamSimpleAgg, StreamStatelessSimpleAgg, ToBatch, ToStream,
};
use crate::error::{ErrorCode, Result, RwError};
Expand All @@ -43,7 +42,7 @@ use crate::optimizer::plan_node::{
};
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::utils::{
ColIndexMapping, ColIndexMappingRewriteExt, Condition, DynEq, GroupBy, IndexSet, Substitute,
ColIndexMapping, ColIndexMappingRewriteExt, Condition, GroupBy, IndexSet, Substitute,
};

pub struct SeparatedAggInfo {
Expand Down Expand Up @@ -225,11 +224,7 @@ impl LogicalAgg {
}

/// Generates distributed stream plan.
fn gen_dist_stream_agg_plan(
&self,
stream_input: PlanRef,
ctx: &mut ToStreamContext,
) -> Result<PlanRef> {
fn gen_dist_stream_agg_plan(&self, stream_input: PlanRef) -> Result<PlanRef> {
use super::stream::prelude::*;

let input_dist = stream_input.distribution();
Expand Down Expand Up @@ -340,24 +335,22 @@ impl LogicalAgg {
approx_percentile_agg_call: &PlanAggCall,
) -> PlanRef {
let local_approx_percentile =
StreamLocalApproxPercentile::new(input, &approx_percentile_agg_call);
let global_approx_percentile = StreamGlobalApproxPercentile::new(
local_approx_percentile.into(),
&approx_percentile_agg_call,
);
StreamLocalApproxPercentile::new(input, approx_percentile_agg_call);
let global_approx_percentile =
StreamGlobalApproxPercentile::new(local_approx_percentile.into());
global_approx_percentile.into()
}

/// If only 1 approx percentile, just return it.
/// Otherwise build a tree of approx percentile with KeyedMerge.
/// Otherwise build a tree of approx percentile with `KeyedMerge`.
/// e.g.
/// ApproxPercentile(col1, 0.5) as x,
/// ApproxPercentile(col2, 0.5) as y,
/// ApproxPercentile(col3, 0.5) as z
/// will be built as
/// KeyedMerge
/// `KeyedMerge`
/// / \
/// KeyedMerge z
/// `KeyedMerge` z
/// / \
/// x y
Expand All @@ -370,7 +363,7 @@ impl LogicalAgg {
.iter()
.map(|agg_call| self.build_approx_percentile_agg(input.clone(), agg_call))
.collect_vec();
assert!(approx_percentile_plans.len() >= 1);
assert!(!approx_percentile_plans.is_empty());
let mut iter = approx_percentile_plans.into_iter();
let mut acc = iter.next().unwrap();
for (current_size, plan) in iter.enumerate().map(|(i, p)| (i + 1, p)) {
Expand Down Expand Up @@ -656,17 +649,18 @@ impl LogicalAggBuilder {
_ => unreachable!(),
}
}
AggKind::ApproxPercentile
=>
{
AggKind::ApproxPercentile => {
if agg_call.order_by.sort_exprs[0].order_type == OrderType::descending() {
let prev_percentile = agg_call.direct_args[0].clone();
let new_percentile = 1.0 - prev_percentile.get_data().as_ref().unwrap().as_float64().into_inner();
let new_percentile = 1.0
- prev_percentile
.get_data()
.as_ref()
.unwrap()
.as_float64()
.into_inner();
let new_percentile = Some(ScalarImpl::Float64(new_percentile.into()));
let new_percentile = Literal::new(
new_percentile,
DataType::Float64,
);
let new_percentile = Literal::new(new_percentile, DataType::Float64);
let new_direct_args = vec![new_percentile, agg_call.direct_args[1].clone()];

let new_agg_call = AggCall {
Expand Down Expand Up @@ -1284,7 +1278,7 @@ impl ToStream for LogicalAgg {
return logical_dedup.to_stream(ctx);
}

let plan = self.gen_dist_stream_agg_plan(stream_input, ctx)?;
let plan = self.gen_dist_stream_agg_plan(stream_input)?;

let (plan, n_final_agg_calls) = if let Some(final_agg) = plan.as_stream_simple_agg() {
if eowc {
Expand All @@ -1304,7 +1298,7 @@ impl ToStream for LogicalAgg {
},
final_agg.agg_calls().len(),
)
} else if let Some(approx_percentile_agg) = plan.as_stream_global_approx_percentile() {
} else if let Some(_approx_percentile_agg) = plan.as_stream_global_approx_percentile() {
if eowc {
return Err(ErrorCode::InvalidInputSyntax(
"`EMIT ON WINDOW CLOSE` cannot be used for aggregation without `GROUP BY`"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::{childless_record, watermark_pretty, Distill};
use crate::optimizer::plan_node::utils::{childless_record, Distill};
use crate::optimizer::plan_node::{
ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamHopWindow,
StreamKeyedMerge, StreamNode,
ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
};
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand All @@ -38,7 +37,7 @@ pub struct StreamGlobalApproxPercentile {
}

impl StreamGlobalApproxPercentile {
pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
pub fn new(input: PlanRef) -> Self {
let schema = Schema::new(vec![Field::new("approx_percentile", DataType::Float64)]);
let watermark_columns = FixedBitSet::with_capacity(1);
let base = PlanBase::new_stream(
Expand Down Expand Up @@ -94,5 +93,5 @@ impl ExprRewritable for StreamGlobalApproxPercentile {
}

impl ExprVisitable for StreamGlobalApproxPercentile {
fn visit_exprs(&self, v: &mut dyn ExprVisitor) {}
fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
}
12 changes: 5 additions & 7 deletions src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,15 @@ use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::HopWindowNode;

use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::{childless_record, Distill, IndicesDisplay};
use crate::optimizer::plan_node::utils::{childless_record, Distill};
use crate::optimizer::plan_node::{
ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamHopWindow,
StreamLocalApproxPercentile, StreamNode,
ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamNode,
};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;
Expand All @@ -54,10 +52,10 @@ impl StreamKeyedMerge {
) -> Result<Self> {
assert_eq!(lhs_mapping.target_size(), rhs_mapping.target_size());
let mut schema_fields = Vec::with_capacity(lhs_mapping.target_size());
let mut o2i_lhs = lhs_mapping
let o2i_lhs = lhs_mapping
.inverse()
.ok_or_else(|| anyhow!("lhs_mapping should be invertible"))?;
let mut o2i_rhs = rhs_mapping
let o2i_rhs = rhs_mapping
.inverse()
.ok_or_else(|| anyhow!("rhs_mapping should be invertible"))?;
for output_idx in 0..lhs_mapping.target_size() {
Expand Down Expand Up @@ -152,5 +150,5 @@ impl ExprRewritable for StreamKeyedMerge {
}

impl ExprVisitable for StreamKeyedMerge {
fn visit_exprs(&self, v: &mut dyn ExprVisitor) {}
fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,15 @@ use fixedbitset::FixedBitSet;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use crate::expr::{ExprRewriter, ExprVisitor, InputRef, InputRefDisplay, Literal};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::{
childless_record, watermark_pretty, Distill, IndicesDisplay,
};
use crate::optimizer::plan_node::utils::{childless_record, watermark_pretty, Distill};
use crate::optimizer::plan_node::{
ExprRewritable, PlanAggCall, PlanBase, PlanNode, PlanTreeNodeUnary, Stream,
StreamGlobalApproxPercentile, StreamHopWindow, StreamNode,
ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;
Expand Down Expand Up @@ -127,5 +123,5 @@ impl ExprRewritable for StreamLocalApproxPercentile {
}

impl ExprVisitable for StreamLocalApproxPercentile {
fn visit_exprs(&self, v: &mut dyn ExprVisitor) {}
fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
}
6 changes: 4 additions & 2 deletions src/frontend/src/optimizer/rule/distinct_agg_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ impl Rule for DistinctAggRule {
c.agg_kind
);
let agg_kind_ok = !matches!(c.agg_kind, agg_kinds::simply_cannot_two_phase!());
let order_ok = matches!(c.agg_kind, agg_kinds::result_unaffected_by_order_by!() | AggKind::ApproxPercentile)
|| c.order_by.is_empty();
let order_ok = matches!(
c.agg_kind,
agg_kinds::result_unaffected_by_order_by!() | AggKind::ApproxPercentile
) || c.order_by.is_empty();
agg_kind_ok && order_ok
}) {
tracing::warn!("DistinctAggRule: unsupported agg kind, fallback to backend impl");
Expand Down

0 comments on commit 32b718e

Please sign in to comment.