From 4f0973275784e71d454ffb6933781c9608280653 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 19 Aug 2024 11:51:10 +0800 Subject: [PATCH 1/3] only support single phase approx_percentile batch agg --- .../tests/testdata/input/agg.yaml | 16 +++++ .../tests/testdata/output/agg.yaml | 66 +++++++++++++++++++ .../optimizer/plan_node/batch_simple_agg.rs | 11 +++- 3 files changed, 92 insertions(+), 1 deletion(-) diff --git a/src/frontend/planner_test/tests/testdata/input/agg.yaml b/src/frontend/planner_test/tests/testdata/input/agg.yaml index 1979e4ea1fb77..78fe815884e3c 100644 --- a/src/frontend/planner_test/tests/testdata/input/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/input/agg.yaml @@ -1013,6 +1013,7 @@ SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t; expected_outputs: - logical_plan + - batch_plan - stream_plan - name: test simple approx_percentile with other simple aggs sql: | @@ -1020,6 +1021,7 @@ SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1), sum(v1) from t; expected_outputs: - logical_plan + - batch_plan - stream_plan - name: test simple approx_percentile with other simple aggs (sum, count) sql: | @@ -1027,6 +1029,7 @@ SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1), sum(v1) as s2, count(v1) from t; expected_outputs: - logical_plan + - batch_plan - stream_plan - name: test simple approx_percentile with duplicate approx_percentile sql: | @@ -1041,6 +1044,7 @@ SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t; expected_outputs: - logical_plan + - batch_plan - stream_plan - name: test simple approx_percentile with different approx_percentile interleaved with stateless simple aggs sql: | @@ -1048,6 +1052,7 @@ SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) + approx_percentile(0.9, 0.01) WITHIN GROUP (order by v2) as y from t; expected_outputs: - logical_plan + - batch_plan - stream_plan - name: test simple approx_percentile with duplicated approx_percentile interleaved with stateless simple aggs sql: | @@ -1055,6 +1060,7 @@ SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), sum(v2) + approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t; expected_outputs: - logical_plan + - batch_plan - stream_plan - name: test simple approx_percentile with descending order sql: | @@ -1062,6 +1068,7 @@ SELECT sum(v1) as s1, approx_percentile(0.2, 0.01) WITHIN GROUP (order by v1 desc) from t; expected_outputs: - logical_plan + - batch_plan - stream_plan - name: test simple approx_percentile with different approx_percentile interleaved with stateless + stateful simple aggs sql: | @@ -1069,4 +1076,13 @@ SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), max(v2) as m2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t; expected_outputs: - logical_plan + - batch_plan + - stream_plan +- name: test hash approx_percentile + sql: | + CREATE TABLE t (v1 int, v2 int); + SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t group by v2; + expected_outputs: + - logical_plan + - batch_plan - stream_plan \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml index eca739788bf6e..31fc752172d87 100644 --- a/src/frontend/planner_test/tests/testdata/output/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml @@ -1888,6 +1888,11 @@ └─LogicalAgg { aggs: [approx_percentile($expr1)] } └─LogicalProject { exprs: [t.v1::Float64 as $expr1] } └─LogicalScan { table: t, columns: [t.v1, t._row_id] } + batch_plan: |- + BatchSimpleAgg { aggs: [approx_percentile($expr1)] } + └─BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.v1::Float64 as $expr1] } + └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } @@ -1904,6 +1909,11 @@ └─LogicalAgg { aggs: [approx_percentile($expr1), sum(t.v1)] } └─LogicalProject { exprs: [t.v1::Float64 as $expr1, t.v1] } └─LogicalScan { table: t, columns: [t.v1, t._row_id] } + batch_plan: |- + BatchSimpleAgg { aggs: [approx_percentile($expr1), sum(t.v1)] } + └─BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.v1::Float64 as $expr1, t.v1] } + └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [approx_percentile, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamRowMerge { output: [approx_percentile:Float64, sum(sum(t.v1)):Int64] } @@ -1928,6 +1938,12 @@ └─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count(t.v1)] } └─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1] } └─LogicalScan { table: t, columns: [t.v1, t._row_id] } + batch_plan: |- + BatchProject { exprs: [sum(t.v1), approx_percentile($expr1), sum(t.v1), count(t.v1)] } + └─BatchSimpleAgg { aggs: [sum(t.v1), approx_percentile($expr1), count(t.v1)] } + └─BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.v1, t.v1::Float64 as $expr1] } + └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, approx_percentile, s2, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum(sum(t.v1)), sum0(count(t.v1))] } @@ -1970,6 +1986,11 @@ └─LogicalAgg { aggs: [approx_percentile($expr1), approx_percentile($expr2)] } └─LogicalProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2] } └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] } + batch_plan: |- + BatchSimpleAgg { aggs: [approx_percentile($expr1), approx_percentile($expr2)] } + └─BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2] } + └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [x, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } @@ -1994,6 +2015,12 @@ └─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] } └─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] } └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] } + batch_plan: |- + BatchProject { exprs: [sum(t.v1), approx_percentile($expr1), count, (sum(t.v2)::Float64 + approx_percentile($expr2)) as $expr3] } + └─BatchSimpleAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] } + └─BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] } + └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, x, count, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3] } @@ -2026,6 +2053,12 @@ └─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] } └─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] } └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] } + batch_plan: |- + BatchProject { exprs: [sum(t.v1), approx_percentile($expr1), count, (sum(t.v2)::Float64 + approx_percentile($expr2)) as $expr3] } + └─BatchSimpleAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, sum(t.v2), approx_percentile($expr2)] } + └─BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] } + └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, x, count, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3] } @@ -2058,6 +2091,11 @@ └─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1)] } └─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1] } └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] } + batch_plan: |- + BatchSimpleAgg { aggs: [sum(t.v1), approx_percentile($expr1)] } + └─BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.v1, t.v1::Float64 as $expr1] } + └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64] } @@ -2082,6 +2120,11 @@ └─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, max(t.v2), approx_percentile($expr2)] } └─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] } └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] } + batch_plan: |- + BatchSimpleAgg { aggs: [sum(t.v1), approx_percentile($expr1), count, max(t.v2), approx_percentile($expr2)] } + └─BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2] } + └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, x, count, m2, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, max(max(t.v2)):Int32, approx_percentile:Float64] } @@ -2103,3 +2146,26 @@ └─StreamHashAgg { group_key: [$expr5], aggs: [sum(t.v1), count, max(t.v2)] } └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr3, t.v2, t.v2::Float64 as $expr4, t._row_id, Vnode(t._row_id) as $expr5] } └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } +- name: test hash approx_percentile + sql: | + CREATE TABLE t (v1 int, v2 int); + SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t group by v2; + logical_plan: |- + LogicalProject { exprs: [approx_percentile($expr1)] } + └─LogicalAgg { group_key: [t.v2], aggs: [approx_percentile($expr1)] } + └─LogicalProject { exprs: [t.v2, t.v1::Float64 as $expr1] } + └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] } + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [approx_percentile($expr1)] } + └─BatchHashAgg { group_key: [t.v2], aggs: [approx_percentile($expr1)] } + └─BatchExchange { order: [], dist: HashShard(t.v2) } + └─BatchProject { exprs: [t.v2, t.v1::Float64 as $expr1] } + └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [approx_percentile, t.v2(hidden)], stream_key: [t.v2], pk_columns: [t.v2], pk_conflict: NoCheck } + └─StreamProject { exprs: [approx_percentile($expr1), t.v2] } + └─StreamHashAgg { group_key: [t.v2], aggs: [approx_percentile($expr1), count] } + └─StreamExchange { dist: HashShard(t.v2) } + └─StreamProject { exprs: [t.v2, t.v1::Float64 as $expr1, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs index ff20df7a4d177..894ad92011008 100644 --- a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_expr::aggregate::{AggKind, PbAggKind}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::SortAggNode; @@ -51,7 +52,15 @@ impl BatchSimpleAgg { } pub(crate) fn can_two_phase_agg(&self) -> bool { - self.core.can_two_phase_agg() && self.two_phase_agg_enabled() + self.core.can_two_phase_agg() + && self + .core + // Ban two phase approx percentile. + .agg_calls + .iter() + .map(|agg_call| &agg_call.agg_kind) + .all(|agg_kind| !matches!(agg_kind, AggKind::Builtin(PbAggKind::ApproxPercentile))) + && self.two_phase_agg_enabled() } } From 42c35b5133967a28278505bc850d91d2f2da9c52 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 19 Aug 2024 12:02:15 +0800 Subject: [PATCH 2/3] add e2e tests --- ..._approx_percentile_merge_stateless_agg.slt | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateless_agg.slt b/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateless_agg.slt index 49d5d781f79f0..d273b6aacb60c 100644 --- a/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateless_agg.slt +++ b/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge_stateless_agg.slt @@ -61,6 +61,26 @@ select * from m1; ---- -963.1209598593477 0.5501000000000007 0.00009999833511933609 3965.1209598593477 +query I +select + approx_percentile(0.01, 0.01) within group (order by p_col) as p01, + round(sum(p_col)::numeric, 2) as s, + approx_percentile(0.5, 0.01) within group (order by p_col) as p50, + count(*)::double + approx_percentile(0.99, 0.01) within group (order by p_col) as p99 +from t; +---- +-963.1209598593477 0.55 0.00009999833511933609 3965.1209598593477 + +query I +select + approx_percentile(0.01, 0.01) within group (order by p_col) as p01, + round(sum(p_col)::numeric, 2) as s, + approx_percentile(0.5, 0.01) within group (order by p_col) as p50, + count(*)::double + approx_percentile(0.99, 0.01) within group (order by p_col) as p99 +from t group by grp_col; +---- +-963.1209598593477 0.55 0.00009999833511933609 3965.1209598593477 + query I select percentile_cont(0.01) within group (order by p_col) as p01, From 35c7e10157593cc7a6aaafe0fd62b755c302584d Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 19 Aug 2024 13:10:44 +0800 Subject: [PATCH 3/3] add delete tests --- .../aggregate/shuffle_approx_percentile.slt | 33 +++++++++++-------- ..._approx_percentile_merge_stateless_agg.slt | 19 +++++++++++ 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/e2e_test/streaming/aggregate/shuffle_approx_percentile.slt b/e2e_test/streaming/aggregate/shuffle_approx_percentile.slt index efc377f8aed48..3f2c400b91f0f 100644 --- a/e2e_test/streaming/aggregate/shuffle_approx_percentile.slt +++ b/e2e_test/streaming/aggregate/shuffle_approx_percentile.slt @@ -58,19 +58,6 @@ select * from m1; ---- -982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152 -# Test state encode / decode -onlyif can-use-recover -statement ok -recover; - -onlyif can-use-recover -sleep 10s - -query I -select * from m1; ----- --982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152 - # Test 0