From 0c6e50cd63bcc345f064d0d9a912c0abaa95c814 Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Wed, 4 Sep 2024 00:31:12 +0800 Subject: [PATCH] feat(streaming): add `must_output_per_barrier` flag for stream simple agg (#18374) Co-authored-by: Eric Fu --- proto/stream_plan.proto | 3 + .../tests/testdata/output/agg.yaml | 200 +++++++++--------- .../tests/testdata/output/append_only.yaml | 2 +- .../tests/testdata/output/bushy_join.yaml | 2 +- .../tests/testdata/output/ch_benchmark.yaml | 35 +-- .../tests/testdata/output/cse_expr.yaml | 2 +- .../tests/testdata/output/dynamic_filter.yaml | 16 +- .../testdata/output/lateral_subquery.yaml | 2 +- .../tests/testdata/output/limit.yaml | 4 +- .../tests/testdata/output/mv_column_name.yaml | 2 +- .../tests/testdata/output/nexmark.yaml | 8 +- .../tests/testdata/output/nexmark_source.yaml | 8 +- .../output/nexmark_temporal_filter.yaml | 8 +- .../testdata/output/nexmark_watermark.yaml | 8 +- .../tests/testdata/output/share.yaml | 16 +- .../testdata/output/stream_dist_agg.yaml | 72 +++---- .../tests/testdata/output/temporal_join.yaml | 2 +- .../tests/testdata/output/tpch.yaml | 32 +-- .../src/optimizer/plan_node/logical_agg.rs | 72 ++++--- .../optimizer/plan_node/stream_simple_agg.rs | 22 +- .../plan_node/stream_stateless_simple_agg.rs | 1 + src/stream/src/executor/agg_common.rs | 4 +- src/stream/src/executor/integration_tests.rs | 1 + src/stream/src/executor/simple_agg.rs | 95 ++++++++- src/stream/src/executor/test_utils.rs | 5 +- src/stream/src/from_proto/simple_agg.rs | 5 +- 26 files changed, 380 insertions(+), 247 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 4148e78690745..5ea2f018eee20 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -365,6 +365,9 @@ message SimpleAggNode { map distinct_dedup_tables = 6; uint32 row_count_index = 7; AggNodeVersion version = 8; + // Required by the downstream `RowMergeNode`, + // currently only used by the `approx_percentile`'s two phase plan + bool must_output_per_barrier = 9; } message HashAggNode { diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml index a2033bcb09c6b..a69645406140b 100644 --- a/src/frontend/planner_test/tests/testdata/output/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml @@ -71,7 +71,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [agg], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v2)) * sum0(count(t.v3)))) as $expr1], noop_update_hint: true } + └─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v2)) * sum0(count(t.v3)))) as $expr1] } └─StreamSimpleAgg { aggs: [min(min(t.v1)), max(max(t.v2)), sum0(count(t.v3)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [min(t.v1), max(t.v2), count(t.v3), count] } @@ -273,7 +273,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [cnt, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count($expr1)), sum(sum($expr1))], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count($expr1)), sum(sum($expr1))] } └─StreamSimpleAgg { aggs: [sum0(count($expr1)), sum(sum($expr1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count($expr1), sum($expr1)] } @@ -571,7 +571,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [agg], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v3)) * sum0(count(t.v2)))) as $expr1], noop_update_hint: true } + └─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v3)) * sum0(count(t.v2)))) as $expr1] } └─StreamSimpleAgg { aggs: [min(min(t.v1)), max(max(t.v3)), sum0(count(t.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [min(t.v1), max(t.v3), count(t.v2), count] } @@ -628,7 +628,7 @@ └─LogicalScan { table: t, columns: [t.v1] } stream_plan: |- StreamMaterialize { columns: [s1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(t.v1))] } └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } @@ -647,7 +647,7 @@ └─LogicalScan { table: t, columns: [t.v1] } stream_plan: |- StreamMaterialize { columns: [s1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(t.v1))] } └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } @@ -666,7 +666,7 @@ └─LogicalScan { table: t, columns: [t.v1] } stream_plan: |- StreamMaterialize { columns: [s1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(t.v1))] } └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } @@ -685,7 +685,7 @@ └─LogicalScan { table: t, columns: [t.v1] } stream_plan: |- StreamMaterialize { columns: [sa], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1) filter((t.v1 > 0:Int32)))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(t.v1) filter((t.v1 > 0:Int32)))] } └─StreamSimpleAgg { aggs: [sum(sum(t.v1) filter((t.v1 > 0:Int32))), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1) filter((t.v1 > 0:Int32))] } @@ -720,7 +720,7 @@ └─LogicalScan { table: t, columns: [t.a, t.b] } stream_plan: |- StreamMaterialize { columns: [sab], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max($expr1) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))))], noop_update_hint: true } + └─StreamProject { exprs: [max(max($expr1) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))))] } └─StreamSimpleAgg { aggs: [max(max($expr1) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32)))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr2], aggs: [max($expr1) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))), count] } @@ -759,7 +759,7 @@ └─LogicalScan { table: t, columns: [t.a, t.b] } stream_plan: |- StreamMaterialize { columns: [cnt_agb], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count filter((t.a > t.b)))], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count filter((t.a > t.b)))] } └─StreamSimpleAgg { aggs: [sum0(count filter((t.a > t.b))), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count filter((t.a > t.b))] } @@ -813,7 +813,7 @@ └─BatchScan { table: t, columns: [t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [b], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v2) filter((t.v2 < 5:Int32)))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(t.v2) filter((t.v2 < 5:Int32)))] } └─StreamSimpleAgg { aggs: [sum(sum(t.v2) filter((t.v2 < 5:Int32))), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v2) filter((t.v2 < 5:Int32))] } @@ -896,7 +896,7 @@ └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [string_agg, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [string_agg(t.y, ',':Varchar), count(distinct t.x)], noop_update_hint: true } + └─StreamProject { exprs: [string_agg(t.y, ',':Varchar), count(distinct t.x)] } └─StreamSimpleAgg { aggs: [string_agg(t.y, ',':Varchar), count(distinct t.x), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t.y, ',':Varchar, t.x, t._row_id] } @@ -917,7 +917,7 @@ └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [string_agg, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [string_agg(t.y, ',':Varchar order_by(t.y ASC)), count(distinct t.x)], noop_update_hint: true } + └─StreamProject { exprs: [string_agg(t.y, ',':Varchar order_by(t.y ASC)), count(distinct t.x)] } └─StreamSimpleAgg { aggs: [string_agg(t.y, ',':Varchar order_by(t.y ASC)), count(distinct t.x), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t.y, ',':Varchar, t.x, t._row_id] } @@ -938,7 +938,7 @@ └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [string_agg, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [string_agg(distinct t.y, ',':Varchar order_by(t.y ASC)), count(distinct t.x)], noop_update_hint: true } + └─StreamProject { exprs: [string_agg(distinct t.y, ',':Varchar order_by(t.y ASC)), count(distinct t.x)] } └─StreamSimpleAgg { aggs: [string_agg(distinct t.y, ',':Varchar order_by(t.y ASC)), count(distinct t.x), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t.y, ',':Varchar, t.x, t._row_id] } @@ -1006,7 +1006,7 @@ └─LogicalScan { table: t, columns: [t.a, t.b] } stream_plan: |- StreamMaterialize { columns: [s1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum($expr1) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32)))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum($expr1) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32)))] } └─StreamSimpleAgg { aggs: [sum(sum($expr1) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32))), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32))] } @@ -1313,7 +1313,7 @@ stream_plan: |- StreamMaterialize { columns: [stddev_samp, stddev_pop], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int64), null:Decimal, Sqrt(((sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) / (sum0(count(t.v1)) - 1:Int64)::Decimal))) as $expr4, Sqrt(((sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) / $expr3)) as $expr5] } - └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), sum(sum(t.v1))::Decimal as $expr2, sum0(count(t.v1))::Decimal as $expr3], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), sum(sum(t.v1))::Decimal as $expr2, sum0(count(t.v1))::Decimal as $expr3] } └─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum(t.v1), count(t.v1)] } @@ -1370,7 +1370,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [min, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [min(min(t.v1)), sum(sum(t.v2))], noop_update_hint: true } + └─StreamProject { exprs: [min(min(t.v1)), sum(sum(t.v2))] } └─StreamSimpleAgg { aggs: [min(min(t.v1)), sum(sum(t.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [min(t.v1), sum(t.v2), count] } @@ -1388,7 +1388,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [min, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [min(t.v1), sum(t.v2)], noop_update_hint: true } + └─StreamProject { exprs: [min(t.v1), sum(t.v2)] } └─StreamSimpleAgg { aggs: [min(t.v1), sum(t.v2), count] } └─StreamExchange { dist: Single } └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -1677,7 +1677,7 @@ └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [first_value(t.x order_by(t.y ASC))], noop_update_hint: true } + └─StreamProject { exprs: [first_value(t.x order_by(t.y ASC))] } └─StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] } └─StreamExchange { dist: Single } └─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -1685,7 +1685,7 @@ Fragment 0 StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [first_value(t.x order_by(t.y ASC))], noop_update_hint: true } + └── StreamProject { exprs: [first_value(t.x order_by(t.y ASC))] } └── StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 @@ -1717,7 +1717,7 @@ Fragment 0 StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [first_value(distinct t.x order_by(t.x ASC))], noop_update_hint: true } + └── StreamProject { exprs: [first_value(distinct t.x order_by(t.x ASC))] } └── StreamSimpleAgg { aggs: [first_value(distinct t.x order_by(t.x ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0, SimpleAggDedupForCol0: 2 ] └── StreamExchange Single from 1 @@ -1753,7 +1753,7 @@ └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [last_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [last_value(t.x order_by(t.y DESC NULLS LAST))], noop_update_hint: true } + └─StreamProject { exprs: [last_value(t.x order_by(t.y DESC NULLS LAST))] } └─StreamSimpleAgg { aggs: [last_value(t.x order_by(t.y DESC NULLS LAST)), count] } └─StreamExchange { dist: Single } └─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -1874,7 +1874,7 @@ └─LogicalScan { table: t, columns: [t.v1, t._row_id] } stream_plan: |- StreamMaterialize { columns: [x, y, z, w], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1)), sum0(count(t.v1)), sum(sum(t.v1)), sum0(count(t.v1))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(t.v1)), sum0(count(t.v1)), sum(sum(t.v1)), sum0(count(t.v1))] } └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count(t.v1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count(t.v1)] } @@ -1895,12 +1895,11 @@ └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [approx_percentile], noop_update_hint: true } - └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - └─StreamExchange { dist: Single } - └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - └─StreamProject { exprs: [t.v1::Float64 as $expr1, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + └─StreamExchange { dist: Single } + └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + └─StreamProject { exprs: [t.v1::Float64 as $expr1, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: test simple approx_percentile with other simple aggs sql: | CREATE TABLE t (v1 int); @@ -1917,20 +1916,19 @@ └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [approx_percentile, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [approx_percentile, sum(sum(t.v1))], noop_update_hint: true } - └─StreamRowMerge { output: [approx_percentile:Float64, sum(sum(t.v1)):Int64] } - ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ └─StreamExchange { dist: Single } - │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ └─StreamShare { id: 2 } - │ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] } - │ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] } - └─StreamExchange { dist: Single } - └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } - └─StreamShare { id: 2 } - └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamRowMerge { output: [approx_percentile:Float64, sum(sum(t.v1)):Int64] } + ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ └─StreamExchange { dist: Single } + │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ └─StreamShare { id: 2 } + │ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] } + │ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count], must_output_per_barrier: true } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } + └─StreamShare { id: 2 } + └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: test simple approx_percentile with other simple aggs (sum, count) sql: | CREATE TABLE t (v1 int); @@ -1948,7 +1946,7 @@ └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, approx_percentile, s2, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum(sum(t.v1)), sum0(count(t.v1))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum(sum(t.v1)), sum0(count(t.v1))] } └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count(t.v1)):Int64] } ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } │ └─StreamExchange { dist: Single } @@ -1956,7 +1954,7 @@ │ └─StreamShare { id: 2 } │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] } │ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count(t.v1)), count] } + └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count(t.v1)), count], must_output_per_barrier: true } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count(t.v1)] } └─StreamShare { id: 2 } @@ -1973,7 +1971,7 @@ └─LogicalScan { table: t, columns: [t.v1, t._row_id] } stream_plan: |- StreamMaterialize { columns: [x, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [approx_percentile, approx_percentile], noop_update_hint: true } + └─StreamProject { exprs: [approx_percentile, approx_percentile] } └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } └─StreamExchange { dist: Single } └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } @@ -1995,20 +1993,19 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [x, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [approx_percentile, approx_percentile], noop_update_hint: true } - └─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } - ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ └─StreamExchange { dist: Single } - │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ └─StreamShare { id: 2 } - │ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] } - │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - └─StreamExchange { dist: Single } - └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - └─StreamShare { id: 2 } - └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } + ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ └─StreamExchange { dist: Single } + │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ └─StreamShare { id: 2 } + │ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] } + │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + └─StreamExchange { dist: Single } + └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + └─StreamShare { id: 2 } + └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: test simple approx_percentile with different approx_percentile interleaved with stateless simple aggs sql: | CREATE TABLE t (v1 int, v2 int); @@ -2026,7 +2023,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, x, count, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3] } └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] } ├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } │ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } @@ -2041,7 +2038,7 @@ │ └─StreamShare { id: 2 } │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] } │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), sum(sum(t.v2)), count] } + └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), sum(sum(t.v2)), count], must_output_per_barrier: true } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count, sum(t.v2)] } └─StreamShare { id: 2 } @@ -2064,7 +2061,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, x, count, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3] } └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] } ├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } │ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } @@ -2079,7 +2076,7 @@ │ └─StreamShare { id: 2 } │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] } │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), sum(sum(t.v2)), count] } + └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), sum(sum(t.v2)), count], must_output_per_barrier: true } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count, sum(t.v2)] } └─StreamShare { id: 2 } @@ -2101,20 +2098,19 @@ └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile], noop_update_hint: true } - └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64] } - ├─StreamGlobalApproxPercentile { quantile: 0.8:Float64, relative_error: 0.01:Float64 } - │ └─StreamExchange { dist: Single } - │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.8:Float64, relative_error: 0.01:Float64 } - │ └─StreamShare { id: 2 } - │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] } - │ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] } - └─StreamExchange { dist: Single } - └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } - └─StreamShare { id: 2 } - └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64] } + ├─StreamGlobalApproxPercentile { quantile: 0.8:Float64, relative_error: 0.01:Float64 } + │ └─StreamExchange { dist: Single } + │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.8:Float64, relative_error: 0.01:Float64 } + │ └─StreamShare { id: 2 } + │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] } + │ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count], must_output_per_barrier: true } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } + └─StreamShare { id: 2 } + └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: test simple approx_percentile with different approx_percentile interleaved with stateless + stateful simple aggs sql: | CREATE TABLE t (v1 int, v2 int); @@ -2131,26 +2127,25 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, x, count, m2, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), max(max(t.v2)), approx_percentile], noop_update_hint: true } - └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, max(max(t.v2)):Int32, approx_percentile:Float64] } - ├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } - │ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ │ └─StreamExchange { dist: Single } - │ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ │ └─StreamShare { id: 2 } - │ │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] } - │ │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - │ └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ └─StreamExchange { dist: Single } - │ └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ └─StreamShare { id: 2 } - │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] } - │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), max(max(t.v2)), count] } - └─StreamExchange { dist: Single } - └─StreamHashAgg { group_key: [$expr5], aggs: [sum(t.v1), count, max(t.v2)] } - └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr3, t.v2, t.v2::Float64 as $expr4, t._row_id, Vnode(t._row_id) as $expr5] } - └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, max(max(t.v2)):Int32, approx_percentile:Float64] } + ├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } + │ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ │ └─StreamExchange { dist: Single } + │ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ │ └─StreamShare { id: 2 } + │ │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] } + │ │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + │ └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ └─StreamExchange { dist: Single } + │ └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ └─StreamShare { id: 2 } + │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] } + │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), max(max(t.v2)), count], must_output_per_barrier: true } + └─StreamExchange { dist: Single } + └─StreamHashAgg { group_key: [$expr5], aggs: [sum(t.v1), count, max(t.v2)] } + └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr3, t.v2, t.v2::Float64 as $expr4, t._row_id, Vnode(t._row_id) as $expr5] } + └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: test hash approx_percentile sql: | CREATE TABLE t (v1 int, v2 int); @@ -2198,9 +2193,8 @@ └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [approx_percentile], noop_update_hint: true } - └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - └─StreamExchange { dist: Single } - └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - └─StreamProject { exprs: [t.v1::Float64 as $expr1, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + └─StreamExchange { dist: Single } + └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + └─StreamProject { exprs: [t.v1::Float64 as $expr1, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/append_only.yaml b/src/frontend/planner_test/tests/testdata/output/append_only.yaml index e76813e05f759..d0701675c3617 100644 --- a/src/frontend/planner_test/tests/testdata/output/append_only.yaml +++ b/src/frontend/planner_test/tests/testdata/output/append_only.yaml @@ -33,7 +33,7 @@ select max(v1) as max_v1 from t1; stream_plan: |- StreamMaterialize { columns: [max_v1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max(t1.v1))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(t1.v1))] } └─StreamSimpleAgg [append_only] { aggs: [max(max(t1.v1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [max(t1.v1)] } diff --git a/src/frontend/planner_test/tests/testdata/output/bushy_join.yaml b/src/frontend/planner_test/tests/testdata/output/bushy_join.yaml index a785ac443901a..9d042f1e60c8b 100644 --- a/src/frontend/planner_test/tests/testdata/output/bushy_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/bushy_join.yaml @@ -8,7 +8,7 @@ sql: select count(*) from t t1 join t t2 on t1.id = t2.id join t t3 on t1.id = t3.id join t t4 on t1.id = t4.id join t t5 on t1.id = t5.id join t t6 on t1.id = t6.id join t t7 on t1.id = t7.id join t t8 on t1.id = t8.id; stream_plan: |- StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count)] } └─StreamSimpleAgg { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } diff --git a/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml b/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml index 3b96806aabc7a..ce98b8bea75c9 100644 --- a/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml @@ -869,7 +869,7 @@ └─BatchScan { table: order_line, columns: [order_line.ol_amount, order_line.ol_delivery_d, order_line.ol_quantity], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(order_line.ol_amount))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(order_line.ol_amount))] } └─StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(order_line.ol_amount)] } @@ -880,7 +880,7 @@ Fragment 0 StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum(sum(order_line.ol_amount))], noop_update_hint: true } + └── StreamProject { exprs: [sum(sum(order_line.ol_amount))] } └── StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -1940,7 +1940,7 @@ │ └─StreamProject { exprs: [stock.s_i_id, stock.s_order_cnt, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr1, stock.s_w_id] } │ └─StreamTableScan { table: stock, columns: [stock.s_i_id, stock.s_w_id, stock.s_order_cnt], stream_scan_type: ArrangementBackfill, stream_key: [stock.s_w_id, stock.s_i_id], pk: [s_w_id, s_i_id], dist: UpstreamHashShard(stock.s_w_id, stock.s_i_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum(sum(stock.s_order_cnt))::Decimal * 0.005:Decimal) as $expr3], noop_update_hint: true } + └─StreamProject { exprs: [(sum(sum(stock.s_order_cnt))::Decimal * 0.005:Decimal) as $expr3] } └─StreamSimpleAgg { aggs: [sum(sum(stock.s_order_cnt)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(stock.s_order_cnt)] } @@ -2008,7 +2008,7 @@ └── BatchPlanNode Fragment 7 - StreamProject { exprs: [(sum(sum(stock.s_order_cnt))::Decimal * 0.005:Decimal) as $expr3], noop_update_hint: true } + StreamProject { exprs: [(sum(sum(stock.s_order_cnt))::Decimal * 0.005:Decimal) as $expr3] } └── StreamSimpleAgg { aggs: [sum(sum(stock.s_order_cnt)), count] } { tables: [ SimpleAggState: 14 ] } └── StreamExchange Single from 8 @@ -2265,7 +2265,7 @@ └─BatchScan { table: order_line, columns: [order_line.ol_i_id, order_line.ol_amount, order_line.ol_delivery_d], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / (1:Decimal + sum(sum(order_line.ol_amount)))) as $expr2], noop_update_hint: true } + └─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / (1:Decimal + sum(sum(order_line.ol_amount)))) as $expr2] } └─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(order_line.ol_amount)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum(order_line.ol_amount)] } @@ -2279,9 +2279,11 @@ └─StreamTableScan { table: item, columns: [item.i_id, item.i_data], stream_scan_type: ArrangementBackfill, stream_key: [item.i_id], pk: [i_id], dist: UpstreamHashShard(item.i_id) } stream_dist_plan: |+ Fragment 0 - StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / (1:Decimal + sum(sum(order_line.ol_amount)))) as $expr2], noop_update_hint: true } - └── StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(order_line.ol_amount)), count] } { tables: [ SimpleAggState: 0 ] } + StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } + ├── tables: [ Materialize: 4294967294 ] + └── StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / (1:Decimal + sum(sum(order_line.ol_amount)))) as $expr2] } + └── StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(order_line.ol_amount)), count] } + ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 Fragment 1 @@ -2356,7 +2358,7 @@ │ └─StreamProject { exprs: [revenue1.total_revenue, revenue1.supplier_no::Int64 as $expr1, revenue1.supplier_no] } │ └─StreamTableScan { table: revenue1, columns: [revenue1.supplier_no, revenue1.total_revenue], stream_scan_type: ArrangementBackfill, stream_key: [revenue1.supplier_no], pk: [supplier_no], dist: UpstreamHashShard(revenue1.supplier_no) } └─StreamExchange { dist: HashShard(max(max(revenue1.total_revenue))) } - └─StreamProject { exprs: [max(max(revenue1.total_revenue))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(revenue1.total_revenue))] } └─StreamSimpleAgg { aggs: [max(max(revenue1.total_revenue)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(revenue1.total_revenue), count] } @@ -2394,7 +2396,7 @@ └── BatchPlanNode Fragment 5 - StreamProject { exprs: [max(max(revenue1.total_revenue))], noop_update_hint: true } + StreamProject { exprs: [max(max(revenue1.total_revenue))] } └── StreamSimpleAgg { aggs: [max(max(revenue1.total_revenue)), count] } { tables: [ SimpleAggState: 11, SimpleAggCall0: 10 ] } └── StreamExchange Single from 6 @@ -2626,7 +2628,7 @@ └─BatchScan { table: order_line, columns: [order_line.ol_i_id, order_line.ol_quantity], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [avg_yearly], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [(sum(sum(order_line.ol_amount)) / 2.0:Decimal) as $expr3], noop_update_hint: true } + └─StreamProject { exprs: [(sum(sum(order_line.ol_amount)) / 2.0:Decimal) as $expr3] } └─StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(order_line.ol_amount)] } @@ -2649,8 +2651,9 @@ Fragment 0 StreamMaterialize { columns: [avg_yearly], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [(sum(sum(order_line.ol_amount)) / 2.0:Decimal) as $expr3], noop_update_hint: true } - └── StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } { tables: [ SimpleAggState: 0 ] } + └── StreamProject { exprs: [(sum(sum(order_line.ol_amount)) / 2.0:Decimal) as $expr3] } + └── StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } + ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 Fragment 1 @@ -2858,7 +2861,7 @@ └─BatchScan { table: order_line, columns: [order_line.ol_w_id, order_line.ol_i_id, order_line.ol_amount, order_line.ol_quantity], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(order_line.ol_amount))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(order_line.ol_amount))] } └─StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(order_line.ol_amount)] } @@ -2877,7 +2880,7 @@ Fragment 0 StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum(sum(order_line.ol_amount))], noop_update_hint: true } + └── StreamProject { exprs: [sum(sum(order_line.ol_amount))] } └── StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -3407,7 +3410,7 @@ │ └─StreamProject { exprs: [orders.o_c_id, orders.o_w_id, orders.o_d_id, orders.o_id] } │ └─StreamTableScan { table: orders, columns: [orders.o_d_id, orders.o_w_id, orders.o_c_id, orders.o_id], stream_scan_type: ArrangementBackfill, stream_key: [orders.o_w_id, orders.o_d_id, orders.o_id], pk: [o_w_id, o_d_id, o_id], dist: UpstreamHashShard(orders.o_w_id, orders.o_d_id, orders.o_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum(sum(customer.c_balance)) / sum0(count(customer.c_balance))::Decimal) as $expr1], noop_update_hint: true } + └─StreamProject { exprs: [(sum(sum(customer.c_balance)) / sum0(count(customer.c_balance))::Decimal) as $expr1] } └─StreamSimpleAgg { aggs: [sum(sum(customer.c_balance)), sum0(count(customer.c_balance)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(customer.c_balance), count(customer.c_balance)] } diff --git a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml index 2cabf5c6bbd5f..0e5d72b3499a3 100644 --- a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml @@ -74,7 +74,7 @@ └─StreamProject { exprs: [Sqrt($expr5) as $expr6, Case((sum0(count(t.v)) <= 1:Int64), null:Decimal, Sqrt(($expr4 / (sum0(count(t.v)) - 1:Int64)::Decimal))) as $expr7, $expr5, Case((sum0(count(t.v)) <= 1:Int64), null:Decimal, ($expr4 / (sum0(count(t.v)) - 1:Int64)::Decimal)) as $expr8] } └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), ($expr4 / $expr3) as $expr5, $expr4] } └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), (sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) as $expr4, $expr3] } - └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), sum(sum(t.v))::Decimal as $expr2, sum0(count(t.v))::Decimal as $expr3], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), sum(sum(t.v))::Decimal as $expr2, sum0(count(t.v))::Decimal as $expr3] } └─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum(t.v), count(t.v)] } diff --git a/src/frontend/planner_test/tests/testdata/output/dynamic_filter.yaml b/src/frontend/planner_test/tests/testdata/output/dynamic_filter.yaml index 922723851944b..89aea24f2bd80 100644 --- a/src/frontend/planner_test/tests/testdata/output/dynamic_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/output/dynamic_filter.yaml @@ -18,7 +18,7 @@ └─StreamDynamicFilter { predicate: (t1.v1 > max(max(t2.v2))), output: [t1.v1, t1._row_id] } ├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(t2.v2))] } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -77,7 +77,7 @@ ├─StreamProject { exprs: [t1.v1, (t1.v1 + t1.v1) as $expr1, t1._row_id] } │ └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(t2.v2))] } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -129,7 +129,7 @@ ├─StreamExchange { dist: HashShard(t1.v1) } │ └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: HashShard(max(max(t2.v2))) } - └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(t2.v2))] } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -153,7 +153,7 @@ ├─StreamProject { exprs: [t1.v1, t1.v1::Int64 as $expr1, t1._row_id] } │ └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(t2.v2))] } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -169,7 +169,7 @@ └─StreamDynamicFilter { predicate: (t1.v1 > max(max(t2.v2))), output: [t1.v1, t1._row_id] } ├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(t2.v2))] } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -191,7 +191,7 @@ └─StreamDynamicFilter { predicate: (t1.v1 > $expr1), output: [t1.v1, t1._row_id] } ├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(2:Int32 * max(max(t2.v2))) as $expr1], noop_update_hint: true } + └─StreamProject { exprs: [(2:Int32 * max(max(t2.v2))) as $expr1] } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -220,7 +220,7 @@ │ ├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } │ └─StreamExchange { dist: Broadcast } │ └─StreamShare { id: 6 } - │ └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } + │ └─StreamProject { exprs: [max(max(t2.v2))] } │ └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } │ └─StreamExchange { dist: Single } │ └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -229,7 +229,7 @@ └─StreamExchange { dist: Broadcast } └─StreamProject { exprs: [(max(max(t2.v2)) + 5:Int32) as $expr1] } └─StreamShare { id: 6 } - └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(t2.v2))] } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } diff --git a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml index e7a1951ffde54..815890d6a73b8 100644 --- a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml @@ -180,7 +180,7 @@ where path_val = t1.id; stream_plan: |- StreamMaterialize { columns: [array_agg], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [array_agg(t1.n order_by($expr1 ASC))], noop_update_hint: true } + └─StreamProject { exprs: [array_agg(t1.n order_by($expr1 ASC))] } └─StreamSimpleAgg { aggs: [array_agg(t1.n order_by($expr1 ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t1.n, (projected_row_id + 1:Int64) as $expr1, t1._row_id, t2.p, t2.p, t2.d, t2.d, projected_row_id, t1.id, t2._row_id] } diff --git a/src/frontend/planner_test/tests/testdata/output/limit.yaml b/src/frontend/planner_test/tests/testdata/output/limit.yaml index 500dbe1dd5824..22fb2add9d30c 100644 --- a/src/frontend/planner_test/tests/testdata/output/limit.yaml +++ b/src/frontend/planner_test/tests/testdata/output/limit.yaml @@ -131,7 +131,7 @@ stream_plan: |- StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamTopN { order: [sum0(count) ASC], limit: 1, offset: 0 } - └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count)] } └─StreamSimpleAgg { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } @@ -154,7 +154,7 @@ stream_plan: |- StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamTopN { order: [sum0(count) ASC], limit: 1, offset: 0 } - └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count)] } └─StreamSimpleAgg { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } diff --git a/src/frontend/planner_test/tests/testdata/output/mv_column_name.yaml b/src/frontend/planner_test/tests/testdata/output/mv_column_name.yaml index 91352992bb17a..3db4034336315 100644 --- a/src/frontend/planner_test/tests/testdata/output/mv_column_name.yaml +++ b/src/frontend/planner_test/tests/testdata/output/mv_column_name.yaml @@ -63,7 +63,7 @@ select count(*), max(a) from t; stream_plan: |- StreamMaterialize { columns: [count, max], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count), max(max(t.a))], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count), max(max(t.a))] } └─StreamSimpleAgg { aggs: [sum0(count), max(max(t.a)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [count, max(t.a)] } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index 1ea7349b24769..d6b90da0a8c1a 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -1891,7 +1891,7 @@ │ └─StreamExchange { dist: HashShard(bid.auction) } │ └─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1], noop_update_hint: true } + └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1] } └─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(bid.auction)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum0(count), count(bid.auction)] } @@ -1926,7 +1926,7 @@ └── BatchPlanNode Fragment 3 - StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1], noop_update_hint: true } + StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1] } └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(bid.auction)), count] } { tables: [ SimpleAggState: 9 ] } └── StreamExchange Single from 4 @@ -2331,7 +2331,7 @@ └─BatchScan { table: bid, columns: [bid.auction, bid.price, bid.date_time], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [min(min(max(bid.price)))], noop_update_hint: true } + └─StreamProject { exprs: [min(min(max(bid.price)))] } └─StreamSimpleAgg { aggs: [min(min(max(bid.price))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr1], aggs: [min(max(bid.price)), count] } @@ -2348,7 +2348,7 @@ Fragment 0 StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [min(min(max(bid.price)))], noop_update_hint: true } + └── StreamProject { exprs: [min(min(max(bid.price)))] } └── StreamSimpleAgg { aggs: [min(min(max(bid.price))), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml index 15e1647721d53..35713c9682a35 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml @@ -1878,7 +1878,7 @@ │ └─StreamRowIdGen { row_id_index: 7 } │ └─StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1], noop_update_hint: true } + └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1] } └─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(auction)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum0(count), count(auction)] } @@ -1915,7 +1915,7 @@ └── StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } { tables: [ Source: 8 ] } Fragment 4 - StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1], noop_update_hint: true } + StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1] } └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(auction)), count] } { tables: [ SimpleAggState: 9 ] } └── StreamExchange Single from 5 @@ -2277,7 +2277,7 @@ └─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } stream_plan: |- StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [min(min(max(price)))], noop_update_hint: true } + └─StreamProject { exprs: [min(min(max(price)))] } └─StreamSimpleAgg { aggs: [min(min(max(price))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr1], aggs: [min(max(price)), count] } @@ -2296,7 +2296,7 @@ Fragment 0 StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [min(min(max(price)))], noop_update_hint: true } + └── StreamProject { exprs: [min(min(max(price)))] } └── StreamSimpleAgg { aggs: [min(min(max(price))), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml index 0658030573dd1..d5d948e5b507c 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml @@ -1517,7 +1517,7 @@ │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '00:05:00':Interval, 'UTC':Varchar) as $expr4], output_watermarks: [$expr4] } │ └─StreamNow { output: [now] } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr5))) as $expr6], noop_update_hint: true } + └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr5))) as $expr6] } └─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr5)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum0(count), count($expr5)] } @@ -1578,7 +1578,7 @@ └── StreamNow { output: [now] } { tables: [ Now: 10 ] } Fragment 6 - StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr5))) as $expr6], noop_update_hint: true } + StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr5))) as $expr6] } └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr5)), count] } { tables: [ SimpleAggState: 11 ] } └── StreamExchange Single from 7 @@ -2000,7 +2000,7 @@ ) stream_plan: |- StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [min(min(max($expr7)))], noop_update_hint: true } + └─StreamProject { exprs: [min(min(max($expr7)))] } └─StreamSimpleAgg { aggs: [min(min(max($expr7))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr9], aggs: [min(max($expr7)), count] } @@ -2035,7 +2035,7 @@ Fragment 0 StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [min(min(max($expr7)))], noop_update_hint: true } + └── StreamProject { exprs: [min(min(max($expr7)))] } └── StreamSimpleAgg { aggs: [min(min(max($expr7))), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml index c577b72eaafd6..f065ba33c252d 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml @@ -2059,7 +2059,7 @@ │ └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] } │ └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr4))) as $expr5], noop_update_hint: true } + └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr4))) as $expr5] } └─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr4)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum0(count), count($expr4)] } @@ -2111,7 +2111,7 @@ └── StreamExchange NoShuffle from 2 Fragment 5 - StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr4))) as $expr5], noop_update_hint: true } + StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr4))) as $expr5] } └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr4)), count] } { tables: [ SimpleAggState: 9 ] } └── StreamExchange Single from 6 @@ -2533,7 +2533,7 @@ └─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] } stream_plan: |- StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [min(min(max($expr5)))], noop_update_hint: true } + └─StreamProject { exprs: [min(min(max($expr5)))] } └─StreamSimpleAgg { aggs: [min(min(max($expr5))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr6], aggs: [min(max($expr5)), count] } @@ -2564,7 +2564,7 @@ Fragment 0 StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [min(min(max($expr5)))], noop_update_hint: true } + └── StreamProject { exprs: [min(min(max($expr5)))] } └── StreamSimpleAgg { aggs: [min(min(max($expr5))), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/share.yaml b/src/frontend/planner_test/tests/testdata/output/share.yaml index 7962e4724f347..2cf3aee9fe043 100644 --- a/src/frontend/planner_test/tests/testdata/output/share.yaml +++ b/src/frontend/planner_test/tests/testdata/output/share.yaml @@ -33,7 +33,7 @@ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] } stream_plan: |- StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count)] } └─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } @@ -155,7 +155,7 @@ ├─StreamExchange { dist: HashShard(0:Int32) } │ └─StreamProject { exprs: [sum0(count), 0:Int32] } │ └─StreamShare { id: 5 } - │ └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } + │ └─StreamProject { exprs: [sum0(count)] } │ └─StreamSimpleAgg { aggs: [sum0(count), count] } │ └─StreamExchange { dist: Single } │ └─StreamStatelessSimpleAgg { aggs: [count] } @@ -163,7 +163,7 @@ └─StreamExchange { dist: HashShard(1:Int32) } └─StreamProject { exprs: [sum0(count), 1:Int32] } └─StreamShare { id: 5 } - └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count)] } └─StreamSimpleAgg { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } @@ -176,13 +176,13 @@ StreamMaterialize { columns: [count, $src(hidden)], stream_key: [$src], pk_columns: [$src], pk_conflict: NoCheck } └─StreamUnion { all: true } ├─StreamExchange { dist: HashShard(0:Int32) } - │ └─StreamProject { exprs: [sum0(count), 0:Int32], noop_update_hint: true } + │ └─StreamProject { exprs: [sum0(count), 0:Int32] } │ └─StreamSimpleAgg { aggs: [sum0(count), count] } │ └─StreamExchange { dist: Single } │ └─StreamStatelessSimpleAgg { aggs: [count] } │ └─StreamTableScan { table: t, columns: [t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } └─StreamExchange { dist: HashShard(1:Int32) } - └─StreamProject { exprs: [sum0(count), 1:Int32], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count), 1:Int32] } └─StreamSimpleAgg { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } @@ -195,7 +195,7 @@ select count(*) cnt from auction A join auction B on A.id = B.id; stream_plan: |- StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count)] } └─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } @@ -216,7 +216,7 @@ with cte as (select a, sum(b) sum from t group by a) select count(*) from cte c1 join cte c2 on c1.a = c2.a; stream_plan: |- StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count)] } └─StreamSimpleAgg { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } @@ -235,7 +235,7 @@ Fragment 0 StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum0(count)], noop_update_hint: true } + └── StreamProject { exprs: [sum0(count)] } └── StreamSimpleAgg { aggs: [sum0(count), count] } { tables: [ SimpleAggState: 0 ] } └── StreamExchange Single from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml b/src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml index 0b7d7d7f2f2bf..48caec86bd940 100644 --- a/src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml @@ -17,13 +17,13 @@ └─BatchScan { table: s, columns: [s.v], distribution: Single } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(s.v)], noop_update_hint: true } + └─StreamProject { exprs: [max(s.v)] } └─StreamSimpleAgg { aggs: [max(s.v), count] } └─StreamTableScan { table: s, columns: [s.v, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamProject { exprs: [max(s.v)], noop_update_hint: true } + └── StreamProject { exprs: [max(s.v)] } └── StreamSimpleAgg { aggs: [max(s.v), count] } { tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] } └── StreamTableScan { table: s, columns: [s.v, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } ├── tables: [ StreamScan: 2 ] @@ -55,13 +55,13 @@ └─BatchScan { table: s, columns: [s.v], distribution: Single } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(s.v)], noop_update_hint: true } + └─StreamProject { exprs: [sum(s.v)] } └─StreamSimpleAgg { aggs: [sum(s.v), count] } └─StreamTableScan { table: s, columns: [s.v, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamProject { exprs: [sum(s.v)], noop_update_hint: true } + └── StreamProject { exprs: [sum(s.v)] } └── StreamSimpleAgg { aggs: [sum(s.v), count] } { tables: [ SimpleAggState: 0 ] } └── StreamTableScan { table: s, columns: [s.v, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } ├── tables: [ StreamScan: 1 ] @@ -91,13 +91,13 @@ └─BatchScan { table: s, columns: [s.v], distribution: Single } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [count(s.v)], noop_update_hint: true } + └─StreamProject { exprs: [count(s.v)] } └─StreamSimpleAgg { aggs: [count(s.v), count] } └─StreamTableScan { table: s, columns: [s.v, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamProject { exprs: [count(s.v)], noop_update_hint: true } + └── StreamProject { exprs: [count(s.v)] } └── StreamSimpleAgg { aggs: [count(s.v), count] } { tables: [ SimpleAggState: 0 ] } └── StreamTableScan { table: s, columns: [s.v, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } ├── tables: [ StreamScan: 1 ] @@ -128,14 +128,14 @@ └─BatchScan { table: s, columns: [s.v, s.s], distribution: Single } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [string_agg(s.s, ',':Varchar order_by(s.v ASC))], noop_update_hint: true } + └─StreamProject { exprs: [string_agg(s.s, ',':Varchar order_by(s.v ASC))] } └─StreamSimpleAgg { aggs: [string_agg(s.s, ',':Varchar order_by(s.v ASC)), count] } └─StreamProject { exprs: [s.s, ',':Varchar, s.v, s.t._row_id] } └─StreamTableScan { table: s, columns: [s.v, s.s, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamProject { exprs: [string_agg(s.s, ',':Varchar order_by(s.v ASC))], noop_update_hint: true } + └── StreamProject { exprs: [string_agg(s.s, ',':Varchar order_by(s.v ASC))] } └── StreamSimpleAgg { aggs: [string_agg(s.s, ',':Varchar order_by(s.v ASC)), count] } { tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] } └── StreamProject { exprs: [s.s, ',':Varchar, s.v, s.t._row_id] } └── StreamTableScan { table: s, columns: [s.v, s.s, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } @@ -169,7 +169,7 @@ └─BatchScan { table: t, columns: [t.v], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max(t.v))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(t.v))] } └─StreamSimpleAgg { aggs: [max(max(t.v)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t.v), count] } @@ -179,7 +179,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [max(max(t.v))], noop_update_hint: true } + └── StreamProject { exprs: [max(max(t.v))] } └── StreamSimpleAgg { aggs: [max(max(t.v)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 @@ -223,7 +223,7 @@ select max(v) as a1 from AO; stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max(ao.v))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(ao.v))] } └─StreamSimpleAgg [append_only] { aggs: [max(max(ao.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [max(ao.v)] } @@ -232,7 +232,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [max(max(ao.v))], noop_update_hint: true } + └── StreamProject { exprs: [max(max(ao.v))] } └── StreamSimpleAgg [append_only] { aggs: [max(max(ao.v)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -268,7 +268,7 @@ └─BatchScan { table: t, columns: [t.v], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(t.v))] } └─StreamSimpleAgg { aggs: [sum(sum(t.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v)] } @@ -277,7 +277,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum(sum(t.v))], noop_update_hint: true } + └── StreamProject { exprs: [sum(sum(t.v))] } └── StreamSimpleAgg { aggs: [sum(sum(t.v)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -308,7 +308,7 @@ select sum(v) as a1 from AO; stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(ao.v))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum(ao.v))] } └─StreamSimpleAgg [append_only] { aggs: [sum(sum(ao.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(ao.v)] } @@ -317,7 +317,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum(sum(ao.v))], noop_update_hint: true } + └── StreamProject { exprs: [sum(sum(ao.v))] } └── StreamSimpleAgg [append_only] { aggs: [sum(sum(ao.v)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -353,7 +353,7 @@ └─BatchScan { table: t, columns: [t.v], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count(t.v))], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count(t.v))] } └─StreamSimpleAgg { aggs: [sum0(count(t.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count(t.v)] } @@ -362,7 +362,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum0(count(t.v))], noop_update_hint: true } + └── StreamProject { exprs: [sum0(count(t.v))] } └── StreamSimpleAgg { aggs: [sum0(count(t.v)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -393,7 +393,7 @@ select count(v) as a1 from AO; stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count(ao.v))], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count(ao.v))] } └─StreamSimpleAgg [append_only] { aggs: [sum0(count(ao.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count(ao.v)] } @@ -402,7 +402,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum0(count(ao.v))], noop_update_hint: true } + └── StreamProject { exprs: [sum0(count(ao.v))] } └── StreamSimpleAgg [append_only] { aggs: [sum0(count(ao.v)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -438,7 +438,7 @@ └─BatchScan { table: t, columns: [t.o, t.s], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [string_agg(t.s, ',':Varchar order_by(t.o ASC))], noop_update_hint: true } + └─StreamProject { exprs: [string_agg(t.s, ',':Varchar order_by(t.o ASC))] } └─StreamSimpleAgg { aggs: [string_agg(t.s, ',':Varchar order_by(t.o ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t.s, ',':Varchar, t.o, t._row_id] } @@ -447,7 +447,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [string_agg(t.s, ',':Varchar order_by(t.o ASC))], noop_update_hint: true } + └── StreamProject { exprs: [string_agg(t.s, ',':Varchar order_by(t.o ASC))] } └── StreamSimpleAgg { aggs: [string_agg(t.s, ',':Varchar order_by(t.o ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 @@ -480,7 +480,7 @@ select string_agg(s, ',' order by o) as a1 from AO; stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC))], noop_update_hint: true } + └─StreamProject { exprs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } └─StreamSimpleAgg [append_only] { aggs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [ao.s, ',':Varchar, ao.o, ao._row_id] } @@ -489,7 +489,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC))], noop_update_hint: true } + └── StreamProject { exprs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } └── StreamSimpleAgg [append_only] { aggs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 @@ -527,7 +527,7 @@ └─BatchScan { table: t, columns: [t.v], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max(t.v)), sum0(count(t.v))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(t.v)), sum0(count(t.v))] } └─StreamSimpleAgg { aggs: [max(max(t.v)), sum0(count(t.v)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t.v), count(t.v), count] } @@ -537,7 +537,7 @@ Fragment 0 StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [max(max(t.v)), sum0(count(t.v))], noop_update_hint: true } + └── StreamProject { exprs: [max(max(t.v)), sum0(count(t.v))] } └── StreamSimpleAgg { aggs: [max(max(t.v)), sum0(count(t.v)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 @@ -587,7 +587,7 @@ select max(v) as a1, count(v) as a2 from AO; stream_plan: |- StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max(ao.v)), sum0(count(ao.v))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(ao.v)), sum0(count(ao.v))] } └─StreamSimpleAgg [append_only] { aggs: [max(max(ao.v)), sum0(count(ao.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [max(ao.v), count(ao.v)] } @@ -596,7 +596,7 @@ Fragment 0 StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [max(max(ao.v)), sum0(count(ao.v))], noop_update_hint: true } + └── StreamProject { exprs: [max(max(ao.v)), sum0(count(ao.v))] } └── StreamSimpleAgg [append_only] { aggs: [max(max(ao.v)), sum0(count(ao.v)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -632,7 +632,7 @@ └─BatchScan { table: t, columns: [t.v, t.o, t.s], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [count(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))], noop_update_hint: true } + └─StreamProject { exprs: [count(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))] } └─StreamSimpleAgg { aggs: [count(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t.v, t.s, ',':Varchar, t.o, t._row_id] } @@ -641,7 +641,7 @@ Fragment 0 StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [count(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))], noop_update_hint: true } + └── StreamProject { exprs: [count(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))] } └── StreamSimpleAgg { aggs: [count(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall1: 0 ] └── StreamExchange Single from 1 @@ -679,7 +679,7 @@ select count(v) as a1, string_agg(s, ',' order by o) as a2 from AO; stream_plan: |- StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))], noop_update_hint: true } + └─StreamProject { exprs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } └─StreamSimpleAgg [append_only] { aggs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [ao.v, ao.s, ',':Varchar, ao.o, ao._row_id] } @@ -688,7 +688,7 @@ Fragment 0 StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))], noop_update_hint: true } + └── StreamProject { exprs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } └── StreamSimpleAgg [append_only] { aggs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall1: 0 ] └── StreamExchange Single from 1 @@ -726,7 +726,7 @@ └─BatchScan { table: t, columns: [t.v, t.o, t.s], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))], noop_update_hint: true } + └─StreamProject { exprs: [max(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))] } └─StreamSimpleAgg { aggs: [max(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t.v, t.s, ',':Varchar, t.o, t._row_id] } @@ -735,7 +735,7 @@ Fragment 0 StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [max(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))], noop_update_hint: true } + └── StreamProject { exprs: [max(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))] } └── StreamSimpleAgg { aggs: [max(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC)), count] } ├── tables: [ SimpleAggState: 2, SimpleAggCall0: 0, SimpleAggCall1: 1 ] └── StreamExchange Single from 1 @@ -770,7 +770,7 @@ select max(v) as a1, string_agg(s, ',' order by o) as a2 from AO; stream_plan: |- StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))], noop_update_hint: true } + └─StreamProject { exprs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } └─StreamSimpleAgg [append_only] { aggs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [ao.v, ao.s, ',':Varchar, ao.o, ao._row_id] } @@ -779,7 +779,7 @@ Fragment 0 StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))], noop_update_hint: true } + └── StreamProject { exprs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } └── StreamSimpleAgg [append_only] { aggs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall1: 0 ] └── StreamExchange Single from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml index 49d14526af640..5cdfdf6cf45ea 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml @@ -61,7 +61,7 @@ select count(*) from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where a2 < 10; stream_plan: |- StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } + └─StreamProject { exprs: [sum0(count)] } └─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } diff --git a/src/frontend/planner_test/tests/testdata/output/tpch.yaml b/src/frontend/planner_test/tests/testdata/output/tpch.yaml index dbb7a5c08a62a..3c43faa8d2494 100644 --- a/src/frontend/planner_test/tests/testdata/output/tpch.yaml +++ b/src/frontend/planner_test/tests/testdata/output/tpch.yaml @@ -1160,7 +1160,7 @@ └─BatchScan { table: lineitem, columns: [lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_quantity, lineitem.l_shipdate], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum($expr1))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum($expr1))] } └─StreamSimpleAgg { aggs: [sum(sum($expr1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1)] } @@ -1171,7 +1171,7 @@ Fragment 0 StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum(sum($expr1))], noop_update_hint: true } + └── StreamProject { exprs: [sum(sum($expr1))] } └── StreamSimpleAgg { aggs: [sum(sum($expr1)), count] } { tables: [ SimpleAggState: 0 ] } └── StreamExchange Single from 1 @@ -2389,7 +2389,7 @@ │ └─StreamFilter { predicate: (nation.n_name = 'ARGENTINA':Varchar) } │ └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], stream_scan_type: ArrangementBackfill, stream_key: [nation.n_nationkey], pk: [n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum(sum($expr2)) * 0.0001000000:Decimal) as $expr3], noop_update_hint: true } + └─StreamProject { exprs: [(sum(sum($expr2)) * 0.0001000000:Decimal) as $expr3] } └─StreamSimpleAgg { aggs: [sum(sum($expr2)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr2)] } @@ -2461,7 +2461,7 @@ └── BatchPlanNode Fragment 8 - StreamProject { exprs: [(sum(sum($expr2)) * 0.0001000000:Decimal) as $expr3], noop_update_hint: true } + StreamProject { exprs: [(sum(sum($expr2)) * 0.0001000000:Decimal) as $expr3] } └── StreamSimpleAgg { aggs: [sum(sum($expr2)), count] } { tables: [ SimpleAggState: 16 ] } └── StreamExchange Single from 9 @@ -2818,7 +2818,7 @@ └─BatchScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / sum(sum($expr2))) as $expr3], noop_update_hint: true } + └─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / sum(sum($expr2))) as $expr3] } └─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum($expr2)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum($expr2)] } @@ -2834,8 +2834,9 @@ Fragment 0 StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / sum(sum($expr2))) as $expr3], noop_update_hint: true } - └── StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum($expr2)), count] } { tables: [ SimpleAggState: 0 ] } + └── StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / sum(sum($expr2))) as $expr3] } + └── StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum($expr2)), count] } + ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 Fragment 1 @@ -2965,7 +2966,7 @@ │ └─StreamFilter { predicate: (lineitem.l_shipdate >= '1993-01-01':Date) AND (lineitem.l_shipdate < '1993-04-01 00:00:00':Timestamp) } │ └─StreamTableScan { table: lineitem, columns: [lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipdate], stream_scan_type: ArrangementBackfill, stream_key: [lineitem.l_orderkey, lineitem.l_linenumber], pk: [l_orderkey, l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } └─StreamExchange { dist: HashShard(max(max(sum($expr1)))) } - └─StreamProject { exprs: [max(max(sum($expr1)))], noop_update_hint: true } + └─StreamProject { exprs: [max(max(sum($expr1)))] } └─StreamSimpleAgg { aggs: [max(max(sum($expr1))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(sum($expr1)), count] } @@ -3019,7 +3020,7 @@ └── BatchPlanNode Fragment 6 - StreamProject { exprs: [max(max(sum($expr1)))], noop_update_hint: true } + StreamProject { exprs: [max(max(sum($expr1)))] } └── StreamSimpleAgg { aggs: [max(max(sum($expr1))), count] } { tables: [ SimpleAggState: 14, SimpleAggCall0: 13 ] } └── StreamExchange Single from 7 @@ -3295,7 +3296,7 @@ └─BatchScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_quantity], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [avg_yearly], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [(sum(sum(lineitem.l_extendedprice)) / 7.0:Decimal) as $expr2], noop_update_hint: true } + └─StreamProject { exprs: [(sum(sum(lineitem.l_extendedprice)) / 7.0:Decimal) as $expr2] } └─StreamSimpleAgg { aggs: [sum(sum(lineitem.l_extendedprice)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(lineitem.l_extendedprice)] } @@ -3318,8 +3319,9 @@ Fragment 0 StreamMaterialize { columns: [avg_yearly], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [(sum(sum(lineitem.l_extendedprice)) / 7.0:Decimal) as $expr2], noop_update_hint: true } - └── StreamSimpleAgg { aggs: [sum(sum(lineitem.l_extendedprice)), count] } { tables: [ SimpleAggState: 0 ] } + └── StreamProject { exprs: [(sum(sum(lineitem.l_extendedprice)) / 7.0:Decimal) as $expr2] } + └── StreamSimpleAgg { aggs: [sum(sum(lineitem.l_extendedprice)), count] } + ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 Fragment 1 @@ -3670,7 +3672,7 @@ └─BatchScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipinstruct, lineitem.l_shipmode], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum($expr1))], noop_update_hint: true } + └─StreamProject { exprs: [sum(sum($expr1))] } └─StreamSimpleAgg { aggs: [sum(sum($expr1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1)] } @@ -3688,7 +3690,7 @@ Fragment 0 StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum(sum($expr1))], noop_update_hint: true } + └── StreamProject { exprs: [sum(sum($expr1))] } └── StreamSimpleAgg { aggs: [sum(sum($expr1)), count] } { tables: [ SimpleAggState: 0 ] } └── StreamExchange Single from 1 @@ -4340,7 +4342,7 @@ │ └─StreamExchange { dist: HashShard(orders.o_custkey) } │ └─StreamTableScan { table: orders, columns: [orders.o_custkey, orders.o_orderkey], stream_scan_type: ArrangementBackfill, stream_key: [orders.o_orderkey], pk: [o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum(sum(customer.c_acctbal)) / sum0(count(customer.c_acctbal))::Decimal) as $expr1], noop_update_hint: true } + └─StreamProject { exprs: [(sum(sum(customer.c_acctbal)) / sum0(count(customer.c_acctbal))::Decimal) as $expr1] } └─StreamSimpleAgg { aggs: [sum(sum(customer.c_acctbal)), sum0(count(customer.c_acctbal)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(customer.c_acctbal), count(customer.c_acctbal)] } diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index 9e774628fc262..b0ad102ee693c 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -86,6 +86,8 @@ impl LogicalAgg { bail!("expected at least one agg call"); } + let need_row_merge: bool = Self::need_row_merge(&approx_percentile); + // ====== Handle normal aggs let total_agg_calls = core .agg_calls @@ -98,8 +100,12 @@ impl LogicalAgg { let local_agg = StreamStatelessSimpleAgg::new(core); let exchange = RequiredDist::single().enforce_if_not_satisfies(local_agg.into(), &Order::any())?; - let global_agg = - new_stream_simple_agg(Agg::new(total_agg_calls, IndexSet::empty(), exchange)); + + let must_output_per_barrier = need_row_merge; + let global_agg = new_stream_simple_agg( + Agg::new(total_agg_calls, IndexSet::empty(), exchange), + must_output_per_barrier, + ); // ====== Merge approx percentile and normal aggs Self::add_row_merge_if_needed( @@ -129,6 +135,7 @@ impl LogicalAgg { }; bail!("expected at least one agg call"); } + let need_row_merge = Self::need_row_merge(&approx_percentile); // Generate vnode via project // TODO(kwannoel): We should apply Project optimization rules here. @@ -157,19 +164,26 @@ impl LogicalAgg { let global_agg = if self.group_key().is_empty() { let exchange = RequiredDist::single().enforce_if_not_satisfies(local_agg.into(), &Order::any())?; - let global_agg = new_stream_simple_agg(Agg::new( - core.agg_calls - .iter() - .enumerate() - .map(|(partial_output_idx, agg_call)| { - agg_call.partial_to_total_agg_call(n_local_group_key + partial_output_idx) - }) - .collect(), - global_group_key.into_iter().collect(), - exchange, - )); + let must_output_per_barrier = need_row_merge; + let global_agg = new_stream_simple_agg( + Agg::new( + core.agg_calls + .iter() + .enumerate() + .map(|(partial_output_idx, agg_call)| { + agg_call + .partial_to_total_agg_call(n_local_group_key + partial_output_idx) + }) + .collect(), + global_group_key.into_iter().collect(), + exchange, + ), + must_output_per_barrier, + ); global_agg.into() } else { + // the `RowMergeExec` has not supported keyed merge + assert!(!need_row_merge); let exchange = RequiredDist::shard_by_key(input_col_num, &global_group_key) .enforce_if_not_satisfies(local_agg.into(), &Order::any())?; // Local phase should have reordered the group keys into their required order. @@ -203,7 +217,7 @@ impl LogicalAgg { let mut core = self.core.clone(); let input = RequiredDist::single().enforce_if_not_satisfies(stream_input, &Order::any())?; core.input = input; - Ok(new_stream_simple_agg(core).into()) + Ok(new_stream_simple_agg(core, false).into()) } fn gen_shuffle_plan(&self, stream_input: PlanRef) -> Result { @@ -339,6 +353,10 @@ impl LogicalAgg { )) } + fn need_row_merge(approx_percentile: &Option) -> bool { + approx_percentile.is_some() + } + /// Add `RowMerge` if needed fn add_row_merge_if_needed( approx_percentile: Option, @@ -346,7 +364,11 @@ impl LogicalAgg { approx_percentile_col_mapping: ColIndexMapping, non_approx_percentile_col_mapping: ColIndexMapping, ) -> Result { + // just for assert + let need_row_merge = Self::need_row_merge(&approx_percentile); + if let Some(approx_percentile) = approx_percentile { + assert!(need_row_merge); let row_merge = StreamRowMerge::new( approx_percentile, global_agg, @@ -355,6 +377,7 @@ impl LogicalAgg { )?; Ok(row_merge.into()) } else { + assert!(!need_row_merge); Ok(global_agg) } } @@ -1305,9 +1328,9 @@ fn find_or_append_row_count(mut logical: Agg) -> (Agg, usize) (logical, row_count_idx) } -fn new_stream_simple_agg(core: Agg) -> StreamSimpleAgg { +fn new_stream_simple_agg(core: Agg, must_output_per_barrier: bool) -> StreamSimpleAgg { let (logical, row_count_idx) = find_or_append_row_count(core); - StreamSimpleAgg::new(logical, row_count_idx) + StreamSimpleAgg::new(logical, row_count_idx, must_output_per_barrier) } fn new_stream_hash_agg(core: Agg, vnode_col_idx: Option) -> StreamHashAgg { @@ -1386,19 +1409,12 @@ impl ToStream for LogicalAgg { panic!("the root PlanNode must be StreamHashAgg, StreamSimpleAgg, StreamGlobalApproxPercentile, or StreamRowMerge"); }; - let is_hash_agg = !self.group_key().is_empty(); - // "Simple Agg" includes normal simple agg, as well as approx percentile simple 2 phase agg. - let is_simple_agg = !is_hash_agg; - if self.agg_calls().len() == n_final_agg_calls && is_hash_agg { + if self.agg_calls().len() == n_final_agg_calls { // an existing `count(*)` is used as row count column in `StreamXxxAgg` Ok(plan) } else { - // For hash agg, a `count(*)` is appended, should project the output. - // For simple agg, we output every epoch, so we will always add a project - // to filter out no-op updates, and we don't need the following assert. - if is_hash_agg { - assert_eq!(self.agg_calls().len() + 1, n_final_agg_calls); - } + // a `count(*)` is appended, should project the output + assert_eq!(self.agg_calls().len() + 1, n_final_agg_calls); Ok(StreamProject::new(generic::Project::with_out_col_idx( plan, 0..self.schema().len(), @@ -1407,9 +1423,7 @@ impl ToStream for LogicalAgg { // Since it'll be pruned immediately in `StreamProject`, the update records are likely to be // no-op. So we set the hint to instruct the executor to eliminate them. // See https://github.com/risingwavelabs/risingwave/issues/17030. - // Further for simple agg, we also have to set the hint to eliminate no-op updates. - // Since we will output every epoch. - .with_noop_update_hint(self.agg_calls().is_empty() || is_simple_agg) + .with_noop_update_hint(self.agg_calls().is_empty()) .into()) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs index 6ecaa4c308f5e..f9f125654f402 100644 --- a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs @@ -33,10 +33,18 @@ pub struct StreamSimpleAgg { /// The index of `count(*)` in `agg_calls`. row_count_idx: usize, + + // Required by the downstream `RowMerge`, + // currently only used by the `approx_percentile`'s two phase plan + must_output_per_barrier: bool, } impl StreamSimpleAgg { - pub fn new(core: generic::Agg, row_count_idx: usize) -> Self { + pub fn new( + core: generic::Agg, + row_count_idx: usize, + must_output_per_barrier: bool, + ) -> Self { assert_eq!(core.agg_calls[row_count_idx], PlanAggCall::count_star()); let input = core.input.clone(); @@ -62,6 +70,7 @@ impl StreamSimpleAgg { base, core, row_count_idx, + must_output_per_barrier, } } @@ -75,7 +84,11 @@ impl Distill for StreamSimpleAgg { let name = plan_node_name!("StreamSimpleAgg", { "append_only", self.input().append_only() }, ); - childless_record(name, self.core.fields_pretty()) + let mut vec = self.core.fields_pretty(); + if self.must_output_per_barrier { + vec.push(("must_output_per_barrier", "true".into())); + } + childless_record(name, vec) } } @@ -89,7 +102,7 @@ impl PlanTreeNodeUnary for StreamSimpleAgg { input, ..self.core.clone() }; - Self::new(logical, self.row_count_idx) + Self::new(logical, self.row_count_idx, self.must_output_per_barrier) } } impl_plan_tree_node_for_unary! { StreamSimpleAgg } @@ -137,6 +150,7 @@ impl StreamNode for StreamSimpleAgg { .collect(), row_count_index: self.row_count_idx as u32, version: PbAggNodeVersion::Issue13465 as _, + must_output_per_barrier: self.must_output_per_barrier, }) } } @@ -149,7 +163,7 @@ impl ExprRewritable for StreamSimpleAgg { fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut core = self.core.clone(); core.rewrite_exprs(r); - Self::new(core, self.row_count_idx).into() + Self::new(core, self.row_count_idx, self.must_output_per_barrier).into() } } diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs index 93c56efad3d5f..edb9121baf595 100644 --- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs @@ -102,6 +102,7 @@ impl StreamNode for StreamStatelessSimpleAgg { is_append_only: self.input().append_only(), distinct_dedup_tables: Default::default(), version: AggNodeVersion::Issue13465 as _, + must_output_per_barrier: false, // this is not used }) } } diff --git a/src/stream/src/executor/agg_common.rs b/src/stream/src/executor/agg_common.rs index 2cb3cad8fb2d8..c185222298d80 100644 --- a/src/stream/src/executor/agg_common.rs +++ b/src/stream/src/executor/agg_common.rs @@ -46,7 +46,9 @@ pub struct AggExecutorArgs { pub trait AggExecutorExtraArgs {} -pub struct SimpleAggExecutorExtraArgs {} +pub struct SimpleAggExecutorExtraArgs { + pub must_output_per_barrier: bool, +} impl AggExecutorExtraArgs for SimpleAggExecutorExtraArgs {} /// Extra arguments needed to construct an `HashAggExecutor`. diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 0b7415adac38b..5fc4b4757fa00 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -192,6 +192,7 @@ async fn test_merger_sum_aggr() { 2, // row_count_index vec![], 2, + false, ) .await; diff --git a/src/stream/src/executor/simple_agg.rs b/src/stream/src/executor/simple_agg.rs index a08049268e5b4..fdecd5b7a4502 100644 --- a/src/stream/src/executor/simple_agg.rs +++ b/src/stream/src/executor/simple_agg.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use risingwave_common::array::stream_record::Record; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::aggregate::{build_retractable, AggCall, BoxedAggregateFunction}; @@ -83,6 +84,10 @@ struct ExecutorInner { /// Extreme state cache size extreme_cache_size: usize, + + /// Required by the downstream `RowMergeExecutor`, + /// currently only used by the `approx_percentile`'s two phase plan + must_output_per_barrier: bool, } impl ExecutorInner { @@ -129,6 +134,7 @@ impl SimpleAggExecutor { distinct_dedup_tables: args.distinct_dedup_tables, watermark_epoch: args.watermark_epoch, extreme_cache_size: args.extreme_cache_size, + must_output_per_barrier: args.extra.must_output_per_barrier, }, }) } @@ -201,7 +207,16 @@ impl SimpleAggExecutor { .agg_group .build_change(&this.storages, &this.agg_funcs) .await? - .map(|change| change.to_stream_chunk(&this.info.schema.data_types())); + .and_then(|change| { + if !this.must_output_per_barrier { + if let Record::Update { old_row, new_row } = &change { + if old_row == new_row { + return None; + } + }; + } + Some(change.to_stream_chunk(&this.info.schema.data_types())) + }); // Commit all state tables. futures::future::try_join_all(this.all_state_tables_mut().map(|table| table.commit(epoch))) @@ -343,6 +358,7 @@ mod tests { 0, vec![2], 1, + false, ) .await; let mut simple_agg = simple_agg.execute(); @@ -431,6 +447,7 @@ mod tests { 0, vec![2], 1, + true, ) .await; let mut simple_agg = simple_agg.execute(); @@ -481,4 +498,80 @@ mod tests { Message::Barrier { .. } ); } + + // NOTE(kwannoel): `approx_percentile` + `keyed_merge` depend on this property for correctness. + #[tokio::test] + async fn test_simple_aggregation_omit_noop_update() { + let store = MemoryStateStore::new(); + let schema = Schema { + fields: vec![ + Field::unnamed(DataType::Int64), + Field::unnamed(DataType::Int64), + // primary key column` + Field::unnamed(DataType::Int64), + ], + }; + let (mut tx, source) = MockSource::channel(); + let source = source.into_executor(schema, vec![2]); + // initial barrier + tx.push_barrier(test_epoch(1), false); + // next barrier + tx.push_barrier(test_epoch(2), false); + tx.push_chunk(StreamChunk::from_pretty( + " I I I + + 100 200 1001 + - 100 200 1001", + )); + tx.push_barrier(test_epoch(3), false); + tx.push_barrier(test_epoch(4), false); + + let agg_calls = vec![ + AggCall::from_pretty("(count:int8)"), + AggCall::from_pretty("(sum:int8 $0:int8)"), + AggCall::from_pretty("(sum:int8 $1:int8)"), + AggCall::from_pretty("(min:int8 $0:int8)"), + ]; + + let simple_agg = new_boxed_simple_agg_executor( + ActorContext::for_test(123), + store, + source, + false, + agg_calls, + 0, + vec![2], + 1, + false, + ) + .await; + let mut simple_agg = simple_agg.execute(); + + // Consume the init barrier + simple_agg.next().await.unwrap().unwrap(); + // Consume stream chunk + let msg = simple_agg.next().await.unwrap().unwrap(); + assert_eq!( + *msg.as_chunk().unwrap(), + StreamChunk::from_pretty( + " I I I I + + 0 . . . " + ) + ); + assert_matches!( + simple_agg.next().await.unwrap().unwrap(), + Message::Barrier { .. } + ); + + // No stream chunk + assert_matches!( + simple_agg.next().await.unwrap().unwrap(), + Message::Barrier { .. } + ); + + // No stream chunk + assert_matches!( + simple_agg.next().await.unwrap().unwrap(), + Message::Barrier { .. } + ); + } } diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index db024411ea0ad..4744bae374bfb 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -515,6 +515,7 @@ pub mod agg_executor { row_count_index: usize, pk_indices: PkIndices, executor_id: u64, + must_output_per_barrier: bool, ) -> Executor { let storages = future::join_all(agg_calls.iter().enumerate().map(|(idx, agg_call)| { create_agg_state_storage( @@ -560,7 +561,9 @@ pub mod agg_executor { intermediate_state_table, distinct_dedup_tables: Default::default(), watermark_epoch: Arc::new(AtomicU64::new(0)), - extra: SimpleAggExecutorExtraArgs {}, + extra: SimpleAggExecutorExtraArgs { + must_output_per_barrier, + }, }) .unwrap(); (info, exec).into() diff --git a/src/stream/src/from_proto/simple_agg.rs b/src/stream/src/from_proto/simple_agg.rs index 16809edb8bcaf..689acc7d16a9c 100644 --- a/src/stream/src/from_proto/simple_agg.rs +++ b/src/stream/src/from_proto/simple_agg.rs @@ -54,6 +54,7 @@ impl ExecutorBuilder for SimpleAggExecutorBuilder { let distinct_dedup_tables = build_distinct_dedup_table_from_proto(node.get_distinct_dedup_tables(), store, None) .await; + let must_output_per_barrier = node.get_must_output_per_barrier(); let exec = SimpleAggExecutor::new(AggExecutorArgs { version: node.version(), @@ -70,7 +71,9 @@ impl ExecutorBuilder for SimpleAggExecutorBuilder { intermediate_state_table, distinct_dedup_tables, watermark_epoch: params.watermark_epoch, - extra: SimpleAggExecutorExtraArgs {}, + extra: SimpleAggExecutorExtraArgs { + must_output_per_barrier, + }, })?; Ok((params.info, exec).into())