Skip to content

Commit

Permalink
feat(frontend): add stateless two-phase ApproxPercentile (#17469)
Browse files Browse the repository at this point in the history
Co-authored-by: Eric Fu <[email protected]>
  • Loading branch information
kwannoel and fuyufjh authored Jul 25, 2024
1 parent ffc6d4a commit 6834de8
Show file tree
Hide file tree
Showing 19 changed files with 1,261 additions and 51 deletions.
2 changes: 2 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ message AggCall {
LAST_VALUE = 25;
GROUPING = 26;
INTERNAL_LAST_SEEN_VALUE = 27;
APPROX_PERCENTILE = 28;

// user defined aggregate function
USER_DEFINED = 100;
// wraps a scalar function that takes a list as input as an aggregate function.
Expand Down
10 changes: 8 additions & 2 deletions src/expr/core/src/aggregate/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,10 @@ pub mod agg_kinds {
| PbAggKind::StddevSamp
| PbAggKind::VarPop
| PbAggKind::VarSamp
| PbAggKind::Grouping,
| PbAggKind::Grouping
// ApproxPercentile always uses custom agg executors,
// rather than an aggregation operator
| PbAggKind::ApproxPercentile
)
};
}
Expand Down Expand Up @@ -443,7 +446,10 @@ pub mod agg_kinds {
macro_rules! ordered_set {
() => {
AggKind::Builtin(
PbAggKind::PercentileCont | PbAggKind::PercentileDisc | PbAggKind::Mode,
PbAggKind::PercentileCont
| PbAggKind::PercentileDisc
| PbAggKind::Mode
| PbAggKind::ApproxPercentile,
)
};
}
Expand Down
67 changes: 67 additions & 0 deletions src/expr/impl/src/aggregate/approx_percentile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;

use risingwave_common::array::*;
use risingwave_common::types::*;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_expr::aggregate::{AggCall, AggStateDyn, AggregateFunction, AggregateState};
use risingwave_expr::{build_aggregate, Result};

/// Builds the `approx_percentile(float8) -> float8` aggregate from its call description.
///
/// The first direct argument of `agg` is queried via `literal()`; when that yields a
/// value it is read as a float64 and converted into the `fraction` stored on the
/// returned [`ApproxPercentile`]. When `literal()` yields `None`, `fraction` is `None`.
///
/// NOTE(review): `direct_args[0]` is indexed unconditionally, so this presumably panics
/// if the call carries no direct arguments — confirm the binder guarantees at least one.
#[build_aggregate("approx_percentile(float8) -> float8")]
fn build(agg: &AggCall) -> Result<Box<dyn AggregateFunction>> {
    let first_direct_arg = &agg.direct_args[0];
    let fraction = first_direct_arg
        .literal()
        .map(|datum| (*datum.as_float64()).into());
    let function = ApproxPercentile { fraction };
    Ok(Box::new(function))
}

/// Aggregate function object registered for `approx_percentile`.
///
/// Constructed by `build`, which fills `fraction` from the call's first direct argument.
// `fraction` is stored but never read: every method that would use it is still `todo!()`,
// hence the `dead_code` allowance below.
#[allow(dead_code)]
pub struct ApproxPercentile {
/// Fraction taken from the first direct argument when it is available as a literal;
/// `None` otherwise.
fraction: Option<f64>,
}

/// Aggregation state for `ApproxPercentile`: a newtype around a vector of `f64`.
///
/// NOTE(review): presumably meant to hold the values seen so far, but nothing in this
/// file constructs, reads, or writes it yet — confirm once the aggregate is implemented.
#[derive(Debug, Default, EstimateSize)]
struct State(Vec<f64>);

// Empty impl: no items of `AggStateDyn` are overridden here.
impl AggStateDyn for State {}

/// `AggregateFunction` implementation for `ApproxPercentile`.
///
/// Only `return_type` is implemented. Every other method is a `todo!()` stub and
/// panics if it is ever invoked.
#[async_trait::async_trait]
impl AggregateFunction for ApproxPercentile {
/// The aggregate always yields a `Float64`, matching the `-> float8` signature
/// declared on `build`.
fn return_type(&self) -> DataType {
DataType::Float64
}

/// Not implemented yet: panics via `todo!()`.
fn create_state(&self) -> Result<AggregateState> {
todo!()
}

/// Not implemented yet: panics via `todo!()`; both arguments are unused.
async fn update(&self, _state: &mut AggregateState, _input: &StreamChunk) -> Result<()> {
todo!()
}

/// Not implemented yet: panics via `todo!()`; all arguments are unused.
async fn update_range(
&self,
_state: &mut AggregateState,
_input: &StreamChunk,
_range: Range<usize>,
) -> Result<()> {
todo!()
}

/// Not implemented yet: panics via `todo!()`; the state argument is unused.
async fn get_result(&self, _state: &AggregateState) -> Result<Datum> {
todo!()
}
}
1 change: 1 addition & 0 deletions src/expr/impl/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

mod approx_count_distinct;
mod approx_percentile;
mod array_agg;
mod bit_and;
mod bit_or;
Expand Down
56 changes: 56 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1000,3 +1000,59 @@
expected_outputs:
- batch_plan
- stream_plan
- name: test duplicate agg
sql: |
CREATE TABLE t (v1 int);
SELECT sum(v1) as x, count(v1) as y, sum(v1) as z, count(v1) as w from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile alone
sql: |
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with other simple aggs
sql: |
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1), sum(v1) from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with other simple aggs (sum, count)
sql: |
CREATE TABLE t (v1 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1), sum(v1) as s2, count(v1) from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with duplicate approx_percentile
sql: |
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as y from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with different approx_percentile
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with different approx_percentile interleaved with stateless simple aggs
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) as s2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with descending order
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.2, 0.01) WITHIN GROUP (order by v1 desc) from t;
expected_outputs:
- logical_plan
- stream_plan
Loading

0 comments on commit 6834de8

Please sign in to comment.