Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): add stateless two-phase ApproxPercentile #17469

Merged
merged 33 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d4af542
add ApproxPercentile AggKind
kwannoel Jun 24, 2024
28afa59
add plan node for two-phase simple agg
kwannoel Jul 18, 2024
c90b89a
pass binder test
kwannoel Jul 18, 2024
6cb3678
expose function direct_args
kwannoel Jul 18, 2024
732238c
add planner test case
kwannoel Jul 19, 2024
dcc5918
add logical project
kwannoel Jul 20, 2024
1369c9b
extract agg calls
kwannoel Jul 21, 2024
a55c666
implement local approx percentile core parts
kwannoel Jul 21, 2024
84b04b2
fix local approx percentile schema
kwannoel Jul 21, 2024
477c5fc
handle global approx percentile
kwannoel Jul 21, 2024
f4acd01
implement stream keyed merge
kwannoel Jul 21, 2024
1a5d098
fix stream share
kwannoel Jul 21, 2024
3f6b9f7
test case of approx_percentile alone
kwannoel Jul 21, 2024
85adca2
add tests for stateless normal agg + approx percentile agg, fix distr…
kwannoel Jul 22, 2024
a12e333
fix agg key merge schema
kwannoel Jul 22, 2024
852dada
fix global approx percentile stream key
kwannoel Jul 22, 2024
c9a9b05
handle multiple approx percentile
kwannoel Jul 22, 2024
f4b6586
handle descending case
kwannoel Jul 22, 2024
965ae19
fmt
kwannoel Jul 22, 2024
556ea55
remove wrong example
kwannoel Jul 22, 2024
0abeac1
reuse functionality
kwannoel Jul 22, 2024
436c253
cleanup two-phase approx percentile
kwannoel Jul 22, 2024
a2dfaae
remove resolved comments
kwannoel Jul 22, 2024
c3eb3a1
resolve comments
kwannoel Jul 22, 2024
8b836fc
fmt
kwannoel Jul 22, 2024
32f6301
Update src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs
kwannoel Jul 24, 2024
c20bf2c
fix conflicts
kwannoel Jul 24, 2024
6821023
add quantile and relative error to global approx percentile
kwannoel Jul 24, 2024
4af937f
docs for stream keyed merge
kwannoel Jul 24, 2024
6cf0dd1
fill with todos for agg framework
kwannoel Jul 24, 2024
1dd7c2b
fix tests
kwannoel Jul 24, 2024
1e195fc
refactor SeparatedAggInfo + reuse Field::with_name
kwannoel Jul 24, 2024
5e816a1
fix
kwannoel Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ message AggCall {
LAST_VALUE = 25;
GROUPING = 26;
INTERNAL_LAST_SEEN_VALUE = 27;
APPROX_PERCENTILE = 28;

// user defined aggregate function
USER_DEFINED = 100;
// wraps a scalar function that takes a list as input as an aggregate function.
Expand Down
9 changes: 9 additions & 0 deletions src/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ impl Field {
name: self.name.to_string(),
}
}

pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
Self {
data_type,
name: name.into(),
sub_fields: vec![],
type_name: String::new(),
}
}
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
}

impl From<&ColumnDesc> for Field {
Expand Down
10 changes: 8 additions & 2 deletions src/expr/core/src/aggregate/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,10 @@ pub mod agg_kinds {
| PbAggKind::StddevSamp
| PbAggKind::VarPop
| PbAggKind::VarSamp
| PbAggKind::Grouping,
| PbAggKind::Grouping
// ApproxPercentile always uses custom agg executors,
// rather than an aggregation operator
| PbAggKind::ApproxPercentile
)
};
}
Expand Down Expand Up @@ -443,7 +446,10 @@ pub mod agg_kinds {
macro_rules! ordered_set {
() => {
AggKind::Builtin(
PbAggKind::PercentileCont | PbAggKind::PercentileDisc | PbAggKind::Mode,
PbAggKind::PercentileCont
| PbAggKind::PercentileDisc
| PbAggKind::Mode
| PbAggKind::ApproxPercentile,
)
};
}
Expand Down
67 changes: 67 additions & 0 deletions src/expr/impl/src/aggregate/approx_percentile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;

use risingwave_common::array::*;
use risingwave_common::types::*;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_expr::aggregate::{AggCall, AggStateDyn, AggregateFunction, AggregateState};
use risingwave_expr::{build_aggregate, Result};

#[build_aggregate("approx_percentile(float8) -> float8")]
fn build(agg: &AggCall) -> Result<Box<dyn AggregateFunction>> {
let fraction = agg.direct_args[0]
.literal()
.map(|x| (*x.as_float64()).into());
Ok(Box::new(ApproxPercentile { fraction }))
}

#[allow(dead_code)]
pub struct ApproxPercentile {
fraction: Option<f64>,
}

#[derive(Debug, Default, EstimateSize)]
struct State(Vec<f64>);

impl AggStateDyn for State {}

#[async_trait::async_trait]
impl AggregateFunction for ApproxPercentile {
fn return_type(&self) -> DataType {
DataType::Float64
}

fn create_state(&self) -> Result<AggregateState> {
todo!()
}

async fn update(&self, _state: &mut AggregateState, _input: &StreamChunk) -> Result<()> {
todo!()
}

async fn update_range(
&self,
_state: &mut AggregateState,
_input: &StreamChunk,
_range: Range<usize>,
) -> Result<()> {
todo!()
}

async fn get_result(&self, _state: &AggregateState) -> Result<Datum> {
todo!()
}
}
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions src/expr/impl/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

mod approx_count_distinct;
mod approx_percentile;
mod array_agg;
mod bit_and;
mod bit_or;
Expand Down
56 changes: 56 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1000,3 +1000,59 @@
expected_outputs:
- batch_plan
- stream_plan
- name: test duplicate agg
sql: |
CREATE TABLE t (v1 int);
SELECT sum(v1) as x, count(v1) as y, sum(v1) as z, count(v1) as w from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile alone
sql: |
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with other simple aggs
sql: |
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1), sum(v1) from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with other simple aggs (sum, count)
sql: |
CREATE TABLE t (v1 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1), sum(v1) as s2, count(v1) from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with duplicate approx_percentile
sql: |
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as y from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with different approx_percentile
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with different approx_percentile interleaved with stateless simple aggs
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) as s2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile with descending order
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.2, 0.01) WITHIN GROUP (order by v1 desc) from t;
expected_outputs:
- logical_plan
- stream_plan
Loading
Loading