diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml index 3c6f0d613361..e44426caa3a4 100644 --- a/src/frontend/planner_test/tests/testdata/output/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml @@ -71,7 +71,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [agg], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v2)) * sum0(count(t.v3)))) as $expr1] } + └─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v2)) * sum0(count(t.v3)))) as $expr1], noop_update_hint: true } └─StreamSimpleAgg { aggs: [min(min(t.v1)), max(max(t.v2)), sum0(count(t.v3)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [min(t.v1), max(t.v2), count(t.v3), count] } @@ -273,7 +273,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [cnt, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count($expr1)), sum(sum($expr1))] } + └─StreamProject { exprs: [sum0(count($expr1)), sum(sum($expr1))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(count($expr1)), sum(sum($expr1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count($expr1), sum($expr1)] } @@ -571,7 +571,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [agg], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v3)) * sum0(count(t.v2)))) as $expr1] } + └─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v3)) * sum0(count(t.v2)))) as $expr1], noop_update_hint: true } └─StreamSimpleAgg { aggs: [min(min(t.v1)), max(max(t.v3)), sum0(count(t.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [min(t.v1), max(t.v3), count(t.v2), count] } @@ -628,7 +628,7 @@ └─LogicalScan { table: t, columns: [t.v1] } stream_plan: |- StreamMaterialize { columns: [s1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1))] } + └─StreamProject { exprs: [sum(sum(t.v1))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } @@ -647,7 +647,7 @@ └─LogicalScan { table: t, columns: [t.v1] } stream_plan: |- StreamMaterialize { columns: [s1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1))] } + └─StreamProject { exprs: [sum(sum(t.v1))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } @@ -666,7 +666,7 @@ └─LogicalScan { table: t, columns: [t.v1] } stream_plan: |- StreamMaterialize { columns: [s1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1))] } + └─StreamProject { exprs: [sum(sum(t.v1))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } @@ -685,7 +685,7 @@ └─LogicalScan { table: t, columns: [t.v1] } stream_plan: |- StreamMaterialize { columns: [sa], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1) filter((t.v1 > 0:Int32)))] } + └─StreamProject { exprs: [sum(sum(t.v1) filter((t.v1 > 0:Int32)))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(t.v1) filter((t.v1 > 0:Int32))), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1) filter((t.v1 > 0:Int32))] } @@ -720,7 +720,7 @@ └─LogicalScan { table: t, columns: [t.a, t.b] } stream_plan: |- StreamMaterialize { columns: [sab], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max($expr1) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))))] } + └─StreamProject { exprs: [max(max($expr1) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(max($expr1) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32)))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr2], aggs: [max($expr1) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))), count] } @@ -759,7 +759,7 @@ └─LogicalScan { table: t, columns: [t.a, t.b] } stream_plan: |- StreamMaterialize { columns: [cnt_agb], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count filter((t.a > t.b)))] } + └─StreamProject { exprs: [sum0(count filter((t.a > t.b)))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(count filter((t.a > t.b))), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count filter((t.a > t.b))] } @@ -813,7 +813,7 @@ └─BatchScan { table: t, columns: [t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [b], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v2) filter((t.v2 < 5:Int32)))] } + └─StreamProject { exprs: [sum(sum(t.v2) filter((t.v2 < 5:Int32)))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(t.v2) filter((t.v2 < 5:Int32))), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v2) filter((t.v2 < 5:Int32))] } @@ -896,7 +896,7 @@ └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [string_agg, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [string_agg(t.y, ',':Varchar), count(distinct t.x)] } + └─StreamProject { exprs: [string_agg(t.y, ',':Varchar), count(distinct t.x)], noop_update_hint: true } └─StreamSimpleAgg { aggs: [string_agg(t.y, ',':Varchar), count(distinct t.x), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t.y, ',':Varchar, t.x, t._row_id] } @@ -917,7 +917,7 @@ └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [string_agg, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [string_agg(t.y, ',':Varchar order_by(t.y ASC)), count(distinct t.x)] } + └─StreamProject { exprs: [string_agg(t.y, ',':Varchar order_by(t.y ASC)), count(distinct t.x)], noop_update_hint: true } └─StreamSimpleAgg { aggs: [string_agg(t.y, ',':Varchar order_by(t.y ASC)), count(distinct t.x), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t.y, ',':Varchar, t.x, t._row_id] } @@ -938,7 +938,7 @@ └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [string_agg, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [string_agg(distinct t.y, ',':Varchar order_by(t.y ASC)), count(distinct t.x)] } + └─StreamProject { exprs: [string_agg(distinct t.y, ',':Varchar order_by(t.y ASC)), count(distinct t.x)], noop_update_hint: true } └─StreamSimpleAgg { aggs: [string_agg(distinct t.y, ',':Varchar order_by(t.y ASC)), count(distinct t.x), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t.y, ',':Varchar, t.x, t._row_id] } @@ -1006,7 +1006,7 @@ └─LogicalScan { table: t, columns: [t.a, t.b] } stream_plan: |- StreamMaterialize { columns: [s1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum($expr1) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32)))] } + └─StreamProject { exprs: [sum(sum($expr1) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32)))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum($expr1) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32))), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32))] } @@ -1313,7 +1313,7 @@ stream_plan: |- StreamMaterialize { columns: [stddev_samp, stddev_pop], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int64), null:Decimal, Sqrt(((sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) / (sum0(count(t.v1)) - 1:Int64)::Decimal))) as $expr4, Sqrt(((sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) / $expr3)) as $expr5] } - └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), sum(sum(t.v1))::Decimal as $expr2, sum0(count(t.v1))::Decimal as $expr3] } + └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), sum(sum(t.v1))::Decimal as $expr2, sum0(count(t.v1))::Decimal as $expr3], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum(t.v1), count(t.v1)] } @@ -1370,7 +1370,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [min, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [min(min(t.v1)), sum(sum(t.v2))] } + └─StreamProject { exprs: [min(min(t.v1)), sum(sum(t.v2))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [min(min(t.v1)), sum(sum(t.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [min(t.v1), sum(t.v2), count] } @@ -1388,7 +1388,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [min, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [min(t.v1), sum(t.v2)] } + └─StreamProject { exprs: [min(t.v1), sum(t.v2)], noop_update_hint: true } └─StreamSimpleAgg { aggs: [min(t.v1), sum(t.v2), count] } └─StreamExchange { dist: Single } └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -1677,7 +1677,7 @@ └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [first_value(t.x order_by(t.y ASC))] } + └─StreamProject { exprs: [first_value(t.x order_by(t.y ASC))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] } └─StreamExchange { dist: Single } └─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -1685,7 +1685,7 @@ Fragment 0 StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [first_value(t.x order_by(t.y ASC))] } + └── StreamProject { exprs: [first_value(t.x order_by(t.y ASC))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 @@ -1717,7 +1717,7 @@ Fragment 0 StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [first_value(distinct t.x order_by(t.x ASC))] } + └── StreamProject { exprs: [first_value(distinct t.x order_by(t.x ASC))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [first_value(distinct t.x order_by(t.x ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0, SimpleAggDedupForCol0: 2 ] └── StreamExchange Single from 1 @@ -1753,7 +1753,7 @@ └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [last_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [last_value(t.x order_by(t.y DESC NULLS LAST))] } + └─StreamProject { exprs: [last_value(t.x order_by(t.y DESC NULLS LAST))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [last_value(t.x order_by(t.y DESC NULLS LAST)), count] } └─StreamExchange { dist: Single } └─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -1874,7 +1874,7 @@ └─LogicalScan { table: t, columns: [t.v1, t._row_id] } stream_plan: |- StreamMaterialize { columns: [x, y, z, w], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1)), sum0(count(t.v1)), sum(sum(t.v1)), sum0(count(t.v1))] } + └─StreamProject { exprs: [sum(sum(t.v1)), sum0(count(t.v1)), sum(sum(t.v1)), sum0(count(t.v1))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count(t.v1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count(t.v1)] } @@ -1895,11 +1895,12 @@ └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - └─StreamExchange { dist: Single } - └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - └─StreamProject { exprs: [t.v1::Float64 as $expr1, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProject { exprs: [approx_percentile], noop_update_hint: true } + └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + └─StreamExchange { dist: Single } + └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + └─StreamProject { exprs: [t.v1::Float64 as $expr1, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: test simple approx_percentile with other simple aggs sql: | CREATE TABLE t (v1 int); @@ -1916,19 +1917,20 @@ └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [approx_percentile, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamRowMerge { output: [approx_percentile:Float64, sum(sum(t.v1)):Int64] } - ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ └─StreamExchange { dist: Single } - │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ └─StreamShare { id: 2 } - │ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] } - │ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] } - └─StreamExchange { dist: Single } - └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } - └─StreamShare { id: 2 } - └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProject { exprs: [approx_percentile, sum(sum(t.v1))], noop_update_hint: true } + └─StreamRowMerge { output: [approx_percentile:Float64, sum(sum(t.v1)):Int64] } + ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ └─StreamExchange { dist: Single } + │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ └─StreamShare { id: 2 } + │ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] } + │ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } + └─StreamShare { id: 2 } + └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: test simple approx_percentile with other simple aggs (sum, count) sql: | CREATE TABLE t (v1 int); @@ -1946,7 +1948,7 @@ └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, approx_percentile, s2, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum(sum(t.v1)), sum0(count(t.v1))] } + └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum(sum(t.v1)), sum0(count(t.v1))], noop_update_hint: true } └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count(t.v1)):Int64] } ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } │ └─StreamExchange { dist: Single } @@ -1971,7 +1973,7 @@ └─LogicalScan { table: t, columns: [t.v1, t._row_id] } stream_plan: |- StreamMaterialize { columns: [x, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [approx_percentile, approx_percentile] } + └─StreamProject { exprs: [approx_percentile, approx_percentile], noop_update_hint: true } └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } └─StreamExchange { dist: Single } └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } @@ -1993,19 +1995,20 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [x, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } - ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ └─StreamExchange { dist: Single } - │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ └─StreamShare { id: 2 } - │ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] } - │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - └─StreamExchange { dist: Single } - └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - └─StreamShare { id: 2 } - └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProject { exprs: [approx_percentile, approx_percentile], noop_update_hint: true } + └─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } + ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ └─StreamExchange { dist: Single } + │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ └─StreamShare { id: 2 } + │ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] } + │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + └─StreamExchange { dist: Single } + └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + └─StreamShare { id: 2 } + └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v2::Float64 as $expr2, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: test simple approx_percentile with different approx_percentile interleaved with stateless simple aggs sql: | CREATE TABLE t (v1 int, v2 int); @@ -2023,7 +2026,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, x, count, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3] } + └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3], noop_update_hint: true } └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] } ├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } │ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } @@ -2061,7 +2064,7 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, x, count, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3] } + └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), (sum(sum(t.v2))::Float64 + approx_percentile) as $expr3], noop_update_hint: true } └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, sum(sum(t.v2)):Int64, approx_percentile:Float64] } ├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } │ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } @@ -2098,19 +2101,20 @@ └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64] } - ├─StreamGlobalApproxPercentile { quantile: 0.8:Float64, relative_error: 0.01:Float64 } - │ └─StreamExchange { dist: Single } - │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.8:Float64, relative_error: 0.01:Float64 } - │ └─StreamShare { id: 2 } - │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] } - │ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] } - └─StreamExchange { dist: Single } - └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } - └─StreamShare { id: 2 } - └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile], noop_update_hint: true } + └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64] } + ├─StreamGlobalApproxPercentile { quantile: 0.8:Float64, relative_error: 0.01:Float64 } + │ └─StreamExchange { dist: Single } + │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.8:Float64, relative_error: 0.01:Float64 } + │ └─StreamShare { id: 2 } + │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] } + │ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] } + └─StreamExchange { dist: Single } + └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] } + └─StreamShare { id: 2 } + └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: test simple approx_percentile with different approx_percentile interleaved with stateless + stateful simple aggs sql: | CREATE TABLE t (v1 int, v2 int); @@ -2127,25 +2131,26 @@ └─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [s1, x, count, m2, y], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, max(max(t.v2)):Int32, approx_percentile:Float64] } - ├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } - │ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ │ └─StreamExchange { dist: Single } - │ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ │ └─StreamShare { id: 2 } - │ │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] } - │ │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - │ └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ └─StreamExchange { dist: Single } - │ └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - │ └─StreamShare { id: 2 } - │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] } - │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), max(max(t.v2)), count] } - └─StreamExchange { dist: Single } - └─StreamHashAgg { group_key: [$expr5], aggs: [sum(t.v1), count, max(t.v2)] } - └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr3, t.v2, t.v2::Float64 as $expr4, t._row_id, Vnode(t._row_id) as $expr5] } - └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum0(count), max(max(t.v2)), approx_percentile], noop_update_hint: true } + └─StreamRowMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count):Int64, max(max(t.v2)):Int32, approx_percentile:Float64] } + ├─StreamRowMerge { output: [approx_percentile:Float64, approx_percentile:Float64] } + │ ├─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ │ └─StreamExchange { dist: Single } + │ │ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ │ └─StreamShare { id: 2 } + │ │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] } + │ │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + │ └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ └─StreamExchange { dist: Single } + │ └─StreamLocalApproxPercentile { percentile_col: $expr2, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + │ └─StreamShare { id: 2 } + │ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t.v2, t.v2::Float64 as $expr2, t._row_id] } + │ └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count), max(max(t.v2)), count] } + └─StreamExchange { dist: Single } + └─StreamHashAgg { group_key: [$expr5], aggs: [sum(t.v1), count, max(t.v2)] } + └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr3, t.v2, t.v2::Float64 as $expr4, t._row_id, Vnode(t._row_id) as $expr5] } + └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: test hash approx_percentile sql: | CREATE TABLE t (v1 int, v2 int); @@ -2193,8 +2198,9 @@ └─BatchScan { table: t, columns: [t.v1], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } - └─StreamExchange { dist: Single } - └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } - └─StreamProject { exprs: [t.v1::Float64 as $expr1, t._row_id] } - └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamProject { exprs: [approx_percentile], noop_update_hint: true } + └─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 } + └─StreamExchange { dist: Single } + └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 } + └─StreamProject { exprs: [t.v1::Float64 as $expr1, t._row_id] } + └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/append_only.yaml b/src/frontend/planner_test/tests/testdata/output/append_only.yaml index d0701675c361..e76813e05f75 100644 --- a/src/frontend/planner_test/tests/testdata/output/append_only.yaml +++ b/src/frontend/planner_test/tests/testdata/output/append_only.yaml @@ -33,7 +33,7 @@ select max(v1) as max_v1 from t1; stream_plan: |- StreamMaterialize { columns: [max_v1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max(t1.v1))] } + └─StreamProject { exprs: [max(max(t1.v1))], noop_update_hint: true } └─StreamSimpleAgg [append_only] { aggs: [max(max(t1.v1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [max(t1.v1)] } diff --git a/src/frontend/planner_test/tests/testdata/output/bushy_join.yaml b/src/frontend/planner_test/tests/testdata/output/bushy_join.yaml index 9d042f1e60c8..a785ac443901 100644 --- a/src/frontend/planner_test/tests/testdata/output/bushy_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/bushy_join.yaml @@ -8,7 +8,7 @@ sql: select count(*) from t t1 join t t2 on t1.id = t2.id join t t3 on t1.id = t3.id join t t4 on t1.id = t4.id join t t5 on t1.id = t5.id join t t6 on t1.id = t6.id join t t7 on t1.id = t7.id join t t8 on t1.id = t8.id; stream_plan: |- StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count)] } + └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } diff --git a/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml b/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml index ce98b8bea75c..3b96806aabc7 100644 --- a/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml @@ -869,7 +869,7 @@ └─BatchScan { table: order_line, columns: [order_line.ol_amount, order_line.ol_delivery_d, order_line.ol_quantity], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(order_line.ol_amount))] } + └─StreamProject { exprs: [sum(sum(order_line.ol_amount))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(order_line.ol_amount)] } @@ -880,7 +880,7 @@ Fragment 0 StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum(sum(order_line.ol_amount))] } + └── StreamProject { exprs: [sum(sum(order_line.ol_amount))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -1940,7 +1940,7 @@ │ └─StreamProject { exprs: [stock.s_i_id, stock.s_order_cnt, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr1, stock.s_w_id] } │ └─StreamTableScan { table: stock, columns: [stock.s_i_id, stock.s_w_id, stock.s_order_cnt], stream_scan_type: ArrangementBackfill, stream_key: [stock.s_w_id, stock.s_i_id], pk: [s_w_id, s_i_id], dist: UpstreamHashShard(stock.s_w_id, stock.s_i_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum(sum(stock.s_order_cnt))::Decimal * 0.005:Decimal) as $expr3] } + └─StreamProject { exprs: [(sum(sum(stock.s_order_cnt))::Decimal * 0.005:Decimal) as $expr3], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(stock.s_order_cnt)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(stock.s_order_cnt)] } @@ -2008,7 +2008,7 @@ └── BatchPlanNode Fragment 7 - StreamProject { exprs: [(sum(sum(stock.s_order_cnt))::Decimal * 0.005:Decimal) as $expr3] } + StreamProject { exprs: [(sum(sum(stock.s_order_cnt))::Decimal * 0.005:Decimal) as $expr3], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum(sum(stock.s_order_cnt)), count] } { tables: [ SimpleAggState: 14 ] } └── StreamExchange Single from 8 @@ -2265,7 +2265,7 @@ └─BatchScan { table: order_line, columns: [order_line.ol_i_id, order_line.ol_amount, order_line.ol_delivery_d], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / (1:Decimal + sum(sum(order_line.ol_amount)))) as $expr2] } + └─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / (1:Decimal + sum(sum(order_line.ol_amount)))) as $expr2], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(order_line.ol_amount)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum(order_line.ol_amount)] } @@ -2279,11 +2279,9 @@ └─StreamTableScan { table: item, columns: [item.i_id, item.i_data], stream_scan_type: ArrangementBackfill, stream_key: [item.i_id], pk: [i_id], dist: UpstreamHashShard(item.i_id) } stream_dist_plan: |+ Fragment 0 - StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / (1:Decimal + sum(sum(order_line.ol_amount)))) as $expr2] } - └── StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(order_line.ol_amount)), count] } - ├── tables: [ SimpleAggState: 0 ] + StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } + └── StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / (1:Decimal + sum(sum(order_line.ol_amount)))) as $expr2], noop_update_hint: true } + └── StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(order_line.ol_amount)), count] } { tables: [ SimpleAggState: 0 ] } └── StreamExchange Single from 1 Fragment 1 @@ -2358,7 +2356,7 @@ │ └─StreamProject { exprs: [revenue1.total_revenue, revenue1.supplier_no::Int64 as $expr1, revenue1.supplier_no] } │ └─StreamTableScan { table: revenue1, columns: [revenue1.supplier_no, revenue1.total_revenue], stream_scan_type: ArrangementBackfill, stream_key: [revenue1.supplier_no], pk: [supplier_no], dist: UpstreamHashShard(revenue1.supplier_no) } └─StreamExchange { dist: HashShard(max(max(revenue1.total_revenue))) } - └─StreamProject { exprs: [max(max(revenue1.total_revenue))] } + └─StreamProject { exprs: [max(max(revenue1.total_revenue))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(max(revenue1.total_revenue)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(revenue1.total_revenue), count] } @@ -2396,7 +2394,7 @@ └── BatchPlanNode Fragment 5 - StreamProject { exprs: [max(max(revenue1.total_revenue))] } + StreamProject { exprs: [max(max(revenue1.total_revenue))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [max(max(revenue1.total_revenue)), count] } { tables: [ SimpleAggState: 11, SimpleAggCall0: 10 ] } └── StreamExchange Single from 6 @@ -2628,7 +2626,7 @@ └─BatchScan { table: order_line, columns: [order_line.ol_i_id, order_line.ol_quantity], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [avg_yearly], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [(sum(sum(order_line.ol_amount)) / 2.0:Decimal) as $expr3] } + └─StreamProject { exprs: [(sum(sum(order_line.ol_amount)) / 2.0:Decimal) as $expr3], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(order_line.ol_amount)] } @@ -2651,9 +2649,8 @@ Fragment 0 StreamMaterialize { columns: [avg_yearly], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [(sum(sum(order_line.ol_amount)) / 2.0:Decimal) as $expr3] } - └── StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } - ├── tables: [ SimpleAggState: 0 ] + └── StreamProject { exprs: [(sum(sum(order_line.ol_amount)) / 2.0:Decimal) as $expr3], noop_update_hint: true } + └── StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } { tables: [ SimpleAggState: 0 ] } └── StreamExchange Single from 1 Fragment 1 @@ -2861,7 +2858,7 @@ └─BatchScan { table: order_line, columns: [order_line.ol_w_id, order_line.ol_i_id, order_line.ol_amount, order_line.ol_quantity], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(order_line.ol_amount))] } + └─StreamProject { exprs: [sum(sum(order_line.ol_amount))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(order_line.ol_amount)] } @@ -2880,7 +2877,7 @@ Fragment 0 StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum(sum(order_line.ol_amount))] } + └── StreamProject { exprs: [sum(sum(order_line.ol_amount))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -3410,7 +3407,7 @@ │ └─StreamProject { exprs: [orders.o_c_id, orders.o_w_id, orders.o_d_id, orders.o_id] } │ └─StreamTableScan { table: orders, columns: [orders.o_d_id, orders.o_w_id, orders.o_c_id, orders.o_id], stream_scan_type: ArrangementBackfill, stream_key: [orders.o_w_id, orders.o_d_id, orders.o_id], pk: [o_w_id, o_d_id, o_id], dist: UpstreamHashShard(orders.o_w_id, orders.o_d_id, orders.o_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum(sum(customer.c_balance)) / sum0(count(customer.c_balance))::Decimal) as $expr1] } + └─StreamProject { exprs: [(sum(sum(customer.c_balance)) / sum0(count(customer.c_balance))::Decimal) as $expr1], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(customer.c_balance)), sum0(count(customer.c_balance)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(customer.c_balance), count(customer.c_balance)] } diff --git a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml index abbc0aae184e..ce97db3c0d33 100644 --- a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml @@ -74,7 +74,7 @@ └─StreamProject { exprs: [Sqrt($expr5) as $expr6, Case((sum0(count(t.v)) <= 1:Int64), null:Decimal, Sqrt(($expr4 / (sum0(count(t.v)) - 1:Int64)::Decimal))) as $expr7, $expr5, Case((sum0(count(t.v)) <= 1:Int64), null:Decimal, ($expr4 / (sum0(count(t.v)) - 1:Int64)::Decimal)) as $expr8] } └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), ($expr4 / $expr3) as $expr5, $expr4] } └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), (sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) as $expr4, $expr3] } - └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), sum(sum(t.v))::Decimal as $expr2, sum0(count(t.v))::Decimal as $expr3] } + └─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), sum(sum(t.v))::Decimal as $expr2, sum0(count(t.v))::Decimal as $expr3], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum(t.v), count(t.v)] } diff --git a/src/frontend/planner_test/tests/testdata/output/dynamic_filter.yaml b/src/frontend/planner_test/tests/testdata/output/dynamic_filter.yaml index 89aea24f2bd8..922723851944 100644 --- a/src/frontend/planner_test/tests/testdata/output/dynamic_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/output/dynamic_filter.yaml @@ -18,7 +18,7 @@ └─StreamDynamicFilter { predicate: (t1.v1 > max(max(t2.v2))), output: [t1.v1, t1._row_id] } ├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [max(max(t2.v2))] } + └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -77,7 +77,7 @@ ├─StreamProject { exprs: [t1.v1, (t1.v1 + t1.v1) as $expr1, t1._row_id] } │ └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [max(max(t2.v2))] } + └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -129,7 +129,7 @@ ├─StreamExchange { dist: HashShard(t1.v1) } │ └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: HashShard(max(max(t2.v2))) } - └─StreamProject { exprs: [max(max(t2.v2))] } + └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -153,7 +153,7 @@ ├─StreamProject { exprs: [t1.v1, t1.v1::Int64 as $expr1, t1._row_id] } │ └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [max(max(t2.v2))] } + └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -169,7 +169,7 @@ └─StreamDynamicFilter { predicate: (t1.v1 > max(max(t2.v2))), output: [t1.v1, t1._row_id] } ├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [max(max(t2.v2))] } + └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -191,7 +191,7 @@ └─StreamDynamicFilter { predicate: (t1.v1 > $expr1), output: [t1.v1, t1._row_id] } ├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(2:Int32 * max(max(t2.v2))) as $expr1] } + └─StreamProject { exprs: [(2:Int32 * max(max(t2.v2))) as $expr1], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -220,7 +220,7 @@ │ ├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } │ └─StreamExchange { dist: Broadcast } │ └─StreamShare { id: 6 } - │ └─StreamProject { exprs: [max(max(t2.v2))] } + │ └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } │ └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } │ └─StreamExchange { dist: Single } │ └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } @@ -229,7 +229,7 @@ └─StreamExchange { dist: Broadcast } └─StreamProject { exprs: [(max(max(t2.v2)) + 5:Int32) as $expr1] } └─StreamShare { id: 6 } - └─StreamProject { exprs: [max(max(t2.v2))] } + └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] } diff --git a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml index 815890d6a73b..e7a1951ffde5 100644 --- a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml @@ -180,7 +180,7 @@ where path_val = t1.id; stream_plan: |- StreamMaterialize { columns: [array_agg], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [array_agg(t1.n order_by($expr1 ASC))] } + └─StreamProject { exprs: [array_agg(t1.n order_by($expr1 ASC))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [array_agg(t1.n order_by($expr1 ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t1.n, (projected_row_id + 1:Int64) as $expr1, t1._row_id, t2.p, t2.p, t2.d, t2.d, projected_row_id, t1.id, t2._row_id] } diff --git a/src/frontend/planner_test/tests/testdata/output/limit.yaml b/src/frontend/planner_test/tests/testdata/output/limit.yaml index 22fb2add9d30..500dbe1dd582 100644 --- a/src/frontend/planner_test/tests/testdata/output/limit.yaml +++ b/src/frontend/planner_test/tests/testdata/output/limit.yaml @@ -131,7 +131,7 @@ stream_plan: |- StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamTopN { order: [sum0(count) ASC], limit: 1, offset: 0 } - └─StreamProject { exprs: [sum0(count)] } + └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } @@ -154,7 +154,7 @@ stream_plan: |- StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } └─StreamTopN { order: [sum0(count) ASC], limit: 1, offset: 0 } - └─StreamProject { exprs: [sum0(count)] } + └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } diff --git a/src/frontend/planner_test/tests/testdata/output/mv_column_name.yaml b/src/frontend/planner_test/tests/testdata/output/mv_column_name.yaml index 3db403433631..91352992bb17 100644 --- a/src/frontend/planner_test/tests/testdata/output/mv_column_name.yaml +++ b/src/frontend/planner_test/tests/testdata/output/mv_column_name.yaml @@ -63,7 +63,7 @@ select count(*), max(a) from t; stream_plan: |- StreamMaterialize { columns: [count, max], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count), max(max(t.a))] } + └─StreamProject { exprs: [sum0(count), max(max(t.a))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(count), max(max(t.a)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [count, max(t.a)] } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index d6b90da0a8c1..1ea7349b2476 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -1891,7 +1891,7 @@ │ └─StreamExchange { dist: HashShard(bid.auction) } │ └─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1] } + └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(bid.auction)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum0(count), count(bid.auction)] } @@ -1926,7 +1926,7 @@ └── BatchPlanNode Fragment 3 - StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1] } + StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(bid.auction)), count] } { tables: [ SimpleAggState: 9 ] } └── StreamExchange Single from 4 @@ -2331,7 +2331,7 @@ └─BatchScan { table: bid, columns: [bid.auction, bid.price, bid.date_time], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [min(min(max(bid.price)))] } + └─StreamProject { exprs: [min(min(max(bid.price)))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [min(min(max(bid.price))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr1], aggs: [min(max(bid.price)), count] } @@ -2348,7 +2348,7 @@ Fragment 0 StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [min(min(max(bid.price)))] } + └── StreamProject { exprs: [min(min(max(bid.price)))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [min(min(max(bid.price))), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml index 35713c9682a3..15e1647721d5 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml @@ -1878,7 +1878,7 @@ │ └─StreamRowIdGen { row_id_index: 7 } │ └─StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1] } + └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(auction)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum0(count), count(auction)] } @@ -1915,7 +1915,7 @@ └── StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } { tables: [ Source: 8 ] } Fragment 4 - StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1] } + StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(auction)), count] } { tables: [ SimpleAggState: 9 ] } └── StreamExchange Single from 5 @@ -2277,7 +2277,7 @@ └─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } stream_plan: |- StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [min(min(max(price)))] } + └─StreamProject { exprs: [min(min(max(price)))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [min(min(max(price))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr1], aggs: [min(max(price)), count] } @@ -2296,7 +2296,7 @@ Fragment 0 StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [min(min(max(price)))] } + └── StreamProject { exprs: [min(min(max(price)))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [min(min(max(price))), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml index d5d948e5b507..0658030573dd 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml @@ -1517,7 +1517,7 @@ │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '00:05:00':Interval, 'UTC':Varchar) as $expr4], output_watermarks: [$expr4] } │ └─StreamNow { output: [now] } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr5))) as $expr6] } + └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr5))) as $expr6], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr5)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum0(count), count($expr5)] } @@ -1578,7 +1578,7 @@ └── StreamNow { output: [now] } { tables: [ Now: 10 ] } Fragment 6 - StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr5))) as $expr6] } + StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr5))) as $expr6], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr5)), count] } { tables: [ SimpleAggState: 11 ] } └── StreamExchange Single from 7 @@ -2000,7 +2000,7 @@ ) stream_plan: |- StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [min(min(max($expr7)))] } + └─StreamProject { exprs: [min(min(max($expr7)))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [min(min(max($expr7))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr9], aggs: [min(max($expr7)), count] } @@ -2035,7 +2035,7 @@ Fragment 0 StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [min(min(max($expr7)))] } + └── StreamProject { exprs: [min(min(max($expr7)))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [min(min(max($expr7))), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml index f065ba33c252..c577b72eaafd 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml @@ -2059,7 +2059,7 @@ │ └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] } │ └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr4))) as $expr5] } + └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr4))) as $expr5], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr4)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum0(count), count($expr4)] } @@ -2111,7 +2111,7 @@ └── StreamExchange NoShuffle from 2 Fragment 5 - StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr4))) as $expr5] } + StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr4))) as $expr5], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr4)), count] } { tables: [ SimpleAggState: 9 ] } └── StreamExchange Single from 6 @@ -2533,7 +2533,7 @@ └─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] } stream_plan: |- StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [min(min(max($expr5)))] } + └─StreamProject { exprs: [min(min(max($expr5)))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [min(min(max($expr5))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [$expr6], aggs: [min(max($expr5)), count] } @@ -2564,7 +2564,7 @@ Fragment 0 StreamMaterialize { columns: [min_final], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [min(min(max($expr5)))] } + └── StreamProject { exprs: [min(min(max($expr5)))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [min(min(max($expr5))), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/share.yaml b/src/frontend/planner_test/tests/testdata/output/share.yaml index 2cf3aee9fe04..7962e4724f34 100644 --- a/src/frontend/planner_test/tests/testdata/output/share.yaml +++ b/src/frontend/planner_test/tests/testdata/output/share.yaml @@ -33,7 +33,7 @@ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id] } stream_plan: |- StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count)] } + └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } └─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } @@ -155,7 +155,7 @@ ├─StreamExchange { dist: HashShard(0:Int32) } │ └─StreamProject { exprs: [sum0(count), 0:Int32] } │ └─StreamShare { id: 5 } - │ └─StreamProject { exprs: [sum0(count)] } + │ └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } │ └─StreamSimpleAgg { aggs: [sum0(count), count] } │ └─StreamExchange { dist: Single } │ └─StreamStatelessSimpleAgg { aggs: [count] } @@ -163,7 +163,7 @@ └─StreamExchange { dist: HashShard(1:Int32) } └─StreamProject { exprs: [sum0(count), 1:Int32] } └─StreamShare { id: 5 } - └─StreamProject { exprs: [sum0(count)] } + └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } @@ -176,13 +176,13 @@ StreamMaterialize { columns: [count, $src(hidden)], stream_key: [$src], pk_columns: [$src], pk_conflict: NoCheck } └─StreamUnion { all: true } ├─StreamExchange { dist: HashShard(0:Int32) } - │ └─StreamProject { exprs: [sum0(count), 0:Int32] } + │ └─StreamProject { exprs: [sum0(count), 0:Int32], noop_update_hint: true } │ └─StreamSimpleAgg { aggs: [sum0(count), count] } │ └─StreamExchange { dist: Single } │ └─StreamStatelessSimpleAgg { aggs: [count] } │ └─StreamTableScan { table: t, columns: [t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } └─StreamExchange { dist: HashShard(1:Int32) } - └─StreamProject { exprs: [sum0(count), 1:Int32] } + └─StreamProject { exprs: [sum0(count), 1:Int32], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } @@ -195,7 +195,7 @@ select count(*) cnt from auction A join auction B on A.id = B.id; stream_plan: |- StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count)] } + └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } └─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } @@ -216,7 +216,7 @@ with cte as (select a, sum(b) sum from t group by a) select count(*) from cte c1 join cte c2 on c1.a = c2.a; stream_plan: |- StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count)] } + └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } @@ -235,7 +235,7 @@ Fragment 0 StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum0(count)] } + └── StreamProject { exprs: [sum0(count)], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum0(count), count] } { tables: [ SimpleAggState: 0 ] } └── StreamExchange Single from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml b/src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml index 48caec86bd94..0b7d7d7f2f2b 100644 --- a/src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml @@ -17,13 +17,13 @@ └─BatchScan { table: s, columns: [s.v], distribution: Single } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(s.v)] } + └─StreamProject { exprs: [max(s.v)], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(s.v), count] } └─StreamTableScan { table: s, columns: [s.v, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamProject { exprs: [max(s.v)] } + └── StreamProject { exprs: [max(s.v)], noop_update_hint: true } └── StreamSimpleAgg { aggs: [max(s.v), count] } { tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] } └── StreamTableScan { table: s, columns: [s.v, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } ├── tables: [ StreamScan: 2 ] @@ -55,13 +55,13 @@ └─BatchScan { table: s, columns: [s.v], distribution: Single } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(s.v)] } + └─StreamProject { exprs: [sum(s.v)], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(s.v), count] } └─StreamTableScan { table: s, columns: [s.v, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamProject { exprs: [sum(s.v)] } + └── StreamProject { exprs: [sum(s.v)], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum(s.v), count] } { tables: [ SimpleAggState: 0 ] } └── StreamTableScan { table: s, columns: [s.v, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } ├── tables: [ StreamScan: 1 ] @@ -91,13 +91,13 @@ └─BatchScan { table: s, columns: [s.v], distribution: Single } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [count(s.v)] } + └─StreamProject { exprs: [count(s.v)], noop_update_hint: true } └─StreamSimpleAgg { aggs: [count(s.v), count] } └─StreamTableScan { table: s, columns: [s.v, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamProject { exprs: [count(s.v)] } + └── StreamProject { exprs: [count(s.v)], noop_update_hint: true } └── StreamSimpleAgg { aggs: [count(s.v), count] } { tables: [ SimpleAggState: 0 ] } └── StreamTableScan { table: s, columns: [s.v, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } ├── tables: [ StreamScan: 1 ] @@ -128,14 +128,14 @@ └─BatchScan { table: s, columns: [s.v, s.s], distribution: Single } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [string_agg(s.s, ',':Varchar order_by(s.v ASC))] } + └─StreamProject { exprs: [string_agg(s.s, ',':Varchar order_by(s.v ASC))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [string_agg(s.s, ',':Varchar order_by(s.v ASC)), count] } └─StreamProject { exprs: [s.s, ',':Varchar, s.v, s.t._row_id] } └─StreamTableScan { table: s, columns: [s.v, s.s, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamProject { exprs: [string_agg(s.s, ',':Varchar order_by(s.v ASC))] } + └── StreamProject { exprs: [string_agg(s.s, ',':Varchar order_by(s.v ASC))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [string_agg(s.s, ',':Varchar order_by(s.v ASC)), count] } { tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] } └── StreamProject { exprs: [s.s, ',':Varchar, s.v, s.t._row_id] } └── StreamTableScan { table: s, columns: [s.v, s.s, s.o, s.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [s.t._row_id], pk: [o, t._row_id], dist: Single } @@ -169,7 +169,7 @@ └─BatchScan { table: t, columns: [t.v], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max(t.v))] } + └─StreamProject { exprs: [max(max(t.v))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(max(t.v)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t.v), count] } @@ -179,7 +179,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [max(max(t.v))] } + └── StreamProject { exprs: [max(max(t.v))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [max(max(t.v)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 @@ -223,7 +223,7 @@ select max(v) as a1 from AO; stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max(ao.v))] } + └─StreamProject { exprs: [max(max(ao.v))], noop_update_hint: true } └─StreamSimpleAgg [append_only] { aggs: [max(max(ao.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [max(ao.v)] } @@ -232,7 +232,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [max(max(ao.v))] } + └── StreamProject { exprs: [max(max(ao.v))], noop_update_hint: true } └── StreamSimpleAgg [append_only] { aggs: [max(max(ao.v)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -268,7 +268,7 @@ └─BatchScan { table: t, columns: [t.v], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(t.v))] } + └─StreamProject { exprs: [sum(sum(t.v))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(t.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(t.v)] } @@ -277,7 +277,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum(sum(t.v))] } + └── StreamProject { exprs: [sum(sum(t.v))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum(sum(t.v)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -308,7 +308,7 @@ select sum(v) as a1 from AO; stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum(ao.v))] } + └─StreamProject { exprs: [sum(sum(ao.v))], noop_update_hint: true } └─StreamSimpleAgg [append_only] { aggs: [sum(sum(ao.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(ao.v)] } @@ -317,7 +317,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum(sum(ao.v))] } + └── StreamProject { exprs: [sum(sum(ao.v))], noop_update_hint: true } └── StreamSimpleAgg [append_only] { aggs: [sum(sum(ao.v)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -353,7 +353,7 @@ └─BatchScan { table: t, columns: [t.v], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count(t.v))] } + └─StreamProject { exprs: [sum0(count(t.v))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum0(count(t.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count(t.v)] } @@ -362,7 +362,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum0(count(t.v))] } + └── StreamProject { exprs: [sum0(count(t.v))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum0(count(t.v)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -393,7 +393,7 @@ select count(v) as a1 from AO; stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count(ao.v))] } + └─StreamProject { exprs: [sum0(count(ao.v))], noop_update_hint: true } └─StreamSimpleAgg [append_only] { aggs: [sum0(count(ao.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count(ao.v)] } @@ -402,7 +402,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum0(count(ao.v))] } + └── StreamProject { exprs: [sum0(count(ao.v))], noop_update_hint: true } └── StreamSimpleAgg [append_only] { aggs: [sum0(count(ao.v)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -438,7 +438,7 @@ └─BatchScan { table: t, columns: [t.o, t.s], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [string_agg(t.s, ',':Varchar order_by(t.o ASC))] } + └─StreamProject { exprs: [string_agg(t.s, ',':Varchar order_by(t.o ASC))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [string_agg(t.s, ',':Varchar order_by(t.o ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t.s, ',':Varchar, t.o, t._row_id] } @@ -447,7 +447,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [string_agg(t.s, ',':Varchar order_by(t.o ASC))] } + └── StreamProject { exprs: [string_agg(t.s, ',':Varchar order_by(t.o ASC))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [string_agg(t.s, ',':Varchar order_by(t.o ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 @@ -480,7 +480,7 @@ select string_agg(s, ',' order by o) as a1 from AO; stream_plan: |- StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } + └─StreamProject { exprs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC))], noop_update_hint: true } └─StreamSimpleAgg [append_only] { aggs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [ao.s, ',':Varchar, ao.o, ao._row_id] } @@ -489,7 +489,7 @@ Fragment 0 StreamMaterialize { columns: [a1], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } + └── StreamProject { exprs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC))], noop_update_hint: true } └── StreamSimpleAgg [append_only] { aggs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 @@ -527,7 +527,7 @@ └─BatchScan { table: t, columns: [t.v], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max(t.v)), sum0(count(t.v))] } + └─StreamProject { exprs: [max(max(t.v)), sum0(count(t.v))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(max(t.v)), sum0(count(t.v)), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(t.v), count(t.v), count] } @@ -537,7 +537,7 @@ Fragment 0 StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [max(max(t.v)), sum0(count(t.v))] } + └── StreamProject { exprs: [max(max(t.v)), sum0(count(t.v))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [max(max(t.v)), sum0(count(t.v)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall0: 0 ] └── StreamExchange Single from 1 @@ -587,7 +587,7 @@ select max(v) as a1, count(v) as a2 from AO; stream_plan: |- StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(max(ao.v)), sum0(count(ao.v))] } + └─StreamProject { exprs: [max(max(ao.v)), sum0(count(ao.v))], noop_update_hint: true } └─StreamSimpleAgg [append_only] { aggs: [max(max(ao.v)), sum0(count(ao.v)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [max(ao.v), count(ao.v)] } @@ -596,7 +596,7 @@ Fragment 0 StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [max(max(ao.v)), sum0(count(ao.v))] } + └── StreamProject { exprs: [max(max(ao.v)), sum0(count(ao.v))], noop_update_hint: true } └── StreamSimpleAgg [append_only] { aggs: [max(max(ao.v)), sum0(count(ao.v)), count] } ├── tables: [ SimpleAggState: 0 ] └── StreamExchange Single from 1 @@ -632,7 +632,7 @@ └─BatchScan { table: t, columns: [t.v, t.o, t.s], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [count(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))] } + └─StreamProject { exprs: [count(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [count(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t.v, t.s, ',':Varchar, t.o, t._row_id] } @@ -641,7 +641,7 @@ Fragment 0 StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [count(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))] } + └── StreamProject { exprs: [count(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [count(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall1: 0 ] └── StreamExchange Single from 1 @@ -679,7 +679,7 @@ select count(v) as a1, string_agg(s, ',' order by o) as a2 from AO; stream_plan: |- StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } + └─StreamProject { exprs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))], noop_update_hint: true } └─StreamSimpleAgg [append_only] { aggs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [ao.v, ao.s, ',':Varchar, ao.o, ao._row_id] } @@ -688,7 +688,7 @@ Fragment 0 StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } + └── StreamProject { exprs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))], noop_update_hint: true } └── StreamSimpleAgg [append_only] { aggs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall1: 0 ] └── StreamExchange Single from 1 @@ -726,7 +726,7 @@ └─BatchScan { table: t, columns: [t.v, t.o, t.s], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))] } + └─StreamProject { exprs: [max(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [t.v, t.s, ',':Varchar, t.o, t._row_id] } @@ -735,7 +735,7 @@ Fragment 0 StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [max(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))] } + └── StreamProject { exprs: [max(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [max(t.v), string_agg(t.s, ',':Varchar order_by(t.o ASC)), count] } ├── tables: [ SimpleAggState: 2, SimpleAggCall0: 0, SimpleAggCall1: 1 ] └── StreamExchange Single from 1 @@ -770,7 +770,7 @@ select max(v) as a1, string_agg(s, ',' order by o) as a2 from AO; stream_plan: |- StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } + └─StreamProject { exprs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))], noop_update_hint: true } └─StreamSimpleAgg [append_only] { aggs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } └─StreamExchange { dist: Single } └─StreamProject { exprs: [ao.v, ao.s, ',':Varchar, ao.o, ao._row_id] } @@ -779,7 +779,7 @@ Fragment 0 StreamMaterialize { columns: [a1, a2], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } + └── StreamProject { exprs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))], noop_update_hint: true } └── StreamSimpleAgg [append_only] { aggs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } ├── tables: [ SimpleAggState: 1, SimpleAggCall1: 0 ] └── StreamExchange Single from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml index 5cdfdf6cf45e..49d14526af64 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml @@ -61,7 +61,7 @@ select count(*) from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where a2 < 10; stream_plan: |- StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum0(count)] } + └─StreamProject { exprs: [sum0(count)], noop_update_hint: true } └─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } diff --git a/src/frontend/planner_test/tests/testdata/output/tpch.yaml b/src/frontend/planner_test/tests/testdata/output/tpch.yaml index 3c43faa8d249..dbb7a5c08a62 100644 --- a/src/frontend/planner_test/tests/testdata/output/tpch.yaml +++ b/src/frontend/planner_test/tests/testdata/output/tpch.yaml @@ -1160,7 +1160,7 @@ └─BatchScan { table: lineitem, columns: [lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_quantity, lineitem.l_shipdate], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum($expr1))] } + └─StreamProject { exprs: [sum(sum($expr1))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum($expr1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1)] } @@ -1171,7 +1171,7 @@ Fragment 0 StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum(sum($expr1))] } + └── StreamProject { exprs: [sum(sum($expr1))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum(sum($expr1)), count] } { tables: [ SimpleAggState: 0 ] } └── StreamExchange Single from 1 @@ -2389,7 +2389,7 @@ │ └─StreamFilter { predicate: (nation.n_name = 'ARGENTINA':Varchar) } │ └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], stream_scan_type: ArrangementBackfill, stream_key: [nation.n_nationkey], pk: [n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum(sum($expr2)) * 0.0001000000:Decimal) as $expr3] } + └─StreamProject { exprs: [(sum(sum($expr2)) * 0.0001000000:Decimal) as $expr3], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum($expr2)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr2)] } @@ -2461,7 +2461,7 @@ └── BatchPlanNode Fragment 8 - StreamProject { exprs: [(sum(sum($expr2)) * 0.0001000000:Decimal) as $expr3] } + StreamProject { exprs: [(sum(sum($expr2)) * 0.0001000000:Decimal) as $expr3], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum(sum($expr2)), count] } { tables: [ SimpleAggState: 16 ] } └── StreamExchange Single from 9 @@ -2818,7 +2818,7 @@ └─BatchScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / sum(sum($expr2))) as $expr3] } + └─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / sum(sum($expr2))) as $expr3], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum($expr2)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum($expr2)] } @@ -2834,9 +2834,8 @@ Fragment 0 StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / sum(sum($expr2))) as $expr3] } - └── StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum($expr2)), count] } - ├── tables: [ SimpleAggState: 0 ] + └── StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / sum(sum($expr2))) as $expr3], noop_update_hint: true } + └── StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum($expr2)), count] } { tables: [ SimpleAggState: 0 ] } └── StreamExchange Single from 1 Fragment 1 @@ -2966,7 +2965,7 @@ │ └─StreamFilter { predicate: (lineitem.l_shipdate >= '1993-01-01':Date) AND (lineitem.l_shipdate < '1993-04-01 00:00:00':Timestamp) } │ └─StreamTableScan { table: lineitem, columns: [lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipdate], stream_scan_type: ArrangementBackfill, stream_key: [lineitem.l_orderkey, lineitem.l_linenumber], pk: [l_orderkey, l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } └─StreamExchange { dist: HashShard(max(max(sum($expr1)))) } - └─StreamProject { exprs: [max(max(sum($expr1)))] } + └─StreamProject { exprs: [max(max(sum($expr1)))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [max(max(sum($expr1))), count] } └─StreamExchange { dist: Single } └─StreamHashAgg { group_key: [_vnode], aggs: [max(sum($expr1)), count] } @@ -3020,7 +3019,7 @@ └── BatchPlanNode Fragment 6 - StreamProject { exprs: [max(max(sum($expr1)))] } + StreamProject { exprs: [max(max(sum($expr1)))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [max(max(sum($expr1))), count] } { tables: [ SimpleAggState: 14, SimpleAggCall0: 13 ] } └── StreamExchange Single from 7 @@ -3296,7 +3295,7 @@ └─BatchScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_quantity], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [avg_yearly], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [(sum(sum(lineitem.l_extendedprice)) / 7.0:Decimal) as $expr2] } + └─StreamProject { exprs: [(sum(sum(lineitem.l_extendedprice)) / 7.0:Decimal) as $expr2], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(lineitem.l_extendedprice)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(lineitem.l_extendedprice)] } @@ -3319,9 +3318,8 @@ Fragment 0 StreamMaterialize { columns: [avg_yearly], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [(sum(sum(lineitem.l_extendedprice)) / 7.0:Decimal) as $expr2] } - └── StreamSimpleAgg { aggs: [sum(sum(lineitem.l_extendedprice)), count] } - ├── tables: [ SimpleAggState: 0 ] + └── StreamProject { exprs: [(sum(sum(lineitem.l_extendedprice)) / 7.0:Decimal) as $expr2], noop_update_hint: true } + └── StreamSimpleAgg { aggs: [sum(sum(lineitem.l_extendedprice)), count] } { tables: [ SimpleAggState: 0 ] } └── StreamExchange Single from 1 Fragment 1 @@ -3672,7 +3670,7 @@ └─BatchScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipinstruct, lineitem.l_shipmode], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } - └─StreamProject { exprs: [sum(sum($expr1))] } + └─StreamProject { exprs: [sum(sum($expr1))], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum($expr1)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1)] } @@ -3690,7 +3688,7 @@ Fragment 0 StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [sum(sum($expr1))] } + └── StreamProject { exprs: [sum(sum($expr1))], noop_update_hint: true } └── StreamSimpleAgg { aggs: [sum(sum($expr1)), count] } { tables: [ SimpleAggState: 0 ] } └── StreamExchange Single from 1 @@ -4342,7 +4340,7 @@ │ └─StreamExchange { dist: HashShard(orders.o_custkey) } │ └─StreamTableScan { table: orders, columns: [orders.o_custkey, orders.o_orderkey], stream_scan_type: ArrangementBackfill, stream_key: [orders.o_orderkey], pk: [o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum(sum(customer.c_acctbal)) / sum0(count(customer.c_acctbal))::Decimal) as $expr1] } + └─StreamProject { exprs: [(sum(sum(customer.c_acctbal)) / sum0(count(customer.c_acctbal))::Decimal) as $expr1], noop_update_hint: true } └─StreamSimpleAgg { aggs: [sum(sum(customer.c_acctbal)), sum0(count(customer.c_acctbal)), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum(customer.c_acctbal), count(customer.c_acctbal)] } diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index cf7025be9087..9e774628fc26 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -1386,12 +1386,19 @@ impl ToStream for LogicalAgg { panic!("the root PlanNode must be StreamHashAgg, StreamSimpleAgg, StreamGlobalApproxPercentile, or StreamRowMerge"); }; - if self.agg_calls().len() == n_final_agg_calls { + let is_hash_agg = !self.group_key().is_empty(); + // "Simple Agg" includes normal simple agg, as well as approx percentile simple 2 phase agg. + let is_simple_agg = !is_hash_agg; + if self.agg_calls().len() == n_final_agg_calls && is_hash_agg { // an existing `count(*)` is used as row count column in `StreamXxxAgg` Ok(plan) } else { - // a `count(*)` is appended, should project the output - assert_eq!(self.agg_calls().len() + 1, n_final_agg_calls); + // For hash agg, a `count(*)` is appended, should project the output. + // For simple agg, we output every epoch, so we will always add a project + // to filter out no-op updates, and we don't need the following assert. + if is_hash_agg { + assert_eq!(self.agg_calls().len() + 1, n_final_agg_calls); + } Ok(StreamProject::new(generic::Project::with_out_col_idx( plan, 0..self.schema().len(), @@ -1400,7 +1407,9 @@ impl ToStream for LogicalAgg { // Since it'll be pruned immediately in `StreamProject`, the update records are likely to be // no-op. So we set the hint to instruct the executor to eliminate them. // See https://github.com/risingwavelabs/risingwave/issues/17030. - .with_noop_update_hint(self.agg_calls().is_empty()) + // Further for simple agg, we also have to set the hint to eliminate no-op updates. + // Since we will output every epoch. + .with_noop_update_hint(self.agg_calls().is_empty() || is_simple_agg) .into()) } }