Skip to content

Commit

Permalink
feat(optimizer): avoid predicate pushdown for batch queries if unnece…
Browse files Browse the repository at this point in the history
…ssary (#13470)

Co-authored-by: stonepage <[email protected]>
Co-authored-by: st1page <[email protected]>
  • Loading branch information
3 people authored Nov 16, 2023
1 parent 942a526 commit 7ebab64
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,5 @@
sql: |
with t(v, arr) as (select 1, array[2, 3]) select v < all(arr), v < some(arr) from t;
batch_plan: |-
BatchProject { exprs: [All((1:Int32 < $expr10015)) as $expr1, Some((1:Int32 < $expr10015)) as $expr2] }
BatchProject { exprs: [All((1:Int32 < $expr10009)) as $expr1, Some((1:Int32 < $expr10009)) as $expr2] }
└─BatchValues { rows: [[1:Int32, ARRAY[2, 3]:List(Int32)]] }
26 changes: 1 addition & 25 deletions src/frontend/planner_test/tests/testdata/output/explain.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@
LogicalProject { exprs: [1:Int32] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
Predicate Push Down:
LogicalProject { exprs: [1:Int32] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
Predicate Push Down:
LogicalProject { exprs: [1:Int32] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
Convert Over Window:
apply TrivialProjectToValuesRule 1 time(s)
Expand Down Expand Up @@ -65,7 +55,7 @@
"stages": {
"0": {
"root": {
"plan_node_id": 10020,
"plan_node_id": 10016,
"plan_node_type": "BatchValues",
"schema": [
{
Expand Down Expand Up @@ -121,13 +111,6 @@
Predicate Push Down:
LogicalProject { exprs: [t1.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) }
├─LogicalScan { table: t1, columns: [v1, _row_id] }
└─LogicalScan { table: t2, columns: [v2, _row_id] }
Predicate Push Down:
LogicalProject { exprs: [t1.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) }
├─LogicalScan { table: t1, columns: [v1, _row_id] }
Expand All @@ -151,13 +134,6 @@
Prune Columns:
LogicalProject { exprs: [t1.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) }
├─LogicalScan { table: t1, columns: [v1] }
└─LogicalScan { table: t2, columns: [v2] }
Predicate Push Down:
LogicalProject { exprs: [t1.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) }
├─LogicalScan { table: t1, columns: [v1] }
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@
└─LogicalProject { exprs: [Array(1:Int32) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |-
BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10013, ARRAY[2]:List(Int32)))) as $expr1] }
BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10011, ARRAY[2]:List(Int32)))) as $expr1] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchValues { rows: [[]] }
└─BatchValues { rows: [[ARRAY[1]:List(Int32)]] }
Expand All @@ -473,7 +473,7 @@
└─LogicalProject { exprs: [Array(1:Int32) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |-
BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10013, ARRAY[2]:List(Int32)))) as $expr1] }
BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10011, ARRAY[2]:List(Int32)))) as $expr1] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchValues { rows: [[]] }
└─BatchValues { rows: [[ARRAY[1]:List(Int32)]] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@
sql: SELECT auction, price FROM bid WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [auction, price] }
└─BatchFilter { predicate: (((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR (auction = 2001:Int32)) OR (auction = 2019:Int32)) OR (auction = 2087:Int32)) }
└─BatchFilter { predicate: (((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR (auction = 2001:Int32)) OR (auction = 2019:Int32)) OR (auction = 2087:Int32)) }
└─BatchProject { exprs: [auction, price] }
└─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
Expand Down Expand Up @@ -152,10 +152,12 @@
└─BatchHashJoin { type: Inner, predicate: seller = id, output: [name, city, state, id] }
├─BatchExchange { order: [], dist: HashShard(seller) }
│ └─BatchFilter { predicate: (category = 10:Int32) }
│ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
│ └─BatchProject { exprs: [id, seller, category] }
│ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
└─BatchExchange { order: [], dist: HashShard(id) }
└─BatchFilter { predicate: (((state = 'or':Varchar) OR (state = 'id':Varchar)) OR (state = 'ca':Varchar)) }
└─BatchSource { source: person, columns: [id, name, email_address, credit_card, city, state, date_time, extra, _row_id], filter: (None, None) }
└─BatchProject { exprs: [id, name, city, state] }
└─BatchSource { source: person, columns: [id, name, email_address, credit_card, city, state, date_time, extra, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [name, city, state, id, _row_id(hidden), seller(hidden), _row_id#1(hidden)], stream_key: [_row_id, _row_id#1, seller], pk_columns: [_row_id, _row_id#1, seller], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(_row_id, seller, _row_id) }
Expand Down Expand Up @@ -1015,7 +1017,8 @@
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [auction, bidder, (0.908:Decimal * price::Decimal) as $expr1, Case(((Extract('HOUR':Varchar, date_time) >= 8:Decimal) AND (Extract('HOUR':Varchar, date_time) <= 18:Decimal)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, date_time) <= 6:Decimal) OR (Extract('HOUR':Varchar, date_time) >= 20:Decimal)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, date_time, extra] }
└─BatchFilter { predicate: ((0.908:Decimal * price::Decimal) > 1000000:Decimal) AND ((0.908:Decimal * price::Decimal) < 50000000:Decimal) }
└─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) }
└─BatchProject { exprs: [auction, bidder, price, date_time, extra] }
└─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [auction, bidder, price, bidtimetype, date_time, extra, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [auction, bidder, (0.908:Decimal * price::Decimal) as $expr1, Case(((Extract('HOUR':Varchar, date_time) >= 8:Decimal) AND (Extract('HOUR':Varchar, date_time) <= 18:Decimal)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, date_time) <= 6:Decimal) OR (Extract('HOUR':Varchar, date_time) >= 20:Decimal)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, date_time, extra, _row_id] }
Expand Down Expand Up @@ -1634,7 +1637,8 @@
│ └─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) }
└─BatchExchange { order: [], dist: HashShard(id) }
└─BatchFilter { predicate: (category = 10:Int32) }
└─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
└─BatchProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category] }
└─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_timeb, item_name, description, initial_bid, reserve, date_timea, expires, seller, category, _row_id(hidden), _row_id#1(hidden)], stream_key: [_row_id, _row_id#1, auction], pk_columns: [_row_id, _row_id#1, auction], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(auction, _row_id, _row_id) }
Expand Down Expand Up @@ -1705,7 +1709,8 @@
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [auction, bidder, price, channel, Case((Lower(channel) = 'apple':Varchar), '0':Varchar, (Lower(channel) = 'google':Varchar), '1':Varchar, (Lower(channel) = 'facebook':Varchar), '2':Varchar, (Lower(channel) = 'baidu':Varchar), '3':Varchar, ArrayAccess(RegexpMatch(url, '(&|^)channel_id=([^&]*)':Varchar), 2:Int32)) as $expr1] }
└─BatchFilter { predicate: (IsNotNull(ArrayAccess(RegexpMatch(url, '(&|^)channel_id=([^&]*)':Varchar), 2:Int32)) OR In(Lower(channel), 'apple':Varchar, 'google':Varchar, 'facebook':Varchar, 'baidu':Varchar)) }
└─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) }
└─BatchProject { exprs: [auction, bidder, price, channel, url] }
└─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [auction, bidder, price, channel, channel_id, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [auction, bidder, price, channel, Case((Lower(channel) = 'apple':Varchar), '0':Varchar, (Lower(channel) = 'google':Varchar), '1':Varchar, (Lower(channel) = 'facebook':Varchar), '2':Varchar, (Lower(channel) = 'baidu':Varchar), '3':Varchar, ArrayAccess(RegexpMatch(url, '(&|^)channel_id=([^&]*)':Varchar), 2:Int32)) as $expr1, _row_id] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,16 @@
A.category = 10 and (P.state = 'or' OR P.state = 'id' OR P.state = 'ca');
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: Inner, predicate: $expr3 = $expr5, output: [$expr6, $expr7, $expr8, $expr2] }
├─BatchExchange { order: [], dist: HashShard($expr3) }
│ └─BatchProject { exprs: [Field(auction, 0:Int32) as $expr2, Field(auction, 7:Int32) as $expr3] }
└─BatchHashJoin { type: Inner, predicate: $expr2 = $expr3, output: [$expr4, $expr5, $expr6, $expr1] }
├─BatchExchange { order: [], dist: HashShard($expr2) }
│ └─BatchProject { exprs: [Field(auction, 0:Int32) as $expr1, Field(auction, 7:Int32) as $expr2] }
│ └─BatchFilter { predicate: (Field(auction, 8:Int32) = 10:Int32) AND (event_type = 1:Int32) }
│ └─BatchProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] }
│ └─BatchProject { exprs: [event_type, auction] }
│ └─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id], filter: (None, None) }
└─BatchExchange { order: [], dist: HashShard($expr5) }
└─BatchProject { exprs: [Field(person, 0:Int32) as $expr5, Field(person, 1:Int32) as $expr6, Field(person, 4:Int32) as $expr7, Field(person, 5:Int32) as $expr8] }
└─BatchExchange { order: [], dist: HashShard($expr3) }
└─BatchProject { exprs: [Field(person, 0:Int32) as $expr3, Field(person, 1:Int32) as $expr4, Field(person, 4:Int32) as $expr5, Field(person, 5:Int32) as $expr6] }
└─BatchFilter { predicate: (((Field(person, 5:Int32) = 'or':Varchar) OR (Field(person, 5:Int32) = 'id':Varchar)) OR (Field(person, 5:Int32) = 'ca':Varchar)) AND (event_type = 0:Int32) }
└─BatchProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr4, _row_id] }
└─BatchProject { exprs: [event_type, person] }
└─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [name, city, state, id, _row_id(hidden), $expr3(hidden), _row_id#1(hidden)], stream_key: [_row_id, _row_id#1, $expr3], pk_columns: [_row_id, _row_id#1, $expr3], pk_conflict: NoCheck }
Expand Down Expand Up @@ -1719,12 +1719,12 @@
├─BatchExchange { order: [], dist: HashShard($expr2) }
│ └─BatchProject { exprs: [Field(bid, 0:Int32) as $expr2, Field(bid, 1:Int32) as $expr3, Field(bid, 2:Int32) as $expr4, Field(bid, 3:Int32) as $expr5, Field(bid, 4:Int32) as $expr6, $expr1] }
│ └─BatchFilter { predicate: (event_type = 2:Int32) }
│ └─BatchProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] }
│ └─BatchProject { exprs: [event_type, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1] }
│ └─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id], filter: (None, None) }
└─BatchExchange { order: [], dist: HashShard($expr8) }
└─BatchProject { exprs: [Field(auction, 0:Int32) as $expr8, Field(auction, 1:Int32) as $expr9, Field(auction, 2:Int32) as $expr10, Field(auction, 3:Int32) as $expr11, Field(auction, 4:Int32) as $expr12, $expr7, Field(auction, 6:Int32) as $expr13, Field(auction, 7:Int32) as $expr14, Field(auction, 8:Int32) as $expr15] }
└─BatchFilter { predicate: (Field(auction, 8:Int32) = 10:Int32) AND (event_type = 1:Int32) }
└─BatchProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr7, _row_id] }
└─BatchProject { exprs: [event_type, auction, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr7] }
└─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_timeb, item_name, description, initial_bid, reserve, date_timea, expires, seller, category, _row_id(hidden), _row_id#1(hidden)], stream_key: [_row_id, _row_id#1, auction], pk_columns: [_row_id, _row_id#1, auction], pk_conflict: NoCheck }
Expand Down Expand Up @@ -1819,10 +1819,10 @@
lower(channel) in ('apple', 'google', 'facebook', 'baidu');
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [$expr2, $expr3, $expr4, $expr5, Case((Lower($expr5) = 'apple':Varchar), '0':Varchar, (Lower($expr5) = 'google':Varchar), '1':Varchar, (Lower($expr5) = 'facebook':Varchar), '2':Varchar, (Lower($expr5) = 'baidu':Varchar), '3':Varchar, ArrayAccess(RegexpMatch($expr6, '(&|^)channel_id=([^&]*)':Varchar), 2:Int32)) as $expr7] }
└─BatchProject { exprs: [Field(bid, 0:Int32) as $expr2, Field(bid, 1:Int32) as $expr3, Field(bid, 2:Int32) as $expr4, Field(bid, 3:Int32) as $expr5, Field(bid, 4:Int32) as $expr6] }
└─BatchProject { exprs: [$expr1, $expr2, $expr3, $expr4, Case((Lower($expr4) = 'apple':Varchar), '0':Varchar, (Lower($expr4) = 'google':Varchar), '1':Varchar, (Lower($expr4) = 'facebook':Varchar), '2':Varchar, (Lower($expr4) = 'baidu':Varchar), '3':Varchar, ArrayAccess(RegexpMatch($expr5, '(&|^)channel_id=([^&]*)':Varchar), 2:Int32)) as $expr6] }
└─BatchProject { exprs: [Field(bid, 0:Int32) as $expr1, Field(bid, 1:Int32) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 3:Int32) as $expr4, Field(bid, 4:Int32) as $expr5] }
└─BatchFilter { predicate: (IsNotNull(ArrayAccess(RegexpMatch(Field(bid, 4:Int32), '(&|^)channel_id=([^&]*)':Varchar), 2:Int32)) OR In(Lower(Field(bid, 3:Int32)), 'apple':Varchar, 'google':Varchar, 'facebook':Varchar, 'baidu':Varchar)) AND (event_type = 2:Int32) }
└─BatchProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] }
└─BatchProject { exprs: [event_type, bid] }
└─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [auction, bidder, price, channel, channel_id, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
Expand Down Expand Up @@ -1861,10 +1861,10 @@
SPLIT_PART(url, '/', 6) as dir3 FROM bid;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [$expr2, $expr3, $expr4, $expr5, SplitPart($expr6, '/':Varchar, 4:Int32) as $expr7, SplitPart($expr6, '/':Varchar, 5:Int32) as $expr8, SplitPart($expr6, '/':Varchar, 6:Int32) as $expr9] }
└─BatchProject { exprs: [Field(bid, 0:Int32) as $expr2, Field(bid, 1:Int32) as $expr3, Field(bid, 2:Int32) as $expr4, Field(bid, 3:Int32) as $expr5, Field(bid, 4:Int32) as $expr6] }
└─BatchProject { exprs: [$expr1, $expr2, $expr3, $expr4, SplitPart($expr5, '/':Varchar, 4:Int32) as $expr6, SplitPart($expr5, '/':Varchar, 5:Int32) as $expr7, SplitPart($expr5, '/':Varchar, 6:Int32) as $expr8] }
└─BatchProject { exprs: [Field(bid, 0:Int32) as $expr1, Field(bid, 1:Int32) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 3:Int32) as $expr4, Field(bid, 4:Int32) as $expr5] }
└─BatchFilter { predicate: (event_type = 2:Int32) }
└─BatchProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] }
└─BatchProject { exprs: [event_type, bid] }
└─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [auction, bidder, price, channel, dir1, dir2, dir3, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/share.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,12 @@
└─BatchHashJoin { type: Inner, predicate: id = id, output: [id, id, date_time, date_time] }
├─BatchExchange { order: [], dist: HashShard(id) }
│ └─BatchFilter { predicate: (initial_bid = 1:Int32) AND (AtTimeZone(date_time, 'UTC':Varchar) > '2021-03-31 23:59:59+00:00':Timestamptz) }
│ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
│ └─BatchProject { exprs: [id, initial_bid, date_time] }
│ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
└─BatchExchange { order: [], dist: HashShard(id) }
└─BatchFilter { predicate: (initial_bid = 2:Int32) }
└─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
└─BatchProject { exprs: [id, initial_bid, date_time] }
└─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [a_id, b_id, a_ts, b_ts, _row_id(hidden), _row_id#1(hidden)], stream_key: [_row_id, _row_id#1, a_id], pk_columns: [_row_id, _row_id#1, a_id], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(id, _row_id, _row_id) }
Expand Down
Loading

0 comments on commit 7ebab64

Please sign in to comment.