Skip to content

Commit

Permalink
feat(frontend): support single phase approx percentile in batch (#18083
Browse files Browse the repository at this point in the history
…) (#18156)
  • Loading branch information
kwannoel authored Aug 21, 2024
1 parent 6c28e6e commit 963572a
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 14 deletions.
33 changes: 20 additions & 13 deletions e2e_test/streaming/aggregate/shuffle_approx_percentile.slt
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,6 @@ select * from m1;
----
-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152

# Test state encode / decode
onlyif can-use-recover
statement ok
recover;

onlyif can-use-recover
sleep 10s

query I
select * from m1;
----
-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152

# Test 0<x<1 values
statement ok
insert into t select 0.001, 1 from generate_series(1, 500);
Expand All @@ -97,6 +84,26 @@ from t group by grp_col;
----
-970 -700 0.0001 700 970

statement ok
delete from t;

statement ok
insert into t select 0.0001, 1 from generate_series(1, 501);

statement ok
flush;

query I
select
round(p01::numeric, 6) as p01,
round(p10::numeric, 6) as p10,
round(p50::numeric, 6) as p50,
round(p90::numeric, 6) as p90,
round(p99::numeric, 6) as p99
from m1;
----
0.000100 0.000100 0.000100 0.000100 0.000100

statement ok
drop materialized view m1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,26 @@ select * from m1;
----
-963.1209598593477 0.5501000000000007 0.00009999833511933609 3965.1209598593477

query I
select
approx_percentile(0.01, 0.01) within group (order by p_col) as p01,
round(sum(p_col)::numeric, 2) as s,
approx_percentile(0.5, 0.01) within group (order by p_col) as p50,
count(*)::double + approx_percentile(0.99, 0.01) within group (order by p_col) as p99
from t;
----
-963.1209598593477 0.55 0.00009999833511933609 3965.1209598593477

query I
select
approx_percentile(0.01, 0.01) within group (order by p_col) as p01,
round(sum(p_col)::numeric, 2) as s,
approx_percentile(0.5, 0.01) within group (order by p_col) as p50,
count(*)::double + approx_percentile(0.99, 0.01) within group (order by p_col) as p99
from t group by grp_col;
----
-963.1209598593477 0.55 0.00009999833511933609 3965.1209598593477

query I
select
percentile_cont(0.01) within group (order by p_col) as p01,
Expand All @@ -72,6 +92,25 @@ from t;
----
-969.99 55 0.0001 3002 969.9899999999998

statement ok
delete from t;

statement ok
insert into t select 0.0001, 1 from generate_series(1, 501);

statement ok
flush;

query I
select
round(p01::numeric, 6),
round(s::numeric, 6),
round(p50::numeric, 6),
round(p99::numeric, 6) as p99_plus_sum
from m1;
----
0.000100 0.050100 0.000100 501.000100

statement ok
drop materialized view m1;

Expand Down
17 changes: 17 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1013,20 +1013,23 @@
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t;
expected_outputs:
- logical_plan
- batch_plan
- stream_plan
- name: test simple approx_percentile with other simple aggs
sql: |
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1), sum(v1) from t;
expected_outputs:
- logical_plan
- batch_plan
- stream_plan
- name: test simple approx_percentile with other simple aggs (sum, count)
sql: |
CREATE TABLE t (v1 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1), sum(v1) as s2, count(v1) from t;
expected_outputs:
- logical_plan
- batch_plan
- stream_plan
- name: test simple approx_percentile with duplicate approx_percentile
sql: |
Expand All @@ -1041,34 +1044,47 @@
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- batch_plan
- stream_plan
- name: test simple approx_percentile with different approx_percentile interleaved with stateless simple aggs
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) + approx_percentile(0.9, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- batch_plan
- stream_plan
- name: test simple approx_percentile with duplicated approx_percentile interleaved with stateless simple aggs
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) + approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- batch_plan
- stream_plan
- name: test simple approx_percentile with descending order
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.2, 0.01) WITHIN GROUP (order by v1 desc) from t;
expected_outputs:
- logical_plan
- batch_plan
- stream_plan
- name: test simple approx_percentile with different approx_percentile interleaved with stateless + stateful simple aggs
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), max(v2) as m2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- batch_plan
- stream_plan
- name: test hash approx_percentile
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t group by v2;
expected_outputs:
- logical_plan
- batch_plan
- stream_plan
- name: test approx_percentile hash_agg forced should use single phase agg
sql: |
Expand All @@ -1083,4 +1099,5 @@
SELECT approx_percentile(0.5) WITHIN GROUP (order by v1) from t;
expected_outputs:
- logical_plan
- batch_plan
- stream_plan
71 changes: 71 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1888,6 +1888,11 @@
└─LogicalAgg { aggs: [approx_percentile($expr1)] }
└─LogicalProject { exprs: [t.v1::Float64 as $expr1] }
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
batch_plan: |-
BatchSimpleAgg { aggs: [approx_percentile($expr1)] }
└─BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.v1::Float64 as $expr1] }
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
Expand All @@ -1904,6 +1909,11 @@
└─LogicalAgg { aggs: [approx_percentile($expr1), sum(t.v1)] }
└─LogicalProject { exprs: [t.v1::Float64 as $expr1, t.v1] }
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
batch_plan: |-
BatchSimpleAgg { aggs: [approx_percentile($expr1), sum(t.v1)] }
└─BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.v1::Float64 as $expr1, t.v1] }
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [approx_percentile, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamRowMerge { output: [approx_percentile:Float64, sum(sum(t.v1)):Int64] }
Expand All @@ -1928,6 +1938,12 @@
└─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count(t.v1)] }
└─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1] }
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
batch_plan: |-
BatchProject { exprs: [sum(t.v1), approx_percentile($expr1), sum(t.v1), count(t.v1)] }
└─BatchSimpleAgg { aggs: [sum(t.v1), approx_percentile($expr1), count(t.v1)] }
└─BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.v1, t.v1::Float64 as $expr1] }
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [s1, approx_percentile, s2, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum(sum(t.v1)), sum0(count(t.v1))] }
Expand Down Expand Up @@ -1970,6 +1986,11 @@
└─LogicalAgg { aggs: [approx_percentile($expr1), approx_percentile($expr2)] }
└─LogicalProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
batch_plan: |-
BatchSimpleAgg { aggs: [approx_percentile($expr1), approx_percentile($expr2)] }
└─BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2] }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] }
Expand All @@ -1994,6 +2015,12 @@
└─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] }
└─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
batch_plan: |-
BatchProject { exprs: [sum(t.v1), approx_percentile($expr1), count, (sum(t.v2)::Float64 + approx_percentile($expr2)) as $expr3] }
└─BatchSimpleAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] }
└─BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [s1, x, count, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3] }
Expand Down Expand Up @@ -2026,6 +2053,12 @@
└─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] }
└─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
batch_plan: |-
BatchProject { exprs: [sum(t.v1), approx_percentile($expr1), count, (sum(t.v2)::Float64 + approx_percentile($expr2)) as $expr3] }
└─BatchSimpleAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] }
└─BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [s1, x, count, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3] }
Expand Down Expand Up @@ -2058,6 +2091,11 @@
└─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1)] }
└─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
batch_plan: |-
BatchSimpleAgg { aggs: [sum(t.v1), approx_percentile($expr1)] }
└─BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.v1, t.v1::Float64 as $expr1] }
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [s1, approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64] }
Expand All @@ -2082,6 +2120,11 @@
└─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, max(t.v2), approx_percentile($expr2)] }
└─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
batch_plan: |-
BatchSimpleAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, max(t.v2), approx_percentile($expr2)] }
└─BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [s1, x, count, m2, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, max(max(t.v2)):Int32, approx_percentile:Float64] }
Expand All @@ -2103,6 +2146,29 @@
└─StreamHashAgg { group_key: [$expr5], aggs: [sum(t.v1), count, max(t.v2)] }
└─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr3, t.v2, t.v2::Float64 as $expr4, t._row_id, Vnode(t._row_id) as $expr5] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: test hash approx_percentile
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t group by v2;
logical_plan: |-
LogicalProject { exprs: [approx_percentile($expr1)] }
└─LogicalAgg { group_key: [t.v2], aggs: [approx_percentile($expr1)] }
└─LogicalProject { exprs: [t.v2, t.v1::Float64 as $expr1] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [approx_percentile($expr1)] }
└─BatchHashAgg { group_key: [t.v2], aggs: [approx_percentile($expr1)] }
└─BatchExchange { order: [], dist: HashShard(t.v2) }
└─BatchProject { exprs: [t.v2, t.v1::Float64 as $expr1] }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [approx_percentile, t.v2(hidden)], stream_key: [t.v2], pk_columns: [t.v2], pk_conflict: NoCheck }
└─StreamProject { exprs: [approx_percentile($expr1), t.v2] }
└─StreamHashAgg { group_key: [t.v2], aggs: [approx_percentile($expr1), count] }
└─StreamExchange { dist: HashShard(t.v2) }
└─StreamProject { exprs: [t.v2, t.v1::Float64 as $expr1, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: test approx_percentile hash_agg forced should use single phase agg
sql: |
SET RW_FORCE_TWO_PHASE_AGG=true;
Expand All @@ -2120,6 +2186,11 @@
└─LogicalAgg { aggs: [approx_percentile($expr1)] }
└─LogicalProject { exprs: [t.v1::Float64 as $expr1] }
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
batch_plan: |-
BatchSimpleAgg { aggs: [approx_percentile($expr1)] }
└─BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.v1::Float64 as $expr1] }
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
Expand Down
11 changes: 10 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_expr::aggregate::{AggKind, PbAggKind};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SortAggNode;

Expand Down Expand Up @@ -51,7 +52,15 @@ impl BatchSimpleAgg {
}

pub(crate) fn can_two_phase_agg(&self) -> bool {
self.core.can_two_phase_agg() && self.two_phase_agg_enabled()
self.core.can_two_phase_agg()
&& self
.core
// Ban two phase approx percentile.
.agg_calls
.iter()
.map(|agg_call| &agg_call.agg_kind)
.all(|agg_kind| !matches!(agg_kind, AggKind::Builtin(PbAggKind::ApproxPercentile)))
&& self.two_phase_agg_enabled()
}
}

Expand Down

0 comments on commit 963572a

Please sign in to comment.