diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index f2f342f86b43c..43714fba9c7dd 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -1098,7 +1098,7 @@ JOIN side_input FOR SYSTEM_TIME AS OF PROCTIME() S ON mod(B.auction, 10000) = S.key sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10038(hidden), side_input.key(hidden)] } + StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10040(hidden), side_input.key(hidden)] } └─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├─StreamExchange { dist: HashShard($expr1) } │ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] } @@ -1217,7 +1217,7 @@ BatchExchange { order: [], dist: Single } └─BatchHashAgg { group_key: [$expr1], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter((bid.price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((bid.price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bid.bidder) filter((flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.auction) filter((flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] } └─BatchExchange { order: [], dist: HashShard($expr1) } - └─BatchHashAgg { group_key: [$expr1, bid.bidder, bid.auction, flag], aggs: [count, count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32)), count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32)), count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32))] } + └─BatchHashAgg { group_key: [$expr1, bid.bidder, bid.auction, flag], aggs: [count, count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32))] } └─BatchExchange { order: [], dist: HashShard($expr1, bid.bidder, bid.auction, flag) } └─BatchExpand { column_subsets: [[$expr1], [$expr1, bid.bidder], [$expr1, bid.auction]] } └─BatchProject { exprs: [ToChar(bid.date_time, 'yyyy-MM-dd':Varchar) as $expr1, bid.price, bid.bidder, bid.auction] } @@ -1283,7 +1283,7 @@ └─StreamProject { exprs: [$expr1, sum0(count) filter((flag = 0:Int64)), sum0(count filter((bid.price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((bid.price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bid.bidder) filter((flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.auction) filter((flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] } └─StreamHashAgg { group_key: [$expr1], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter((bid.price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((bid.price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bid.bidder) filter((flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.auction) filter((flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count] } └─StreamExchange { dist: HashShard($expr1) } - └─StreamHashAgg [append_only] { group_key: [$expr1, bid.bidder, bid.auction, flag], aggs: [count, count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32)), count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32)), count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32))] } + └─StreamHashAgg [append_only] { group_key: [$expr1, bid.bidder, bid.auction, flag], aggs: [count, count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32))] } └─StreamExchange { dist: HashShard($expr1, bid.bidder, bid.auction, flag) } └─StreamExpand { column_subsets: [[$expr1], [$expr1, bid.bidder], [$expr1, bid.auction]] } └─StreamProject { exprs: [ToChar(bid.date_time, 'yyyy-MM-dd':Varchar) as $expr1, bid.price, bid.bidder, bid.auction, bid._row_id] } @@ -1299,7 +1299,7 @@ └── StreamExchange Hash([0]) from 1 Fragment 1 - StreamHashAgg [append_only] { group_key: [$expr1, bid.bidder, bid.auction, flag], aggs: [count, count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32)), count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32)), count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32))] } { intermediate state table: 1, state tables: [], distinct tables: [] } + StreamHashAgg [append_only] { group_key: [$expr1, bid.bidder, bid.auction, flag], aggs: [count, count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32))] } { intermediate state table: 1, state tables: [], distinct tables: [] } └── StreamExchange Hash([0, 2, 3, 10]) from 2 Fragment 2 @@ -1316,7 +1316,7 @@ ├── distribution key: [ 0 ] └── read pk prefix len hint: 1 - Table 1 { columns: [ $expr1, bid_bidder, bid_auction, flag, count, count filter((bid_price < 10000:Int32)), count filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32)), count filter((bid_price >= 1000000:Int32)), count filter((bid_price < 10000:Int32))_0, count filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32))_0, count filter((bid_price >= 1000000:Int32))_0, count filter((bid_price < 10000:Int32))_1, count filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32))_1, count filter((bid_price >= 1000000:Int32))_1 ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 ], distribution key: [ 0, 1, 2, 3 ], read pk prefix len hint: 4 } + Table 1 { columns: [ $expr1, bid_bidder, bid_auction, flag, count, count filter((bid_price < 10000:Int32)), count filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32)), count filter((bid_price >= 1000000:Int32)) ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 4, 5, 6, 7 ], distribution key: [ 0, 1, 2, 3 ], read pk prefix len hint: 4 } Table 2 { columns: [ vnode, _row_id, bid_backfill_finished, bid_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } @@ -1350,7 +1350,7 @@ BatchExchange { order: [], dist: Single } └─BatchHashAgg { group_key: [bid.channel, $expr1], aggs: [max(max($expr2)) filter((flag = 0:Int64)), sum0(count) filter((flag = 0:Int64)), sum0(count filter((bid.price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((bid.price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bid.bidder) filter((flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.auction) filter((flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] } └─BatchExchange { order: [], dist: HashShard(bid.channel, $expr1) } - └─BatchHashAgg { group_key: [bid.channel, $expr1, bid.bidder, bid.auction, flag], aggs: [max($expr2), count, count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32)), count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32)), count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32))] } + └─BatchHashAgg { group_key: [bid.channel, $expr1, bid.bidder, bid.auction, flag], aggs: [max($expr2), count, count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32))] } └─BatchExchange { order: [], dist: HashShard(bid.channel, $expr1, bid.bidder, bid.auction, flag) } └─BatchExpand { column_subsets: [[bid.channel, $expr1, $expr2], [bid.channel, $expr1, bid.bidder], [bid.channel, $expr1, bid.auction]] } └─BatchProject { exprs: [bid.channel, ToChar(bid.date_time, 'yyyy-MM-dd':Varchar) as $expr1, ToChar(bid.date_time, 'HH:mm':Varchar) as $expr2, bid.price, bid.bidder, bid.auction] } @@ -1418,7 +1418,7 @@ └─StreamProject { exprs: [bid.channel, $expr1, max(max($expr2)) filter((flag = 0:Int64)), sum0(count) filter((flag = 0:Int64)), sum0(count filter((bid.price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((bid.price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bid.bidder) filter((flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.auction) filter((flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] } └─StreamHashAgg { group_key: [bid.channel, $expr1], aggs: [max(max($expr2)) filter((flag = 0:Int64)), sum0(count) filter((flag = 0:Int64)), sum0(count filter((bid.price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((bid.price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bid.bidder) filter((flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.bidder) filter((count filter((bid.price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bid.auction) filter((flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(bid.auction) filter((count filter((bid.price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count] } └─StreamExchange { dist: HashShard(bid.channel, $expr1) } - └─StreamHashAgg [append_only] { group_key: [bid.channel, $expr1, bid.bidder, bid.auction, flag], aggs: [max($expr2), count, count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32)), count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32)), count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32))] } + └─StreamHashAgg [append_only] { group_key: [bid.channel, $expr1, bid.bidder, bid.auction, flag], aggs: [max($expr2), count, count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32))] } └─StreamExchange { dist: HashShard(bid.channel, $expr1, bid.bidder, bid.auction, flag) } └─StreamExpand { column_subsets: [[bid.channel, $expr1, $expr2], [bid.channel, $expr1, bid.bidder], [bid.channel, $expr1, bid.auction]] } └─StreamProject { exprs: [bid.channel, ToChar(bid.date_time, 'yyyy-MM-dd':Varchar) as $expr1, ToChar(bid.date_time, 'HH:mm':Varchar) as $expr2, bid.price, bid.bidder, bid.auction, bid._row_id] } @@ -1434,7 +1434,7 @@ └── StreamExchange Hash([0, 1]) from 1 Fragment 1 - StreamHashAgg [append_only] { group_key: [bid.channel, $expr1, bid.bidder, bid.auction, flag], aggs: [max($expr2), count, count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32)), count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32)), count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32))] } { intermediate state table: 2, state tables: [], distinct tables: [] } + StreamHashAgg [append_only] { group_key: [bid.channel, $expr1, bid.bidder, bid.auction, flag], aggs: [max($expr2), count, count filter((bid.price < 10000:Int32)), count filter((bid.price >= 10000:Int32) AND (bid.price < 1000000:Int32)), count filter((bid.price >= 1000000:Int32))] } { intermediate state table: 2, state tables: [], distinct tables: [] } └── StreamExchange Hash([0, 1, 4, 5, 14]) from 2 Fragment 2 @@ -1453,7 +1453,7 @@ ├── distribution key: [ 0, 1 ] └── read pk prefix len hint: 2 - Table 2 { columns: [ bid_channel, $expr1, bid_bidder, bid_auction, flag, max($expr2), count, count filter((bid_price < 10000:Int32)), count filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32)), count filter((bid_price >= 1000000:Int32)), count filter((bid_price < 10000:Int32))_0, count filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32))_0, count filter((bid_price >= 1000000:Int32))_0, count filter((bid_price < 10000:Int32))_1, count filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32))_1, count filter((bid_price >= 1000000:Int32))_1 ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC ], value indices: [ 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 ], distribution key: [ 0, 1, 2, 3, 4 ], read pk prefix len hint: 5 } + Table 2 { columns: [ bid_channel, $expr1, bid_bidder, bid_auction, flag, max($expr2), count, count filter((bid_price < 10000:Int32)), count filter((bid_price >= 10000:Int32) AND (bid_price < 1000000:Int32)), count filter((bid_price >= 1000000:Int32)) ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC ], value indices: [ 5, 6, 7, 8, 9 ], distribution key: [ 0, 1, 2, 3, 4 ], read pk prefix len hint: 5 } Table 3 { columns: [ vnode, _row_id, bid_backfill_finished, bid_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml index eecfd41ef5c00..9a76e39085120 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml @@ -1040,7 +1040,7 @@ BatchExchange { order: [], dist: Single } └─BatchHashAgg { group_key: [$expr1], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter((price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 10000:Int32) AND (price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bidder) filter((flag = 1:Int64)), count(bidder) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(auction) filter((flag = 2:Int64)), count(auction) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] } └─BatchExchange { order: [], dist: HashShard($expr1) } - └─BatchHashAgg { group_key: [$expr1, bidder, auction, flag], aggs: [count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } + └─BatchHashAgg { group_key: [$expr1, bidder, auction, flag], aggs: [count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } └─BatchExchange { order: [], dist: HashShard($expr1, bidder, auction, flag) } └─BatchExpand { column_subsets: [[$expr1], [$expr1, bidder], [$expr1, auction]] } └─BatchProject { exprs: [ToChar(date_time, 'yyyy-MM-dd':Varchar) as $expr1, price, bidder, auction] } @@ -1106,7 +1106,7 @@ └─StreamProject { exprs: [$expr1, sum0(count) filter((flag = 0:Int64)), sum0(count filter((price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 10000:Int32) AND (price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bidder) filter((flag = 1:Int64)), count(bidder) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(auction) filter((flag = 2:Int64)), count(auction) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] } └─StreamHashAgg { group_key: [$expr1], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter((price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 10000:Int32) AND (price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bidder) filter((flag = 1:Int64)), count(bidder) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(auction) filter((flag = 2:Int64)), count(auction) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count] } └─StreamExchange { dist: HashShard($expr1) } - └─StreamHashAgg [append_only] { group_key: [$expr1, bidder, auction, flag], aggs: [count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } + └─StreamHashAgg [append_only] { group_key: [$expr1, bidder, auction, flag], aggs: [count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } └─StreamExchange { dist: HashShard($expr1, bidder, auction, flag) } └─StreamExpand { column_subsets: [[$expr1], [$expr1, bidder], [$expr1, auction]] } └─StreamProject { exprs: [ToChar(date_time, 'yyyy-MM-dd':Varchar) as $expr1, price, bidder, auction, _row_id] } @@ -1123,7 +1123,7 @@ └── StreamExchange Hash([0]) from 1 Fragment 1 - StreamHashAgg [append_only] { group_key: [$expr1, bidder, auction, flag], aggs: [count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } { intermediate state table: 1, state tables: [], distinct tables: [] } + StreamHashAgg [append_only] { group_key: [$expr1, bidder, auction, flag], aggs: [count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } { intermediate state table: 1, state tables: [], distinct tables: [] } └── StreamExchange Hash([0, 2, 3, 10]) from 2 Fragment 2 @@ -1139,7 +1139,7 @@ ├── distribution key: [ 0 ] └── read pk prefix len hint: 1 - Table 1 { columns: [ $expr1, bidder, auction, flag, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32))_0, count filter((price >= 10000:Int32) AND (price < 1000000:Int32))_0, count filter((price >= 1000000:Int32))_0, count filter((price < 10000:Int32))_1, count filter((price >= 10000:Int32) AND (price < 1000000:Int32))_1, count filter((price >= 1000000:Int32))_1 ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 ], distribution key: [ 0, 1, 2, 3 ], read pk prefix len hint: 4 } + Table 1 { columns: [ $expr1, bidder, auction, flag, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)) ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 4, 5, 6, 7 ], distribution key: [ 0, 1, 2, 3 ], read pk prefix len hint: 4 } Table 2 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 } @@ -1173,8 +1173,8 @@ └─StreamHashAgg { group_key: [$expr1], aggs: [sum0(sum0(count) filter((flag = 0:Int64))), sum0(sum0(count filter((price < 10000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter((price >= 10000:Int32) AND (price < 1000000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter((price >= 1000000:Int32))) filter((flag = 0:Int64))), sum0(count(bidder) filter((flag = 1:Int64))), sum0(count(bidder) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count(bidder) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count(bidder) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count(auction) filter((flag = 2:Int64))), sum0(count(auction) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count(auction) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count(auction) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))), count] } └─StreamExchange { dist: HashShard($expr1) } └─StreamHashAgg { group_key: [$expr1, $expr2], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter((price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 10000:Int32) AND (price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bidder) filter((flag = 1:Int64)), count(bidder) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(auction) filter((flag = 2:Int64)), count(auction) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count] } - └─StreamProject { exprs: [$expr1, bidder, auction, flag, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), Vnode($expr1, bidder, auction, flag) as $expr2] } - └─StreamHashAgg [append_only] { group_key: [$expr1, bidder, auction, flag], aggs: [count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } + └─StreamProject { exprs: [$expr1, bidder, auction, flag, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), Vnode($expr1, bidder, auction, flag) as $expr2] } + └─StreamHashAgg [append_only] { group_key: [$expr1, bidder, auction, flag], aggs: [count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } └─StreamExchange { dist: HashShard($expr1, bidder, auction, flag) } └─StreamExpand { column_subsets: [[$expr1], [$expr1, bidder], [$expr1, auction]] } └─StreamProject { exprs: [ToChar(date_time, 'yyyy-MM-dd':Varchar) as $expr1, price, bidder, auction, _row_id] } @@ -1192,8 +1192,8 @@ Fragment 1 StreamHashAgg { group_key: [$expr1, $expr2], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter((price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 10000:Int32) AND (price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bidder) filter((flag = 1:Int64)), count(bidder) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(auction) filter((flag = 2:Int64)), count(auction) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count] } { intermediate state table: 1, state tables: [], distinct tables: [] } - └── StreamProject { exprs: [$expr1, bidder, auction, flag, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), Vnode($expr1, bidder, auction, flag) as $expr2] } - └── StreamHashAgg [append_only] { group_key: [$expr1, bidder, auction, flag], aggs: [count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } { intermediate state table: 2, state tables: [], distinct tables: [] } + └── StreamProject { exprs: [$expr1, bidder, auction, flag, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), Vnode($expr1, bidder, auction, flag) as $expr2] } + └── StreamHashAgg [append_only] { group_key: [$expr1, bidder, auction, flag], aggs: [count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } { intermediate state table: 2, state tables: [], distinct tables: [] } └── StreamExchange Hash([0, 2, 3, 10]) from 2 Fragment 2 @@ -1217,7 +1217,7 @@ ├── read pk prefix len hint: 2 └── vnode column idx: 1 - Table 2 { columns: [ $expr1, bidder, auction, flag, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32))_0, count filter((price >= 10000:Int32) AND (price < 1000000:Int32))_0, count filter((price >= 1000000:Int32))_0, count filter((price < 10000:Int32))_1, count filter((price >= 10000:Int32) AND (price < 1000000:Int32))_1, count filter((price >= 1000000:Int32))_1 ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 ], distribution key: [ 0, 1, 2, 3 ], read pk prefix len hint: 4 } + Table 2 { columns: [ $expr1, bidder, auction, flag, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)) ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 4, 5, 6, 7 ], distribution key: [ 0, 1, 2, 3 ], read pk prefix len hint: 4 } Table 3 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 } @@ -1252,7 +1252,7 @@ BatchExchange { order: [], dist: Single } └─BatchHashAgg { group_key: [channel, $expr1], aggs: [max(max($expr2)) filter((flag = 0:Int64)), sum0(count) filter((flag = 0:Int64)), sum0(count filter((price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 10000:Int32) AND (price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bidder) filter((flag = 1:Int64)), count(bidder) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(auction) filter((flag = 2:Int64)), count(auction) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] } └─BatchExchange { order: [], dist: HashShard(channel, $expr1) } - └─BatchHashAgg { group_key: [channel, $expr1, bidder, auction, flag], aggs: [max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } + └─BatchHashAgg { group_key: [channel, $expr1, bidder, auction, flag], aggs: [max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } └─BatchExchange { order: [], dist: HashShard(channel, $expr1, bidder, auction, flag) } └─BatchExpand { column_subsets: [[channel, $expr1, $expr2], [channel, $expr1, bidder], [channel, $expr1, auction]] } └─BatchProject { exprs: [channel, ToChar(date_time, 'yyyy-MM-dd':Varchar) as $expr1, ToChar(date_time, 'HH:mm':Varchar) as $expr2, price, bidder, auction] } @@ -1320,7 +1320,7 @@ └─StreamProject { exprs: [channel, $expr1, max(max($expr2)) filter((flag = 0:Int64)), sum0(count) filter((flag = 0:Int64)), sum0(count filter((price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 10000:Int32) AND (price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bidder) filter((flag = 1:Int64)), count(bidder) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(auction) filter((flag = 2:Int64)), count(auction) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] } └─StreamHashAgg { group_key: [channel, $expr1], aggs: [max(max($expr2)) filter((flag = 0:Int64)), sum0(count) filter((flag = 0:Int64)), sum0(count filter((price < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 10000:Int32) AND (price < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((price >= 1000000:Int32))) filter((flag = 0:Int64)), count(bidder) filter((flag = 1:Int64)), count(bidder) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(bidder) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(auction) filter((flag = 2:Int64)), count(auction) filter((count filter((price < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 10000:Int32) AND (price < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(auction) filter((count filter((price >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count] } └─StreamExchange { dist: HashShard(channel, $expr1) } - └─StreamHashAgg [append_only] { group_key: [channel, $expr1, bidder, auction, flag], aggs: [max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } + └─StreamHashAgg [append_only] { group_key: [channel, $expr1, bidder, auction, flag], aggs: [max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } └─StreamExchange { dist: HashShard(channel, $expr1, bidder, auction, flag) } └─StreamExpand { column_subsets: [[channel, $expr1, $expr2], [channel, $expr1, bidder], [channel, $expr1, auction]] } └─StreamProject { exprs: [channel, ToChar(date_time, 'yyyy-MM-dd':Varchar) as $expr1, ToChar(date_time, 'HH:mm':Varchar) as $expr2, price, bidder, auction, _row_id] } @@ -1337,7 +1337,7 @@ └── StreamExchange Hash([0, 1]) from 1 Fragment 1 - StreamHashAgg [append_only] { group_key: [channel, $expr1, bidder, auction, flag], aggs: [max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } { intermediate state table: 2, state tables: [], distinct tables: [] } + StreamHashAgg [append_only] { group_key: [channel, $expr1, bidder, auction, flag], aggs: [max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32))] } { intermediate state table: 2, state tables: [], distinct tables: [] } └── StreamExchange Hash([0, 1, 4, 5, 14]) from 2 Fragment 2 @@ -1355,7 +1355,7 @@ ├── distribution key: [ 0, 1 ] └── read pk prefix len hint: 2 - Table 2 { columns: [ channel, $expr1, bidder, auction, flag, max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count filter((price < 10000:Int32))_0, count filter((price >= 10000:Int32) AND (price < 1000000:Int32))_0, count filter((price >= 1000000:Int32))_0, count filter((price < 10000:Int32))_1, count filter((price >= 10000:Int32) AND (price < 1000000:Int32))_1, count filter((price >= 1000000:Int32))_1 ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC ], value indices: [ 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 ], distribution key: [ 0, 1, 2, 3, 4 ], read pk prefix len hint: 5 } + Table 2 { columns: [ channel, $expr1, bidder, auction, flag, max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)) ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC ], value indices: [ 5, 6, 7, 8, 9 ], distribution key: [ 0, 1, 2, 3, 4 ], read pk prefix len hint: 5 } Table 3 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml index 9b9ff1d660dd5..e3b091d700675 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml @@ -1280,7 +1280,7 @@ BatchExchange { order: [], dist: Single } └─BatchHashAgg { group_key: [$expr2], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter(($expr3 < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr3 >= 1000000:Int32))) filter((flag = 0:Int64)), count($expr4) filter((flag = 1:Int64)), count($expr4) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr4) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr4) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr5) filter((flag = 2:Int64)), count($expr5) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr5) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr5) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] } └─BatchExchange { order: [], dist: HashShard($expr2) } - └─BatchHashAgg { group_key: [$expr2, $expr4, $expr5, flag], aggs: [count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32)), count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32)), count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32))] } + └─BatchHashAgg { group_key: [$expr2, $expr4, $expr5, flag], aggs: [count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32))] } └─BatchExchange { order: [], dist: HashShard($expr2, $expr4, $expr5, flag) } └─BatchExpand { column_subsets: [[$expr2], [$expr2, $expr4], [$expr2, $expr5]] } └─BatchProject { exprs: [ToChar($expr1, 'yyyy-MM-dd':Varchar) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 1:Int32) as $expr4, Field(bid, 0:Int32) as $expr5] } @@ -1360,7 +1360,7 @@ BatchExchange { order: [], dist: Single } └─BatchHashAgg { group_key: [$expr2, $expr3], aggs: [max(max($expr4)) filter((flag = 0:Int64)), sum0(count) filter((flag = 0:Int64)), sum0(count filter(($expr5 < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr5 >= 1000000:Int32))) filter((flag = 0:Int64)), count($expr6) filter((flag = 1:Int64)), count($expr6) filter((count filter(($expr5 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr6) filter((count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr6) filter((count filter(($expr5 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr7) filter((flag = 2:Int64)), count($expr7) filter((count filter(($expr5 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr7) filter((count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr7) filter((count filter(($expr5 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] } └─BatchExchange { order: [], dist: HashShard($expr2, $expr3) } - └─BatchHashAgg { group_key: [$expr2, $expr3, $expr6, $expr7, flag], aggs: [max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32)), count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32)), count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32))] } + └─BatchHashAgg { group_key: [$expr2, $expr3, $expr6, $expr7, flag], aggs: [max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32))] } └─BatchExchange { order: [], dist: HashShard($expr2, $expr3, $expr6, $expr7, flag) } └─BatchExpand { column_subsets: [[$expr2, $expr3, $expr4], [$expr2, $expr3, $expr6], [$expr2, $expr3, $expr7]] } └─BatchProject { exprs: [Field(bid, 3:Int32) as $expr2, ToChar($expr1, 'yyyy-MM-dd':Varchar) as $expr3, ToChar($expr1, 'HH:mm':Varchar) as $expr4, Field(bid, 2:Int32) as $expr5, Field(bid, 1:Int32) as $expr6, Field(bid, 0:Int32) as $expr7] } diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index f5b363904bab9..b2047d7cae089 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -240,11 +240,7 @@ static PUSH_CALC_OF_JOIN: LazyLock = LazyLock::new(|| { static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock = LazyLock::new(|| { OptimizationStage::new( "Convert Distinct Aggregation", - vec![ - UnionToDistinctRule::create(), - DistinctAggRule::create(true), - AggGroupBySimplifyRule::create(), - ], + vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)], ApplyOrder::TopDown, ) }); @@ -255,12 +251,19 @@ static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock = LazyLock::n vec![ UnionToDistinctRule::create(), DistinctAggRule::create(false), - AggGroupBySimplifyRule::create(), ], ApplyOrder::TopDown, ) }); +static SIMPLIFY_AGG: LazyLock = LazyLock::new(|| { + OptimizationStage::new( + "Simplify Aggregation", + vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()], + ApplyOrder::TopDown, + ) +}); + static JOIN_COMMUTE: LazyLock = LazyLock::new(|| { OptimizationStage::new( "Join Commute".to_string(), @@ -564,6 +567,8 @@ impl LogicalOptimizer { plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM) }; + plan = plan.optimize_by_rules(&SIMPLIFY_AGG); + plan = plan.optimize_by_rules(&JOIN_COMMUTE); // Do a final column pruning and predicate pushing down to clean up the plan. @@ -636,6 +641,8 @@ impl LogicalOptimizer { // Convert distinct aggregates. plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH); + plan = plan.optimize_by_rules(&SIMPLIFY_AGG); + plan = plan.optimize_by_rules(&JOIN_COMMUTE); // Do a final column pruning and predicate pushing down to clean up the plan. diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index 4db0ac0780f62..e02c99858b7ce 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -90,7 +90,7 @@ impl Agg { self.ctx().session_ctx().config().get_force_two_phase_agg() } - fn two_phase_agg_enabled(&self) -> bool { + pub fn two_phase_agg_enabled(&self) -> bool { self.enable_two_phase } diff --git a/src/frontend/src/optimizer/rule/agg_call_merge_rule.rs b/src/frontend/src/optimizer/rule/agg_call_merge_rule.rs new file mode 100644 index 0000000000000..2a8a22bd0fcc7 --- /dev/null +++ b/src/frontend/src/optimizer/rule/agg_call_merge_rule.rs @@ -0,0 +1,57 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::{BoxedRule, Rule}; +use crate::optimizer::plan_node::generic::Agg; +use crate::optimizer::plan_node::{LogicalProject, PlanTreeNodeUnary}; +use crate::PlanRef; + +/// Merges duplicated aggregate function calls in `LogicalAgg`, and project them back to the desired schema. +pub struct AggCallMergeRule {} + +impl Rule for AggCallMergeRule { + fn apply(&self, plan: PlanRef) -> Option { + let Some(agg) = plan.as_logical_agg() else { + return None; + }; + + let calls = agg.agg_calls(); + let mut new_calls = Vec::with_capacity(calls.len()); + let mut out_fields = (0..agg.group_key().len()).collect::>(); + out_fields.extend(calls.iter().map(|call| { + let pos = new_calls.iter().position(|c| c == call).unwrap_or_else(|| { + let pos = new_calls.len(); + new_calls.push(call.clone()); + pos + }); + agg.group_key().len() + pos + })); + + if calls.len() == new_calls.len() { + // no change + None + } else { + let new_agg = Agg::new(new_calls, agg.group_key().clone(), agg.input()) + .with_enable_two_phase(agg.core().two_phase_agg_enabled()) + .into(); + Some(LogicalProject::with_out_col_idx(new_agg, out_fields.into_iter()).into()) + } + } +} + +impl AggCallMergeRule { + pub fn create() -> BoxedRule { + Box::new(Self {}) + } +} diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 137528fd6b41e..7867bb1bb54f9 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -148,6 +148,8 @@ mod agg_group_by_simplify_rule; pub use agg_group_by_simplify_rule::*; mod apply_hop_window_transpose_rule; pub use apply_hop_window_transpose_rule::*; +mod agg_call_merge_rule; +pub use agg_call_merge_rule::*; #[macro_export] macro_rules! for_all_rules { @@ -212,6 +214,7 @@ macro_rules! for_all_rules { , { ExpandToProjectRule } , { AggGroupBySimplifyRule } , { ApplyHopWindowTransposeRule } + , { AggCallMergeRule } } }; }