Skip to content

Commit

Permalink
fix(frontend): add project after simple agg to avoid noop updates (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored and kwannoel committed Aug 28, 2024
1 parent 612955d commit c4875c2
Show file tree
Hide file tree
Showing 18 changed files with 220 additions and 210 deletions.
194 changes: 100 additions & 94 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
select max(v1) as max_v1 from t1;
stream_plan: |-
StreamMaterialize { columns: [max_v1], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [max(max(t1.v1))] }
└─StreamProject { exprs: [max(max(t1.v1))], noop_update_hint: true }
└─StreamSimpleAgg [append_only] { aggs: [max(max(t1.v1)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [max(t1.v1)] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
sql: select count(*) from t t1 join t t2 on t1.id = t2.id join t t3 on t1.id = t3.id join t t4 on t1.id = t4.id join t t5 on t1.id = t5.id join t t6 on t1.id = t6.id join t t7 on t1.id = t7.id join t t8 on t1.id = t8.id;
stream_plan: |-
StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum0(count)] }
└─StreamProject { exprs: [sum0(count)], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [sum0(count), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [count] }
Expand Down
35 changes: 16 additions & 19 deletions src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@
└─BatchScan { table: order_line, columns: [order_line.ol_amount, order_line.ol_delivery_d, order_line.ol_quantity], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum(sum(order_line.ol_amount))] }
└─StreamProject { exprs: [sum(sum(order_line.ol_amount))], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum(order_line.ol_amount)] }
Expand All @@ -880,7 +880,7 @@
Fragment 0
StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
├── tables: [ Materialize: 4294967294 ]
└── StreamProject { exprs: [sum(sum(order_line.ol_amount))] }
└── StreamProject { exprs: [sum(sum(order_line.ol_amount))], noop_update_hint: true }
└── StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] }
├── tables: [ SimpleAggState: 0 ]
└── StreamExchange Single from 1
Expand Down Expand Up @@ -1940,7 +1940,7 @@
│ └─StreamProject { exprs: [stock.s_i_id, stock.s_order_cnt, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr1, stock.s_w_id] }
│ └─StreamTableScan { table: stock, columns: [stock.s_i_id, stock.s_w_id, stock.s_order_cnt], stream_scan_type: ArrangementBackfill, stream_key: [stock.s_w_id, stock.s_i_id], pk: [s_w_id, s_i_id], dist: UpstreamHashShard(stock.s_w_id, stock.s_i_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [(sum(sum(stock.s_order_cnt))::Decimal * 0.005:Decimal) as $expr3] }
└─StreamProject { exprs: [(sum(sum(stock.s_order_cnt))::Decimal * 0.005:Decimal) as $expr3], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [sum(sum(stock.s_order_cnt)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum(stock.s_order_cnt)] }
Expand Down Expand Up @@ -2008,7 +2008,7 @@
└── BatchPlanNode
Fragment 7
StreamProject { exprs: [(sum(sum(stock.s_order_cnt))::Decimal * 0.005:Decimal) as $expr3] }
StreamProject { exprs: [(sum(sum(stock.s_order_cnt))::Decimal * 0.005:Decimal) as $expr3], noop_update_hint: true }
└── StreamSimpleAgg { aggs: [sum(sum(stock.s_order_cnt)), count] } { tables: [ SimpleAggState: 14 ] }
└── StreamExchange Single from 8
Expand Down Expand Up @@ -2265,7 +2265,7 @@
└─BatchScan { table: order_line, columns: [order_line.ol_i_id, order_line.ol_amount, order_line.ol_delivery_d], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / (1:Decimal + sum(sum(order_line.ol_amount)))) as $expr2] }
└─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / (1:Decimal + sum(sum(order_line.ol_amount)))) as $expr2], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(order_line.ol_amount)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum(order_line.ol_amount)] }
Expand All @@ -2279,11 +2279,9 @@
└─StreamTableScan { table: item, columns: [item.i_id, item.i_data], stream_scan_type: ArrangementBackfill, stream_key: [item.i_id], pk: [i_id], dist: UpstreamHashShard(item.i_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
├── tables: [ Materialize: 4294967294 ]
└── StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / (1:Decimal + sum(sum(order_line.ol_amount)))) as $expr2] }
└── StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(order_line.ol_amount)), count] }
├── tables: [ SimpleAggState: 0 ]
StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] }
└── StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / (1:Decimal + sum(sum(order_line.ol_amount)))) as $expr2], noop_update_hint: true }
└── StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(order_line.ol_amount)), count] } { tables: [ SimpleAggState: 0 ] }
└── StreamExchange Single from 1
Fragment 1
Expand Down Expand Up @@ -2358,7 +2356,7 @@
│ └─StreamProject { exprs: [revenue1.total_revenue, revenue1.supplier_no::Int64 as $expr1, revenue1.supplier_no] }
│ └─StreamTableScan { table: revenue1, columns: [revenue1.supplier_no, revenue1.total_revenue], stream_scan_type: ArrangementBackfill, stream_key: [revenue1.supplier_no], pk: [supplier_no], dist: UpstreamHashShard(revenue1.supplier_no) }
└─StreamExchange { dist: HashShard(max(max(revenue1.total_revenue))) }
└─StreamProject { exprs: [max(max(revenue1.total_revenue))] }
└─StreamProject { exprs: [max(max(revenue1.total_revenue))], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [max(max(revenue1.total_revenue)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [_vnode], aggs: [max(revenue1.total_revenue), count] }
Expand Down Expand Up @@ -2396,7 +2394,7 @@
└── BatchPlanNode
Fragment 5
StreamProject { exprs: [max(max(revenue1.total_revenue))] }
StreamProject { exprs: [max(max(revenue1.total_revenue))], noop_update_hint: true }
└── StreamSimpleAgg { aggs: [max(max(revenue1.total_revenue)), count] } { tables: [ SimpleAggState: 11, SimpleAggCall0: 10 ] }
└── StreamExchange Single from 6
Expand Down Expand Up @@ -2628,7 +2626,7 @@
└─BatchScan { table: order_line, columns: [order_line.ol_i_id, order_line.ol_quantity], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [avg_yearly], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [(sum(sum(order_line.ol_amount)) / 2.0:Decimal) as $expr3] }
└─StreamProject { exprs: [(sum(sum(order_line.ol_amount)) / 2.0:Decimal) as $expr3], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum(order_line.ol_amount)] }
Expand All @@ -2651,9 +2649,8 @@
Fragment 0
StreamMaterialize { columns: [avg_yearly], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
├── tables: [ Materialize: 4294967294 ]
└── StreamProject { exprs: [(sum(sum(order_line.ol_amount)) / 2.0:Decimal) as $expr3] }
└── StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] }
├── tables: [ SimpleAggState: 0 ]
└── StreamProject { exprs: [(sum(sum(order_line.ol_amount)) / 2.0:Decimal) as $expr3], noop_update_hint: true }
└── StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] } { tables: [ SimpleAggState: 0 ] }
└── StreamExchange Single from 1
Fragment 1
Expand Down Expand Up @@ -2861,7 +2858,7 @@
└─BatchScan { table: order_line, columns: [order_line.ol_w_id, order_line.ol_i_id, order_line.ol_amount, order_line.ol_quantity], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum(sum(order_line.ol_amount))] }
└─StreamProject { exprs: [sum(sum(order_line.ol_amount))], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum(order_line.ol_amount)] }
Expand All @@ -2880,7 +2877,7 @@
Fragment 0
StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
├── tables: [ Materialize: 4294967294 ]
└── StreamProject { exprs: [sum(sum(order_line.ol_amount))] }
└── StreamProject { exprs: [sum(sum(order_line.ol_amount))], noop_update_hint: true }
└── StreamSimpleAgg { aggs: [sum(sum(order_line.ol_amount)), count] }
├── tables: [ SimpleAggState: 0 ]
└── StreamExchange Single from 1
Expand Down Expand Up @@ -3410,7 +3407,7 @@
│ └─StreamProject { exprs: [orders.o_c_id, orders.o_w_id, orders.o_d_id, orders.o_id] }
│ └─StreamTableScan { table: orders, columns: [orders.o_d_id, orders.o_w_id, orders.o_c_id, orders.o_id], stream_scan_type: ArrangementBackfill, stream_key: [orders.o_w_id, orders.o_d_id, orders.o_id], pk: [o_w_id, o_d_id, o_id], dist: UpstreamHashShard(orders.o_w_id, orders.o_d_id, orders.o_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [(sum(sum(customer.c_balance)) / sum0(count(customer.c_balance))::Decimal) as $expr1] }
└─StreamProject { exprs: [(sum(sum(customer.c_balance)) / sum0(count(customer.c_balance))::Decimal) as $expr1], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [sum(sum(customer.c_balance)), sum0(count(customer.c_balance)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum(customer.c_balance), count(customer.c_balance)] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
└─StreamProject { exprs: [Sqrt($expr5) as $expr6, Case((sum0(count(t.v)) <= 1:Int64), null:Decimal, Sqrt(($expr4 / (sum0(count(t.v)) - 1:Int64)::Decimal))) as $expr7, $expr5, Case((sum0(count(t.v)) <= 1:Int64), null:Decimal, ($expr4 / (sum0(count(t.v)) - 1:Int64)::Decimal)) as $expr8] }
└─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), ($expr4 / $expr3) as $expr5, $expr4] }
└─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), (sum(sum($expr1))::Decimal - (($expr2 * $expr2) / $expr3)) as $expr4, $expr3] }
└─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), sum(sum(t.v))::Decimal as $expr2, sum0(count(t.v))::Decimal as $expr3] }
└─StreamProject { exprs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), sum(sum(t.v))::Decimal as $expr2, sum0(count(t.v))::Decimal as $expr3], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v)), sum0(count(t.v)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum($expr1), sum(t.v), count(t.v)] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
└─StreamDynamicFilter { predicate: (t1.v1 > max(max(t2.v2))), output: [t1.v1, t1._row_id] }
├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [max(max(t2.v2))] }
└─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] }
Expand Down Expand Up @@ -77,7 +77,7 @@
├─StreamProject { exprs: [t1.v1, (t1.v1 + t1.v1) as $expr1, t1._row_id] }
│ └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [max(max(t2.v2))] }
└─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] }
Expand Down Expand Up @@ -129,7 +129,7 @@
├─StreamExchange { dist: HashShard(t1.v1) }
│ └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(max(max(t2.v2))) }
└─StreamProject { exprs: [max(max(t2.v2))] }
└─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] }
Expand All @@ -153,7 +153,7 @@
├─StreamProject { exprs: [t1.v1, t1.v1::Int64 as $expr1, t1._row_id] }
│ └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [max(max(t2.v2))] }
└─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] }
Expand All @@ -169,7 +169,7 @@
└─StreamDynamicFilter { predicate: (t1.v1 > max(max(t2.v2))), output: [t1.v1, t1._row_id] }
├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [max(max(t2.v2))] }
└─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] }
Expand All @@ -191,7 +191,7 @@
└─StreamDynamicFilter { predicate: (t1.v1 > $expr1), output: [t1.v1, t1._row_id] }
├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [(2:Int32 * max(max(t2.v2))) as $expr1] }
└─StreamProject { exprs: [(2:Int32 * max(max(t2.v2))) as $expr1], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] }
Expand Down Expand Up @@ -220,7 +220,7 @@
│ ├─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamShare { id: 6 }
│ └─StreamProject { exprs: [max(max(t2.v2))] }
│ └─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true }
│ └─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] }
│ └─StreamExchange { dist: Single }
│ └─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] }
Expand All @@ -229,7 +229,7 @@
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [(max(max(t2.v2)) + 5:Int32) as $expr1] }
└─StreamShare { id: 6 }
└─StreamProject { exprs: [max(max(t2.v2))] }
└─StreamProject { exprs: [max(max(t2.v2))], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [_vnode], aggs: [max(t2.v2), count] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@
where path_val = t1.id;
stream_plan: |-
StreamMaterialize { columns: [array_agg], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [array_agg(t1.n order_by($expr1 ASC))] }
└─StreamProject { exprs: [array_agg(t1.n order_by($expr1 ASC))], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [array_agg(t1.n order_by($expr1 ASC)), count] }
└─StreamExchange { dist: Single }
└─StreamProject { exprs: [t1.n, (projected_row_id + 1:Int64) as $expr1, t1._row_id, t2.p, t2.p, t2.d, t2.d, projected_row_id, t1.id, t2._row_id] }
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/limit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
stream_plan: |-
StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamTopN { order: [sum0(count) ASC], limit: 1, offset: 0 }
└─StreamProject { exprs: [sum0(count)] }
└─StreamProject { exprs: [sum0(count)], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [sum0(count), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [count] }
Expand All @@ -154,7 +154,7 @@
stream_plan: |-
StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamTopN { order: [sum0(count) ASC], limit: 1, offset: 0 }
└─StreamProject { exprs: [sum0(count)] }
└─StreamProject { exprs: [sum0(count)], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [sum0(count), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [count] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
select count(*), max(a) from t;
stream_plan: |-
StreamMaterialize { columns: [count, max], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum0(count), max(max(t.a))] }
└─StreamProject { exprs: [sum0(count), max(max(t.a))], noop_update_hint: true }
└─StreamSimpleAgg { aggs: [sum0(count), max(max(t.a)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [_vnode], aggs: [count, max(t.a)] }
Expand Down
Loading

0 comments on commit c4875c2

Please sign in to comment.