From 901d9b0f54a10f9a5e26b895a6b765b98d27d007 Mon Sep 17 00:00:00 2001 From: Dylan Date: Wed, 8 Nov 2023 16:22:20 +0800 Subject: [PATCH] refactor(optimizer): add lookup table to batch lookup join explain (#13311) --- .../testdata/output/batch_index_join.yaml | 14 ++-- .../tests/testdata/output/ch_benchmark.yaml | 76 +++++++++--------- .../output/distributed_lookup_join.yaml | 8 +- .../testdata/output/distribution_derive.yaml | 10 +-- .../tests/testdata/output/except.yaml | 2 +- .../testdata/output/index_selection.yaml | 78 +++++++++---------- .../tests/testdata/output/intersect.yaml | 2 +- .../tests/testdata/output/join.yaml | 12 +-- .../tests/testdata/output/nexmark.yaml | 4 +- .../tests/testdata/output/subquery.yaml | 2 +- .../tests/testdata/output/time_window.yaml | 4 +- .../tests/testdata/output/tpch.yaml | 72 ++++++++--------- .../optimizer/plan_node/batch_lookup_join.rs | 7 +- 13 files changed, 148 insertions(+), 143 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/batch_index_join.yaml b/src/frontend/planner_test/tests/testdata/output/batch_index_join.yaml index 2d1b0951089e..72abd65027d0 100644 --- a/src/frontend/planner_test/tests/testdata/output/batch_index_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/batch_index_join.yaml @@ -6,7 +6,7 @@ select * from t join t2 on t.b = t2.d; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: t.b = idx.d, output: all } + └─BatchLookupJoin { type: Inner, predicate: t.b = idx.d, output: all, lookup table: idx } └─BatchExchange { order: [], dist: UpstreamHashShard(t.b) } └─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard } - sql: | @@ -18,7 +18,7 @@ select * from t join t2 on t.a = t2.c and t.b = t2.d; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: t.a = idx2.c AND t.b = idx2.d, output: all } + └─BatchLookupJoin { type: Inner, predicate: t.a = idx2.c AND t.b = idx2.d, output: all, lookup table: idx2 } └─BatchExchange { order: [], dist: UpstreamHashShard(t.a) } └─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard } - sql: | @@ -28,7 +28,7 @@ select * from t join t2 on t.b = t2.d; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: t.b = idx.d, output: all } + └─BatchLookupJoin { type: Inner, predicate: t.b = idx.d, output: all, lookup table: idx } └─BatchExchange { order: [], dist: UpstreamHashShard(t.b) } └─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard } - name: test index join predicate reorder @@ -39,7 +39,7 @@ select * from t join t2 on t.b = t2.d and t.a = t2.c; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: t.a = idx.c AND t.b = idx.d, output: all } + └─BatchLookupJoin { type: Inner, predicate: t.a = idx.c AND t.b = idx.d, output: all, lookup table: idx } └─BatchExchange { order: [], dist: UpstreamHashShard(t.a) } └─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard } - name: test index join prefix lookup @@ -50,7 +50,7 @@ select * from t join t2 on t.a = t2.c; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: t.a = idx.c, output: all } + └─BatchLookupJoin { type: Inner, predicate: t.a = idx.c, output: all, lookup table: idx } └─BatchExchange { order: [], dist: UpstreamHashShard(t.a) } └─BatchScan { table: t, columns: [t.a, t.b], distribution: SomeShard } - name: test index join distribution derive @@ -63,7 +63,7 @@ └─BatchHashAgg { group_key: [internal_last_seen_value(t2.c)], aggs: [internal_last_seen_value(internal_last_seen_value(t2.d)), count(t.a)] } └─BatchExchange { order: [], dist: HashShard(internal_last_seen_value(t2.c)) } └─BatchHashAgg { group_key: [t.a], aggs: [internal_last_seen_value(t2.c), internal_last_seen_value(t2.d)] } - └─BatchLookupJoin { type: Inner, predicate: t.a = t2.c, output: [t2.c, t2.d, t.a] } + └─BatchLookupJoin { type: Inner, predicate: t.a = t2.c, output: [t2.c, t2.d, t.a], lookup table: t2 } └─BatchExchange { order: [], dist: UpstreamHashShard(t.a) } └─BatchScan { table: t, columns: [t.a], distribution: SomeShard } - sql: | @@ -73,7 +73,7 @@ select * from t1 join idx on t1.a = idx.c and t1.b = idx.d; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: t1.a = idx.c AND t1.b = idx.d, output: all } + └─BatchLookupJoin { type: Inner, predicate: t1.a = idx.c AND t1.b = idx.d, output: all, lookup table: idx } └─BatchExchange { order: [], dist: UpstreamHashShard(t1.a) } └─BatchScan { table: t1, columns: [t1.a, t1.b], distribution: SomeShard } - name: shouldn't be a lookup join diff --git a/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml b/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml index 7eed762be53b..97e64b0867b8 100644 --- a/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml @@ -105,11 +105,11 @@ batch_plan: |- BatchExchange { order: [nation.n_name ASC, supplier.s_name ASC, item.i_id ASC], dist: Single } └─BatchSort { order: [nation.n_name ASC, supplier.s_name ASC, item.i_id ASC] } - └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name >= 'EUROP':Varchar) AND (region.r_name < 'EUROQ':Varchar), output: [supplier.s_suppkey, supplier.s_name, nation.n_name, item.i_id, item.i_name, supplier.s_address, supplier.s_phone, supplier.s_comment] } + └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name >= 'EUROP':Varchar) AND (region.r_name < 'EUROQ':Varchar), output: [supplier.s_suppkey, supplier.s_name, nation.n_name, item.i_id, item.i_name, supplier.s_address, supplier.s_phone, supplier.s_comment], lookup table: region } └─BatchExchange { order: [], dist: UpstreamHashShard(nation.n_regionkey) } - └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [item.i_id, item.i_name, supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_comment, nation.n_name, nation.n_regionkey] } + └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [item.i_id, item.i_name, supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_comment, nation.n_name, nation.n_regionkey], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } - └─BatchLookupJoin { type: Inner, predicate: $expr2 = supplier.s_suppkey, output: [item.i_id, item.i_name, supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_nationkey, supplier.s_phone, supplier.s_comment] } + └─BatchLookupJoin { type: Inner, predicate: $expr2 = supplier.s_suppkey, output: [item.i_id, item.i_name, supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_nationkey, supplier.s_phone, supplier.s_comment], lookup table: supplier } └─BatchExchange { order: [], dist: UpstreamHashShard($expr2) } └─BatchProject { exprs: [item.i_id, item.i_name, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr2] } └─BatchHashJoin { type: Inner, predicate: item.i_id = stock.s_i_id AND stock.s_quantity = min(stock.s_quantity), output: [item.i_id, item.i_name, stock.s_i_id, stock.s_w_id] } @@ -124,9 +124,9 @@ └─BatchExchange { order: [], dist: HashShard(stock.s_i_id) } └─BatchHashJoin { type: Inner, predicate: supplier.s_suppkey = $expr1, output: [stock.s_i_id, stock.s_quantity] } ├─BatchExchange { order: [], dist: HashShard(supplier.s_suppkey) } - │ └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name >= 'EUROP':Varchar) AND (region.r_name < 'EUROQ':Varchar), output: [supplier.s_suppkey] } + │ └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name >= 'EUROP':Varchar) AND (region.r_name < 'EUROQ':Varchar), output: [supplier.s_suppkey], lookup table: region } │ └─BatchExchange { order: [], dist: UpstreamHashShard(nation.n_regionkey) } - │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [supplier.s_suppkey, nation.n_regionkey] } + │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [supplier.s_suppkey, nation.n_regionkey], lookup table: nation } │ └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } │ └─BatchScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], distribution: UpstreamHashShard(supplier.s_suppkey) } └─BatchExchange { order: [], dist: HashShard($expr1) } @@ -399,7 +399,7 @@ └─BatchExchange { order: [], dist: HashShard(order_line.ol_o_id, order_line.ol_w_id, order_line.ol_d_id, orders.o_entry_d) } └─BatchHashJoin { type: Inner, predicate: orders.o_w_id = order_line.ol_w_id AND orders.o_d_id = order_line.ol_d_id AND orders.o_id = order_line.ol_o_id, output: [order_line.ol_o_id, order_line.ol_w_id, order_line.ol_d_id, orders.o_entry_d, order_line.ol_amount] } ├─BatchExchange { order: [], dist: HashShard(orders.o_id, orders.o_d_id, orders.o_w_id) } - │ └─BatchLookupJoin { type: Inner, predicate: orders.o_w_id = new_order.no_w_id AND orders.o_d_id = new_order.no_d_id AND orders.o_id = new_order.no_o_id, output: [orders.o_id, orders.o_d_id, orders.o_w_id, orders.o_entry_d] } + │ └─BatchLookupJoin { type: Inner, predicate: orders.o_w_id = new_order.no_w_id AND orders.o_d_id = new_order.no_d_id AND orders.o_id = new_order.no_o_id, output: [orders.o_id, orders.o_d_id, orders.o_w_id, orders.o_entry_d], lookup table: new_order } │ └─BatchExchange { order: [], dist: UpstreamHashShard(orders.o_id, orders.o_d_id, orders.o_w_id) } │ └─BatchHashJoin { type: Inner, predicate: customer.c_id = orders.o_c_id AND customer.c_w_id = orders.o_w_id AND customer.c_d_id = orders.o_d_id, output: [orders.o_id, orders.o_d_id, orders.o_w_id, orders.o_entry_d] } │ ├─BatchExchange { order: [], dist: HashShard(customer.c_d_id, customer.c_w_id, customer.c_id) } @@ -654,14 +654,14 @@ BatchExchange { order: [], dist: Single } └─BatchHashAgg { group_key: [nation.n_name], aggs: [sum(order_line.ol_amount)] } └─BatchExchange { order: [], dist: HashShard(nation.n_name) } - └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name = 'EUROPE':Varchar), output: [nation.n_name, order_line.ol_amount] } + └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name = 'EUROPE':Varchar), output: [nation.n_name, order_line.ol_amount], lookup table: region } └─BatchExchange { order: [], dist: UpstreamHashShard(nation.n_regionkey) } - └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [order_line.ol_amount, nation.n_name, nation.n_regionkey] } + └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [order_line.ol_amount, nation.n_name, nation.n_regionkey], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } - └─BatchLookupJoin { type: Inner, predicate: $expr1 = supplier.s_suppkey AND $expr2 = supplier.s_nationkey, output: [order_line.ol_amount, supplier.s_nationkey] } + └─BatchLookupJoin { type: Inner, predicate: $expr1 = supplier.s_suppkey AND $expr2 = supplier.s_nationkey, output: [order_line.ol_amount, supplier.s_nationkey], lookup table: supplier } └─BatchExchange { order: [], dist: UpstreamHashShard($expr1) } └─BatchProject { exprs: [order_line.ol_amount, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr1, (Ascii(Substr(customer.c_state, 1:Int32, 1:Int32)) - 65:Int32)::Int64 as $expr2] } - └─BatchLookupJoin { type: Inner, predicate: order_line.ol_w_id = stock.s_w_id AND order_line.ol_i_id = stock.s_i_id, output: [customer.c_state, order_line.ol_amount, stock.s_i_id, stock.s_w_id] } + └─BatchLookupJoin { type: Inner, predicate: order_line.ol_w_id = stock.s_w_id AND order_line.ol_i_id = stock.s_i_id, output: [customer.c_state, order_line.ol_amount, stock.s_i_id, stock.s_w_id], lookup table: stock } └─BatchExchange { order: [], dist: UpstreamHashShard(order_line.ol_i_id, order_line.ol_w_id) } └─BatchHashJoin { type: Inner, predicate: orders.o_id = order_line.ol_o_id AND orders.o_w_id = order_line.ol_w_id AND orders.o_d_id = order_line.ol_d_id, output: [customer.c_state, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_amount] } ├─BatchExchange { order: [], dist: HashShard(orders.o_id, orders.o_d_id, orders.o_w_id) } @@ -971,17 +971,17 @@ └─BatchHashAgg { group_key: [supplier.s_nationkey, $expr3, $expr4], aggs: [sum(order_line.ol_amount)] } └─BatchExchange { order: [], dist: HashShard(supplier.s_nationkey, $expr3, $expr4) } └─BatchProject { exprs: [supplier.s_nationkey, Substr(customer.c_state, 1:Int32, 1:Int32) as $expr3, Extract('YEAR':Varchar, orders.o_entry_d) as $expr4, order_line.ol_amount] } - └─BatchLookupJoin { type: Inner, predicate: $expr2 = nation.n_nationkey AND (((nation.n_name = 'JAPAN':Varchar) AND (nation.n_name = 'CHINA':Varchar)) OR ((nation.n_name = 'CHINA':Varchar) AND (nation.n_name = 'JAPAN':Varchar))), output: [supplier.s_nationkey, order_line.ol_amount, orders.o_entry_d, customer.c_state] } + └─BatchLookupJoin { type: Inner, predicate: $expr2 = nation.n_nationkey AND (((nation.n_name = 'JAPAN':Varchar) AND (nation.n_name = 'CHINA':Varchar)) OR ((nation.n_name = 'CHINA':Varchar) AND (nation.n_name = 'JAPAN':Varchar))), output: [supplier.s_nationkey, order_line.ol_amount, orders.o_entry_d, customer.c_state], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard($expr2) } └─BatchProject { exprs: [order_line.ol_amount, orders.o_entry_d, customer.c_state, supplier.s_nationkey, nation.n_name, (Ascii(Substr(customer.c_state, 1:Int32, 1:Int32)) - 65:Int32)::Int64 as $expr2] } - └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [order_line.ol_amount, orders.o_entry_d, customer.c_state, supplier.s_nationkey, nation.n_name] } + └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [order_line.ol_amount, orders.o_entry_d, customer.c_state, supplier.s_nationkey, nation.n_name], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } - └─BatchLookupJoin { type: Inner, predicate: $expr1 = supplier.s_suppkey, output: [order_line.ol_amount, orders.o_entry_d, customer.c_state, supplier.s_nationkey] } + └─BatchLookupJoin { type: Inner, predicate: $expr1 = supplier.s_suppkey, output: [order_line.ol_amount, orders.o_entry_d, customer.c_state, supplier.s_nationkey], lookup table: supplier } └─BatchExchange { order: [], dist: UpstreamHashShard($expr1) } └─BatchProject { exprs: [order_line.ol_amount, orders.o_entry_d, customer.c_state, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr1] } - └─BatchLookupJoin { type: Inner, predicate: orders.o_w_id = customer.c_w_id AND orders.o_d_id = customer.c_d_id AND orders.o_c_id = customer.c_id, output: [stock.s_i_id, stock.s_w_id, order_line.ol_amount, orders.o_entry_d, customer.c_state] } + └─BatchLookupJoin { type: Inner, predicate: orders.o_w_id = customer.c_w_id AND orders.o_d_id = customer.c_d_id AND orders.o_c_id = customer.c_id, output: [stock.s_i_id, stock.s_w_id, order_line.ol_amount, orders.o_entry_d, customer.c_state], lookup table: customer } └─BatchExchange { order: [], dist: UpstreamHashShard(orders.o_c_id, orders.o_d_id, orders.o_w_id) } - └─BatchLookupJoin { type: Inner, predicate: order_line.ol_w_id = orders.o_w_id AND order_line.ol_d_id = orders.o_d_id AND order_line.ol_o_id = orders.o_id, output: [stock.s_i_id, stock.s_w_id, order_line.ol_amount, orders.o_d_id, orders.o_w_id, orders.o_c_id, orders.o_entry_d] } + └─BatchLookupJoin { type: Inner, predicate: order_line.ol_w_id = orders.o_w_id AND order_line.ol_d_id = orders.o_d_id AND order_line.ol_o_id = orders.o_id, output: [stock.s_i_id, stock.s_w_id, order_line.ol_amount, orders.o_d_id, orders.o_w_id, orders.o_c_id, orders.o_entry_d], lookup table: orders } └─BatchExchange { order: [], dist: UpstreamHashShard(order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id) } └─BatchHashJoin { type: Inner, predicate: stock.s_w_id = order_line.ol_supply_w_id AND stock.s_i_id = order_line.ol_i_id, output: [stock.s_i_id, stock.s_w_id, order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_amount] } ├─BatchExchange { order: [], dist: HashShard(stock.s_i_id, stock.s_w_id) } @@ -1235,19 +1235,19 @@ └─BatchHashAgg { group_key: [$expr3], aggs: [sum($expr4), sum(order_line.ol_amount)] } └─BatchExchange { order: [], dist: HashShard($expr3) } └─BatchProject { exprs: [Extract('YEAR':Varchar, orders.o_entry_d) as $expr3, Case((nation.n_name = 'INDIA':Varchar), order_line.ol_amount, 0:Decimal) as $expr4, order_line.ol_amount] } - └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name = 'ASIA':Varchar), output: [order_line.ol_amount, orders.o_entry_d, nation.n_name] } + └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name = 'ASIA':Varchar), output: [order_line.ol_amount, orders.o_entry_d, nation.n_name], lookup table: region } └─BatchExchange { order: [], dist: UpstreamHashShard(nation.n_regionkey) } - └─BatchLookupJoin { type: Inner, predicate: $expr2 = nation.n_nationkey, output: [order_line.ol_amount, orders.o_entry_d, nation.n_name, nation.n_regionkey] } + └─BatchLookupJoin { type: Inner, predicate: $expr2 = nation.n_nationkey, output: [order_line.ol_amount, orders.o_entry_d, nation.n_name, nation.n_regionkey], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard($expr2) } └─BatchProject { exprs: [order_line.ol_amount, orders.o_entry_d, nation.n_name, (Ascii(Substr(customer.c_state, 1:Int32, 1:Int32)) - 65:Int32)::Int64 as $expr2] } - └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [order_line.ol_amount, orders.o_entry_d, customer.c_state, nation.n_name] } + └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [order_line.ol_amount, orders.o_entry_d, customer.c_state, nation.n_name], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } - └─BatchLookupJoin { type: Inner, predicate: $expr1 = supplier.s_suppkey, output: [order_line.ol_amount, orders.o_entry_d, customer.c_state, supplier.s_nationkey] } + └─BatchLookupJoin { type: Inner, predicate: $expr1 = supplier.s_suppkey, output: [order_line.ol_amount, orders.o_entry_d, customer.c_state, supplier.s_nationkey], lookup table: supplier } └─BatchExchange { order: [], dist: UpstreamHashShard($expr1) } └─BatchProject { exprs: [order_line.ol_amount, orders.o_entry_d, customer.c_state, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr1] } - └─BatchLookupJoin { type: Inner, predicate: orders.o_w_id = customer.c_w_id AND orders.o_d_id = customer.c_d_id AND orders.o_c_id = customer.c_id, output: [stock.s_i_id, stock.s_w_id, order_line.ol_amount, orders.o_entry_d, customer.c_state] } + └─BatchLookupJoin { type: Inner, predicate: orders.o_w_id = customer.c_w_id AND orders.o_d_id = customer.c_d_id AND orders.o_c_id = customer.c_id, output: [stock.s_i_id, stock.s_w_id, order_line.ol_amount, orders.o_entry_d, customer.c_state], lookup table: customer } └─BatchExchange { order: [], dist: UpstreamHashShard(orders.o_c_id, orders.o_d_id, orders.o_w_id) } - └─BatchLookupJoin { type: Inner, predicate: order_line.ol_w_id = orders.o_w_id AND order_line.ol_d_id = orders.o_d_id AND order_line.ol_o_id = orders.o_id AND (orders.o_entry_d >= '2007-01-02 00:00:00':Timestamp) AND (orders.o_entry_d <= '2032-01-02 00:00:00':Timestamp), output: [stock.s_i_id, stock.s_w_id, order_line.ol_amount, orders.o_d_id, orders.o_w_id, orders.o_c_id, orders.o_entry_d] } + └─BatchLookupJoin { type: Inner, predicate: order_line.ol_w_id = orders.o_w_id AND order_line.ol_d_id = orders.o_d_id AND order_line.ol_o_id = orders.o_id AND (orders.o_entry_d >= '2007-01-02 00:00:00':Timestamp) AND (orders.o_entry_d <= '2032-01-02 00:00:00':Timestamp), output: [stock.s_i_id, stock.s_w_id, order_line.ol_amount, orders.o_d_id, orders.o_w_id, orders.o_c_id, orders.o_entry_d], lookup table: orders } └─BatchExchange { order: [], dist: UpstreamHashShard(order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id) } └─BatchHashJoin { type: Inner, predicate: stock.s_i_id = order_line.ol_i_id AND stock.s_w_id = order_line.ol_supply_w_id AND item.i_id = order_line.ol_i_id, output: [stock.s_i_id, stock.s_w_id, order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_amount] } ├─BatchExchange { order: [], dist: HashShard(item.i_id, stock.s_w_id) } @@ -1556,14 +1556,14 @@ └─BatchHashAgg { group_key: [nation.n_name, $expr2], aggs: [sum(order_line.ol_amount)] } └─BatchExchange { order: [], dist: HashShard(nation.n_name, $expr2) } └─BatchProject { exprs: [nation.n_name, Extract('YEAR':Varchar, orders.o_entry_d) as $expr2, order_line.ol_amount] } - └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [order_line.ol_amount, orders.o_entry_d, nation.n_name] } + └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [order_line.ol_amount, orders.o_entry_d, nation.n_name], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } - └─BatchLookupJoin { type: Inner, predicate: $expr1 = supplier.s_suppkey, output: [order_line.ol_amount, orders.o_entry_d, supplier.s_nationkey] } + └─BatchLookupJoin { type: Inner, predicate: $expr1 = supplier.s_suppkey, output: [order_line.ol_amount, orders.o_entry_d, supplier.s_nationkey], lookup table: supplier } └─BatchExchange { order: [], dist: UpstreamHashShard($expr1) } └─BatchProject { exprs: [order_line.ol_amount, orders.o_entry_d, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr1] } - └─BatchLookupJoin { type: Inner, predicate: order_line.ol_w_id = orders.o_w_id AND order_line.ol_d_id = orders.o_d_id AND order_line.ol_o_id = orders.o_id, output: [order_line.ol_amount, stock.s_i_id, stock.s_w_id, orders.o_entry_d] } + └─BatchLookupJoin { type: Inner, predicate: order_line.ol_w_id = orders.o_w_id AND order_line.ol_d_id = orders.o_d_id AND order_line.ol_o_id = orders.o_id, output: [order_line.ol_amount, stock.s_i_id, stock.s_w_id, orders.o_entry_d], lookup table: orders } └─BatchExchange { order: [], dist: UpstreamHashShard(order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id) } - └─BatchLookupJoin { type: Inner, predicate: order_line.ol_supply_w_id = stock.s_w_id AND order_line.ol_i_id = stock.s_i_id, output: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_amount, stock.s_i_id, stock.s_w_id] } + └─BatchLookupJoin { type: Inner, predicate: order_line.ol_supply_w_id = stock.s_w_id AND order_line.ol_i_id = stock.s_i_id, output: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_amount, stock.s_i_id, stock.s_w_id], lookup table: stock } └─BatchExchange { order: [], dist: UpstreamHashShard(order_line.ol_i_id, order_line.ol_supply_w_id) } └─BatchHashJoin { type: Inner, predicate: item.i_id = order_line.ol_i_id, output: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_supply_w_id, order_line.ol_amount] } ├─BatchExchange { order: [], dist: HashShard(item.i_id) } @@ -1781,7 +1781,7 @@ └─BatchSort { order: [sum(order_line.ol_amount) DESC] } └─BatchHashAgg { group_key: [customer.c_id, customer.c_last, customer.c_city, customer.c_phone, nation.n_name], aggs: [sum(order_line.ol_amount)] } └─BatchExchange { order: [], dist: HashShard(customer.c_id, customer.c_last, customer.c_city, customer.c_phone, nation.n_name) } - └─BatchLookupJoin { type: Inner, predicate: $expr1 = nation.n_nationkey, output: [customer.c_id, customer.c_last, customer.c_city, customer.c_phone, nation.n_name, order_line.ol_amount] } + └─BatchLookupJoin { type: Inner, predicate: $expr1 = nation.n_nationkey, output: [customer.c_id, customer.c_last, customer.c_city, customer.c_phone, nation.n_name, order_line.ol_amount], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard($expr1) } └─BatchProject { exprs: [customer.c_id, customer.c_last, customer.c_city, customer.c_phone, order_line.ol_amount, (Ascii(Substr(customer.c_state, 1:Int32, 1:Int32)) - 65:Int32)::Int64 as $expr1] } └─BatchHashJoin { type: Inner, predicate: orders.o_w_id = order_line.ol_w_id AND orders.o_d_id = order_line.ol_d_id AND orders.o_id = order_line.ol_o_id AND (orders.o_entry_d <= order_line.ol_delivery_d), output: [customer.c_id, customer.c_last, customer.c_city, customer.c_state, customer.c_phone, order_line.ol_amount] } @@ -1972,7 +1972,7 @@ │ └─BatchExchange { order: [], dist: HashShard(stock.s_i_id) } │ └─BatchHashJoin { type: Inner, predicate: supplier.s_suppkey = $expr1, output: [stock.s_i_id, stock.s_order_cnt] } │ ├─BatchExchange { order: [], dist: HashShard(supplier.s_suppkey) } - │ │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'CHINA':Varchar), output: [supplier.s_suppkey] } + │ │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'CHINA':Varchar), output: [supplier.s_suppkey], lookup table: nation } │ │ └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } │ │ └─BatchScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], distribution: UpstreamHashShard(supplier.s_suppkey) } │ └─BatchExchange { order: [], dist: HashShard($expr1) } @@ -1984,7 +1984,7 @@ └─BatchSimpleAgg { aggs: [sum(stock.s_order_cnt)] } └─BatchHashJoin { type: Inner, predicate: supplier.s_suppkey = $expr3, output: [stock.s_order_cnt] } ├─BatchExchange { order: [], dist: HashShard(supplier.s_suppkey) } - │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'CHINA':Varchar), output: [supplier.s_suppkey] } + │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'CHINA':Varchar), output: [supplier.s_suppkey], lookup table: nation } │ └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } │ └─BatchScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], distribution: UpstreamHashShard(supplier.s_suppkey) } └─BatchExchange { order: [], dist: HashShard($expr3) } @@ -2396,7 +2396,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchSimpleAgg { aggs: [sum($expr1), sum(order_line.ol_amount)] } └─BatchProject { exprs: [Case(Like(item.i_data, 'PR%':Varchar), order_line.ol_amount, 0:Decimal) as $expr1, order_line.ol_amount] } - └─BatchLookupJoin { type: Inner, predicate: order_line.ol_i_id = item.i_id, output: [order_line.ol_amount, item.i_data] } + └─BatchLookupJoin { type: Inner, predicate: order_line.ol_i_id = item.i_id, output: [order_line.ol_amount, item.i_data], lookup table: item } └─BatchExchange { order: [], dist: UpstreamHashShard(order_line.ol_i_id) } └─BatchProject { exprs: [order_line.ol_i_id, order_line.ol_amount] } └─BatchFilter { predicate: (order_line.ol_delivery_d >= '2007-01-02 00:00:00':Timestamp) AND (order_line.ol_delivery_d < '2030-01-02 00:00:00':Timestamp) } @@ -2625,10 +2625,10 @@ └─BatchHashAgg { group_key: [item.i_name, $expr2, item.i_price, $expr3], aggs: [] } └─BatchExchange { order: [], dist: HashShard(item.i_name, $expr2, item.i_price, $expr3) } └─BatchProject { exprs: [item.i_name, Substr(item.i_data, 1:Int32, 3:Int32) as $expr2, item.i_price, ((stock.s_w_id * stock.s_i_id) % 10000:Int32) as $expr3] } - └─BatchLookupJoin { type: LeftAnti, predicate: $expr1 = supplier.s_suppkey AND Like(supplier.s_comment, '%bad%':Varchar), output: [stock.s_i_id, stock.s_w_id, item.i_name, item.i_price, item.i_data] } + └─BatchLookupJoin { type: LeftAnti, predicate: $expr1 = supplier.s_suppkey AND Like(supplier.s_comment, '%bad%':Varchar), output: [stock.s_i_id, stock.s_w_id, item.i_name, item.i_price, item.i_data], lookup table: supplier } └─BatchExchange { order: [], dist: UpstreamHashShard($expr1) } └─BatchProject { exprs: [stock.s_i_id, stock.s_w_id, item.i_name, item.i_price, item.i_data, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr1] } - └─BatchLookupJoin { type: Inner, predicate: stock.s_i_id = item.i_id AND (Not((item.i_data >= 'zz':Varchar)) OR Not((item.i_data < 'z{':Varchar))), output: [stock.s_i_id, stock.s_w_id, item.i_name, item.i_price, item.i_data] } + └─BatchLookupJoin { type: Inner, predicate: stock.s_i_id = item.i_id AND (Not((item.i_data >= 'zz':Varchar)) OR Not((item.i_data < 'z{':Varchar))), output: [stock.s_i_id, stock.s_w_id, item.i_name, item.i_price, item.i_data], lookup table: item } └─BatchExchange { order: [], dist: UpstreamHashShard(stock.s_i_id) } └─BatchScan { table: stock, columns: [stock.s_i_id, stock.s_w_id], distribution: UpstreamHashShard(stock.s_i_id, stock.s_w_id) } stream_plan: |- @@ -3042,7 +3042,7 @@ BatchSimpleAgg { aggs: [sum(sum(order_line.ol_amount))] } └─BatchExchange { order: [], dist: Single } └─BatchSimpleAgg { aggs: [sum(order_line.ol_amount)] } - └─BatchLookupJoin { type: Inner, predicate: order_line.ol_i_id = item.i_id AND (((Like(item.i_data, '%a':Varchar) AND In(order_line.ol_w_id, 1:Int32, 2:Int32, 3:Int32)) OR (Like(item.i_data, '%b':Varchar) AND In(order_line.ol_w_id, 1:Int32, 2:Int32, 4:Int32))) OR (Like(item.i_data, '%c':Varchar) AND In(order_line.ol_w_id, 1:Int32, 5:Int32, 3:Int32))) AND (item.i_price >= 1:Decimal) AND (item.i_price <= 400000:Decimal), output: [order_line.ol_amount] } + └─BatchLookupJoin { type: Inner, predicate: order_line.ol_i_id = item.i_id AND (((Like(item.i_data, '%a':Varchar) AND In(order_line.ol_w_id, 1:Int32, 2:Int32, 3:Int32)) OR (Like(item.i_data, '%b':Varchar) AND In(order_line.ol_w_id, 1:Int32, 2:Int32, 4:Int32))) OR (Like(item.i_data, '%c':Varchar) AND In(order_line.ol_w_id, 1:Int32, 5:Int32, 3:Int32))) AND (item.i_price >= 1:Decimal) AND (item.i_price <= 400000:Decimal), output: [order_line.ol_amount], lookup table: item } └─BatchExchange { order: [], dist: UpstreamHashShard(order_line.ol_i_id) } └─BatchProject { exprs: [order_line.ol_w_id, order_line.ol_i_id, order_line.ol_amount] } └─BatchFilter { predicate: (order_line.ol_quantity >= 1:Int32) AND (order_line.ol_quantity <= 10:Int32) } @@ -3155,14 +3155,14 @@ └─BatchSort { order: [supplier.s_name ASC] } └─BatchHashJoin { type: LeftSemi, predicate: supplier.s_suppkey = $expr1, output: [supplier.s_name, supplier.s_address] } ├─BatchExchange { order: [], dist: HashShard(supplier.s_suppkey) } - │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'CHINA':Varchar), output: [supplier.s_suppkey, supplier.s_name, supplier.s_address] } + │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'CHINA':Varchar), output: [supplier.s_suppkey, supplier.s_name, supplier.s_address], lookup table: nation } │ └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } │ └─BatchScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_nationkey], distribution: UpstreamHashShard(supplier.s_suppkey) } └─BatchExchange { order: [], dist: HashShard($expr1) } └─BatchProject { exprs: [((stock.s_i_id * stock.s_w_id) % 10000:Int32)::Int64 as $expr1] } └─BatchFilter { predicate: ((2:Int32 * stock.s_quantity) > sum(order_line.ol_quantity)) } └─BatchHashAgg { group_key: [stock.s_i_id, stock.s_w_id, stock.s_quantity], aggs: [sum(order_line.ol_quantity)] } - └─BatchLookupJoin { type: LeftSemi, predicate: stock.s_i_id = item.i_id AND (item.i_data >= 'co':Varchar) AND (item.i_data < 'cp':Varchar), output: all } + └─BatchLookupJoin { type: LeftSemi, predicate: stock.s_i_id = item.i_id AND (item.i_data >= 'co':Varchar) AND (item.i_data < 'cp':Varchar), output: all, lookup table: item } └─BatchExchange { order: [], dist: UpstreamHashShard(stock.s_i_id) } └─BatchHashJoin { type: Inner, predicate: stock.s_i_id = order_line.ol_i_id, output: [stock.s_i_id, stock.s_w_id, stock.s_quantity, order_line.ol_quantity] } ├─BatchExchange { order: [], dist: HashShard(stock.s_i_id) } @@ -3353,14 +3353,14 @@ └─BatchExchange { order: [], dist: HashShard(supplier.s_name) } └─BatchHashJoin { type: LeftAnti, predicate: order_line.ol_o_id = order_line.ol_o_id AND order_line.ol_w_id = order_line.ol_w_id AND order_line.ol_d_id = order_line.ol_d_id AND (order_line.ol_delivery_d > order_line.ol_delivery_d), output: [supplier.s_name] } ├─BatchExchange { order: [], dist: HashShard(order_line.ol_o_id, order_line.ol_w_id, order_line.ol_d_id) } - │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'CHINA':Varchar), output: [supplier.s_name, order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_delivery_d] } + │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'CHINA':Varchar), output: [supplier.s_name, order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_delivery_d], lookup table: nation } │ └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } - │ └─BatchLookupJoin { type: Inner, predicate: $expr1 = supplier.s_suppkey, output: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_delivery_d, supplier.s_name, supplier.s_nationkey] } + │ └─BatchLookupJoin { type: Inner, predicate: $expr1 = supplier.s_suppkey, output: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_delivery_d, supplier.s_name, supplier.s_nationkey], lookup table: supplier } │ └─BatchExchange { order: [], dist: UpstreamHashShard($expr1) } │ └─BatchProject { exprs: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_delivery_d, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr1] } - │ └─BatchLookupJoin { type: Inner, predicate: order_line.ol_w_id = stock.s_w_id AND order_line.ol_i_id = stock.s_i_id, output: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_delivery_d, stock.s_i_id, stock.s_w_id] } + │ └─BatchLookupJoin { type: Inner, predicate: order_line.ol_w_id = stock.s_w_id AND order_line.ol_i_id = stock.s_i_id, output: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_delivery_d, stock.s_i_id, stock.s_w_id], lookup table: stock } │ └─BatchExchange { order: [], dist: UpstreamHashShard(order_line.ol_i_id, order_line.ol_w_id) } - │ └─BatchLookupJoin { type: Inner, predicate: order_line.ol_w_id = orders.o_w_id AND order_line.ol_d_id = orders.o_d_id AND order_line.ol_o_id = orders.o_id AND (order_line.ol_delivery_d > orders.o_entry_d), output: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_delivery_d] } + │ └─BatchLookupJoin { type: Inner, predicate: order_line.ol_w_id = orders.o_w_id AND order_line.ol_d_id = orders.o_d_id AND order_line.ol_o_id = orders.o_id AND (order_line.ol_delivery_d > orders.o_entry_d), output: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_delivery_d], lookup table: orders } │ └─BatchExchange { order: [], dist: UpstreamHashShard(order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id) } │ └─BatchScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_delivery_d], distribution: SomeShard } └─BatchExchange { order: [], dist: HashShard(order_line.ol_o_id, order_line.ol_w_id, order_line.ol_d_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/distributed_lookup_join.yaml b/src/frontend/planner_test/tests/testdata/output/distributed_lookup_join.yaml index 051988be51a9..a67b123ae18e 100644 --- a/src/frontend/planner_test/tests/testdata/output/distributed_lookup_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/distributed_lookup_join.yaml @@ -7,7 +7,7 @@ select * from t1 natural join t2; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: t1.a = idx.a AND t1.b = idx.b AND t1.c = idx.c, output: [t1.c, t1.b, t1.a] } + └─BatchLookupJoin { type: Inner, predicate: t1.a = idx.a AND t1.b = idx.b AND t1.c = idx.c, output: [t1.c, t1.b, t1.a], lookup table: idx } └─BatchExchange { order: [], dist: UpstreamHashShard(t1.a) } └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard } - id: fix https://github.com/risingwavelabs/risingwave/issues/10721 @@ -18,7 +18,7 @@ select * from t1 natural join t2; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: t1.b = idx.b AND t1.a = idx.a AND t1.c = idx.c, output: [t1.c, t1.b, t1.a] } + └─BatchLookupJoin { type: Inner, predicate: t1.b = idx.b AND t1.a = idx.a AND t1.c = idx.c, output: [t1.c, t1.b, t1.a], lookup table: idx } └─BatchExchange { order: [], dist: UpstreamHashShard(t1.b) } └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard } - id: fix https://github.com/risingwavelabs/risingwave/issues/10721 @@ -29,7 +29,7 @@ select * from t1 natural join t2; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: t1.c = idx.c AND t1.b = idx.b AND t1.a = idx.a, output: [t1.c, t1.b, t1.a] } + └─BatchLookupJoin { type: Inner, predicate: t1.c = idx.c AND t1.b = idx.b AND t1.a = idx.a, output: [t1.c, t1.b, t1.a], lookup table: idx } └─BatchExchange { order: [], dist: UpstreamHashShard(t1.c, t1.b) } └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard } - id: fix https://github.com/risingwavelabs/risingwave/issues/10721 @@ -40,6 +40,6 @@ select * from t1 natural join t2; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: t1.c = idx.c AND t1.b = idx.b AND t1.a = idx.a, output: [t1.c, t1.b, t1.a] } + └─BatchLookupJoin { type: Inner, predicate: t1.c = idx.c AND t1.b = idx.b AND t1.a = idx.a, output: [t1.c, t1.b, t1.a], lookup table: idx } └─BatchExchange { order: [], dist: UpstreamHashShard(t1.c, t1.b, t1.a) } └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard } diff --git a/src/frontend/planner_test/tests/testdata/output/distribution_derive.yaml b/src/frontend/planner_test/tests/testdata/output/distribution_derive.yaml index 97d77873ff90..63ee19f4e147 100644 --- a/src/frontend/planner_test/tests/testdata/output/distribution_derive.yaml +++ b/src/frontend/planner_test/tests/testdata/output/distribution_derive.yaml @@ -13,7 +13,7 @@ sql: select A.v, B.v as Bv from A join B using(k1); batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: a.k1 = bk1.k1, output: [a.v, bk1.v] } + └─BatchLookupJoin { type: Inner, predicate: a.k1 = bk1.k1, output: [a.v, bk1.v], lookup table: bk1 } └─BatchExchange { order: [], dist: UpstreamHashShard(a.k1) } └─BatchScan { table: a, columns: [a.k1, a.v], distribution: SomeShard } stream_plan: |- @@ -65,7 +65,7 @@ sql: select A.v, B.v as Bv from Ak1 as A join B using(k1) batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v] } + └─BatchLookupJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v], lookup table: bk1 } └─BatchExchange { order: [], dist: UpstreamHashShard(ak1.k1) } └─BatchScan { table: ak1, columns: [ak1.k1, ak1.v], distribution: UpstreamHashShard(ak1.k1) } stream_plan: |- @@ -117,7 +117,7 @@ sql: select A.v, B.v as Bv from A join Bk1 as B using(k1) batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: a.k1 = bk1.k1, output: [a.v, bk1.v] } + └─BatchLookupJoin { type: Inner, predicate: a.k1 = bk1.k1, output: [a.v, bk1.v], lookup table: bk1 } └─BatchExchange { order: [], dist: UpstreamHashShard(a.k1) } └─BatchScan { table: a, columns: [a.k1, a.v], distribution: SomeShard } stream_plan: |- @@ -169,7 +169,7 @@ sql: select A.v, B.v as Bv from Ak1 as A join Bk1 as B using(k1) batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v] } + └─BatchLookupJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v], lookup table: bk1 } └─BatchExchange { order: [], dist: UpstreamHashShard(ak1.k1) } └─BatchScan { table: ak1, columns: [ak1.k1, ak1.v], distribution: UpstreamHashShard(ak1.k1) } stream_plan: |- @@ -1038,7 +1038,7 @@ └─LogicalScan { table: ak1, columns: [ak1.k1, ak1.k2, ak1.k3, ak1.v, ak1.a._row_id] } batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: a.k1 = ak1.k1, output: [ak1.v, count] } + └─BatchLookupJoin { type: Inner, predicate: a.k1 = ak1.k1, output: [ak1.v, count], lookup table: ak1 } └─BatchExchange { order: [], dist: UpstreamHashShard(a.k1) } └─BatchHashAgg { group_key: [a.k1], aggs: [count] } └─BatchExchange { order: [], dist: HashShard(a.k1) } diff --git a/src/frontend/planner_test/tests/testdata/output/except.yaml b/src/frontend/planner_test/tests/testdata/output/except.yaml index a98c834a1998..6c1b2c498ce4 100644 --- a/src/frontend/planner_test/tests/testdata/output/except.yaml +++ b/src/frontend/planner_test/tests/testdata/output/except.yaml @@ -115,7 +115,7 @@ batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchHashAgg { group_key: [t1.a], aggs: [internal_last_seen_value(t1.b), internal_last_seen_value(t1.c)] } - └─BatchLookupJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } + └─BatchLookupJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all, lookup table: t2 } └─BatchExchange { order: [], dist: UpstreamHashShard(t1.a) } └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: UpstreamHashShard(t1.a) } stream_plan: |- diff --git a/src/frontend/planner_test/tests/testdata/output/index_selection.yaml b/src/frontend/planner_test/tests/testdata/output/index_selection.yaml index affc346bda24..d5a270cdaba7 100644 --- a/src/frontend/planner_test/tests/testdata/output/index_selection.yaml +++ b/src/frontend/planner_test/tests/testdata/output/index_selection.yaml @@ -58,7 +58,7 @@ select * from t1 where b = 1 batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c] } + └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx2.t1._row_id) } └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(1))], distribution: SomeShard } - sql: | @@ -135,11 +135,11 @@ select * from t1 where c = 1 and a < 10; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx3.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (t1.a < 10:Int32), output: [t1.a, t1.b, t1.c] } + └─BatchLookupJoin { type: Inner, predicate: idx3.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (t1.a < 10:Int32), output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx3.t1._row_id) } └─BatchScan { table: idx3, columns: [idx3.t1._row_id], scan_ranges: [idx3.c = Int64(1)], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx3.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (t1.a < 10:Int32), output: [t1.a, t1.b, t1.c] } + BatchLookupJoin { type: Inner, predicate: idx3.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (t1.a < 10:Int32), output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx3, columns: [idx3.t1._row_id], scan_ranges: [idx3.c = Int64(1)], distribution: SomeShard } - sql: | @@ -150,11 +150,11 @@ select * from t1 where a = 1; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c] } + └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t1._row_id) } └─BatchScan { table: idx1, columns: [idx1.t1._row_id], scan_ranges: [idx1.a = Int32(1)], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c] } + BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx1, columns: [idx1.t1._row_id], scan_ranges: [idx1.a = Int32(1)], distribution: SomeShard } - sql: | @@ -165,11 +165,11 @@ select * from t1 where a = 1 and b = 2; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c] } + └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx2.t1._row_id) } └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)) AND idx2.a = Int32(1)], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c] } + BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)) AND idx2.a = Int32(1)], distribution: SomeShard } - sql: | @@ -180,11 +180,11 @@ select * from t1 where b = 2; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c] } + └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx2.t1._row_id) } └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c] } + BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard } - sql: | @@ -197,12 +197,12 @@ BatchExchange { order: [], dist: Single } └─BatchDelete { table: t1 } └─BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] } + └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx2.t1._row_id) } └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard } batch_local_plan: |- BatchDelete { table: t1 } - └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] } + └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id], lookup table: t1 } └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard } - sql: | @@ -215,12 +215,12 @@ BatchExchange { order: [], dist: Single } └─BatchUpdate { table: t1, exprs: [$0, $1, 3:Int64, $3] } └─BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] } + └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx2.t1._row_id) } └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)) AND idx2.a = Int32(1)], distribution: SomeShard } batch_local_plan: |- BatchUpdate { table: t1, exprs: [$0, $1, 3:Int64, $3] } - └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id] } + └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id], lookup table: t1 } └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)) AND idx2.a = Int32(1)], distribution: SomeShard } - sql: | @@ -230,7 +230,7 @@ select * from v where cnt = 1 or p = 2; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx1.v.p IS NOT DISTINCT FROM v.p AND ((v.cnt = 1:Int32) OR (v.p = 2:Int32)), output: [v.cnt, v.p] } + └─BatchLookupJoin { type: Inner, predicate: idx1.v.p IS NOT DISTINCT FROM v.p AND ((v.cnt = 1:Int32) OR (v.p = 2:Int32)), output: [v.cnt, v.p], lookup table: v } └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.v.p) } └─BatchHashAgg { group_key: [idx1.v.p], aggs: [] } └─BatchExchange { order: [], dist: HashShard(idx1.v.p) } @@ -240,7 +240,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchScan { table: v, columns: [v.p], scan_ranges: [v.p = Int32(2)], distribution: UpstreamHashShard(v.p) } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx1.v.p IS NOT DISTINCT FROM v.p AND ((v.cnt = 1:Int32) OR (v.p = 2:Int32)), output: [v.cnt, v.p] } + BatchLookupJoin { type: Inner, predicate: idx1.v.p IS NOT DISTINCT FROM v.p AND ((v.cnt = 1:Int32) OR (v.p = 2:Int32)), output: [v.cnt, v.p], lookup table: v } └─BatchHashAgg { group_key: [idx1.v.p], aggs: [] } └─BatchUnion { all: true } ├─BatchExchange { order: [], dist: Single } @@ -255,7 +255,7 @@ select * from t1 where a = 1 or c = 1 batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.a = 1:Int32) OR (t1.c = 1:Int32)), output: [t1.a, t1.b, t1.c] } + └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.a = 1:Int32) OR (t1.c = 1:Int32)), output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t1._row_id) } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchExchange { order: [], dist: HashShard(idx1.t1._row_id) } @@ -265,7 +265,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx3, columns: [idx3.t1._row_id], scan_ranges: [idx3.c = Int64(1)], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.a = 1:Int32) OR (t1.c = 1:Int32)), output: [t1.a, t1.b, t1.c] } + BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.a = 1:Int32) OR (t1.c = 1:Int32)), output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchUnion { all: true } ├─BatchExchange { order: [], dist: Single } @@ -280,7 +280,7 @@ select * from t1 where c = 1 or (a = 2 and b = 3) batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.c = 1:Int32) OR ((t1.a = 2:Int32) AND (t1.b = 3:Decimal))), output: [t1.a, t1.b, t1.c] } + └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.c = 1:Int32) OR ((t1.a = 2:Int32) AND (t1.b = 3:Decimal))), output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx2.t1._row_id) } └─BatchHashAgg { group_key: [idx2.t1._row_id], aggs: [] } └─BatchExchange { order: [], dist: HashShard(idx2.t1._row_id) } @@ -290,7 +290,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx3, columns: [idx3.t1._row_id], scan_ranges: [idx3.c = Int64(1)], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.c = 1:Int32) OR ((t1.a = 2:Int32) AND (t1.b = 3:Decimal))), output: [t1.a, t1.b, t1.c] } + BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.c = 1:Int32) OR ((t1.a = 2:Int32) AND (t1.b = 3:Decimal))), output: [t1.a, t1.b, t1.c], lookup table: t1 } └─BatchHashAgg { group_key: [idx2.t1._row_id], aggs: [] } └─BatchUnion { all: true } ├─BatchExchange { order: [], dist: Single } @@ -306,7 +306,7 @@ select * from t1 where p = 1 or (a = 2 and b = 3 and c = 4) batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.p = 1:Int32) OR (((t1.a = 2:Int32) AND (t1.b = 3:Decimal)) AND (t1.c = 4:Int32))), output: [t1.a, t1.b, t1.c, t1.p] } + └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.p = 1:Int32) OR (((t1.a = 2:Int32) AND (t1.b = 3:Decimal)) AND (t1.c = 4:Int32))), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx2.t1._row_id) } └─BatchHashAgg { group_key: [idx2.t1._row_id], aggs: [] } └─BatchExchange { order: [], dist: HashShard(idx2.t1._row_id) } @@ -316,7 +316,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx4, columns: [idx4.t1._row_id], scan_ranges: [idx4.p = Int32(1)], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.p = 1:Int32) OR (((t1.a = 2:Int32) AND (t1.b = 3:Decimal)) AND (t1.c = 4:Int32))), output: [t1.a, t1.b, t1.c, t1.p] } + BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.p = 1:Int32) OR (((t1.a = 2:Int32) AND (t1.b = 3:Decimal)) AND (t1.c = 4:Int32))), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchHashAgg { group_key: [idx2.t1._row_id], aggs: [] } └─BatchUnion { all: true } ├─BatchExchange { order: [], dist: Single } @@ -332,7 +332,7 @@ select * from t1 where a = 1 or b = 2 or c = 3 or p = 4 or a = 5 batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR (t1.c = 3:Int32)) OR (t1.p = 4:Int32)) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p] } + └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR (t1.c = 3:Int32)) OR (t1.p = 4:Int32)) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t1._row_id) } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchExchange { order: [], dist: HashShard(idx1.t1._row_id) } @@ -346,7 +346,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx4, columns: [idx4.t1._row_id], scan_ranges: [idx4.p = Int32(4)], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR (t1.c = 3:Int32)) OR (t1.p = 4:Int32)) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p] } + BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR (t1.c = 3:Int32)) OR (t1.p = 4:Int32)) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchUnion { all: true } ├─BatchExchange { order: [], dist: Single } @@ -366,7 +366,7 @@ select * from t1 where (a = 1 or (b = 2 and a = 5)) and (c = 3 or p = 4) batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.a = 1:Int32) OR ((t1.b = 2:Decimal) AND (t1.a = 5:Int32))) AND ((t1.c = 3:Int32) OR (t1.p = 4:Int32)), output: [t1.a, t1.b, t1.c, t1.p] } + └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.a = 1:Int32) OR ((t1.b = 2:Decimal) AND (t1.a = 5:Int32))) AND ((t1.c = 3:Int32) OR (t1.p = 4:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t1._row_id) } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchExchange { order: [], dist: HashShard(idx1.t1._row_id) } @@ -376,7 +376,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2)) AND idx2.a = Int32(5)], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.a = 1:Int32) OR ((t1.b = 2:Decimal) AND (t1.a = 5:Int32))) AND ((t1.c = 3:Int32) OR (t1.p = 4:Int32)), output: [t1.a, t1.b, t1.c, t1.p] } + BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((t1.a = 1:Int32) OR ((t1.b = 2:Decimal) AND (t1.a = 5:Int32))) AND ((t1.c = 3:Int32) OR (t1.p = 4:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchUnion { all: true } ├─BatchExchange { order: [], dist: Single } @@ -392,7 +392,7 @@ select * from t1 where p != 1 and (c = 3 or (c != 4 and (a = 2 or b = 3))) batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (t1.p <> 1:Int32) AND ((t1.c = 3:Int32) OR ((t1.c <> 4:Int32) AND ((t1.a = 2:Int32) OR (t1.b = 3:Decimal)))), output: [t1.a, t1.b, t1.c, t1.p] } + └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (t1.p <> 1:Int32) AND ((t1.c = 3:Int32) OR ((t1.c <> 4:Int32) AND ((t1.a = 2:Int32) OR (t1.b = 3:Decimal)))), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t1._row_id) } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchExchange { order: [], dist: HashShard(idx1.t1._row_id) } @@ -404,7 +404,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx3, columns: [idx3.t1._row_id], scan_ranges: [idx3.c = Int64(3)], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (t1.p <> 1:Int32) AND ((t1.c = 3:Int32) OR ((t1.c <> 4:Int32) AND ((t1.a = 2:Int32) OR (t1.b = 3:Decimal)))), output: [t1.a, t1.b, t1.c, t1.p] } + BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (t1.p <> 1:Int32) AND ((t1.c = 3:Int32) OR ((t1.c <> 4:Int32) AND ((t1.a = 2:Int32) OR (t1.b = 3:Decimal)))), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchUnion { all: true } ├─BatchExchange { order: [], dist: Single } @@ -423,7 +423,7 @@ select * from t1 where (a > 1 and a < 8) or c between 8 and 9 batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((t1.a > 1:Int32) AND (t1.a < 8:Int32)) OR ((t1.c >= 8:Int32) AND (t1.c <= 9:Int32))), output: [t1.a, t1.b, t1.c, t1.p] } + └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((t1.a > 1:Int32) AND (t1.a < 8:Int32)) OR ((t1.c >= 8:Int32) AND (t1.c <= 9:Int32))), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t1._row_id) } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchExchange { order: [], dist: HashShard(idx1.t1._row_id) } @@ -433,7 +433,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx3, columns: [idx3.t1._row_id], scan_ranges: [idx3.c = Int64(8), idx3.c = Int64(9)], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((t1.a > 1:Int32) AND (t1.a < 8:Int32)) OR ((t1.c >= 8:Int32) AND (t1.c <= 9:Int32))), output: [t1.a, t1.b, t1.c, t1.p] } + BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((t1.a > 1:Int32) AND (t1.a < 8:Int32)) OR ((t1.c >= 8:Int32) AND (t1.c <= 9:Int32))), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchUnion { all: true } ├─BatchExchange { order: [], dist: Single } @@ -449,7 +449,7 @@ select * from t1 where (a > 1 and a < 8) or c = 8 batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((t1.a > 1:Int32) AND (t1.a < 8:Int32)) OR (t1.c = 8:Int32)), output: [t1.a, t1.b, t1.c, t1.p] } + └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((t1.a > 1:Int32) AND (t1.a < 8:Int32)) OR (t1.c = 8:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t1._row_id) } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchExchange { order: [], dist: HashShard(idx1.t1._row_id) } @@ -459,7 +459,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx3, columns: [idx3.t1._row_id], scan_ranges: [idx3.c = Int64(8)], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((t1.a > 1:Int32) AND (t1.a < 8:Int32)) OR (t1.c = 8:Int32)), output: [t1.a, t1.b, t1.c, t1.p] } + BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((t1.a > 1:Int32) AND (t1.a < 8:Int32)) OR (t1.c = 8:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchUnion { all: true } ├─BatchExchange { order: [], dist: Single } @@ -547,11 +547,11 @@ select * from t1 where a in ( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99,100); batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1.p] } + └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t1._row_id) } └─BatchScan { table: idx1, columns: [idx1.t1._row_id], scan_ranges: [idx1.a = Int32(1), idx1.a = Int32(2), idx1.a = Int32(3), idx1.a = Int32(4), idx1.a = Int32(5), idx1.a = Int32(6), idx1.a = Int32(7), idx1.a = Int32(8), idx1.a = Int32(9), idx1.a = Int32(10), idx1.a = Int32(11), idx1.a = Int32(12), idx1.a = Int32(13), idx1.a = Int32(14), idx1.a = Int32(15), idx1.a = Int32(16), idx1.a = Int32(17), idx1.a = Int32(18), idx1.a = Int32(19), idx1.a = Int32(20), ...], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1.p] } + BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx1, columns: [idx1.t1._row_id], scan_ranges: [idx1.a = Int32(1), idx1.a = Int32(2), idx1.a = Int32(3), idx1.a = Int32(4), idx1.a = Int32(5), idx1.a = Int32(6), idx1.a = Int32(7), idx1.a = Int32(8), idx1.a = Int32(9), idx1.a = Int32(10), idx1.a = Int32(11), idx1.a = Int32(12), idx1.a = Int32(13), idx1.a = Int32(14), idx1.a = Int32(15), idx1.a = Int32(16), idx1.a = Int32(17), idx1.a = Int32(18), idx1.a = Int32(19), idx1.a = Int32(20), ...], distribution: SomeShard } - sql: | @@ -562,12 +562,12 @@ BatchSimpleAgg { aggs: [sum0(count)] } └─BatchExchange { order: [], dist: Single } └─BatchSimpleAgg { aggs: [count] } - └─BatchLookupJoin { type: Inner, predicate: d.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (t1.d > 9:Int32), output: [] } + └─BatchLookupJoin { type: Inner, predicate: d.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (t1.d > 9:Int32), output: [], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(d.t1._row_id) } └─BatchScan { table: d, columns: [d.t1._row_id], scan_ranges: [d.a = Int32(1)], distribution: SomeShard } batch_local_plan: |- BatchSimpleAgg { aggs: [count] } - └─BatchLookupJoin { type: Inner, predicate: d.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (t1.d > 9:Int32), output: [] } + └─BatchLookupJoin { type: Inner, predicate: d.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (t1.d > 9:Int32), output: [], lookup table: t1 } └─BatchExchange { order: [], dist: Single } └─BatchScan { table: d, columns: [d.t1._row_id], scan_ranges: [d.a = Int32(1)], distribution: SomeShard } - name: create index to include all columns by default @@ -678,7 +678,7 @@ batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [(t1.v1 + 1:Int32) as $expr1, (t2.v2 + 2:Int32) as $expr2] } - └─BatchLookupJoin { type: Inner, predicate: t1.k1 = t2.k2, output: all } + └─BatchLookupJoin { type: Inner, predicate: t1.k1 = t2.k2, output: all, lookup table: t2 } └─BatchExchange { order: [], dist: UpstreamHashShard(t1.k1) } └─BatchScan { table: t1, columns: [t1.k1, t1.v1], distribution: UpstreamHashShard(t1.k1) } - sql: | @@ -690,7 +690,7 @@ BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [t2.d1, t2.d2, idx_t1.c1, idx_t1.c2, idx_t1.c3, $expr1, ($expr1 + '00:10:00':Interval) as $expr2] } └─BatchProject { exprs: [t2.d1, t2.d2, idx_t1.c1, idx_t1.c2, idx_t1.c3, TumbleStart(idx_t1.c3, '00:10:00':Interval) as $expr1] } - └─BatchLookupJoin { type: Inner, predicate: t2.d1 = idx_t1.c2, output: all } + └─BatchLookupJoin { type: Inner, predicate: t2.d1 = idx_t1.c2, output: all, lookup table: idx_t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(t2.d1) } └─BatchScan { table: t2, columns: [t2.d1, t2.d2], distribution: SomeShard } - sql: | @@ -706,7 +706,7 @@ select * from t where j->>'k1' = 'abc'; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx1.t._row_id IS NOT DISTINCT FROM t._row_id, output: [t.j, t.v1, t.v2] } + └─BatchLookupJoin { type: Inner, predicate: idx1.t._row_id IS NOT DISTINCT FROM t._row_id, output: [t.j, t.v1, t.v2], lookup table: t } └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t._row_id) } └─BatchScan { table: idx1, columns: [idx1.t._row_id], scan_ranges: [idx1.JSONB_ACCESS_STR = Utf8("abc")], distribution: SomeShard } - sql: | @@ -716,7 +716,7 @@ select * from t where j->>'k1' = 'abc' or j->>'k2' = 'ABC'; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx1.t._row_id IS NOT DISTINCT FROM t._row_id AND ((JsonbAccessStr(t.j, 'k1':Varchar) = 'abc':Varchar) OR (JsonbAccessStr(t.j, 'k2':Varchar) = 'ABC':Varchar)), output: [t.j, t.v1, t.v2] } + └─BatchLookupJoin { type: Inner, predicate: idx1.t._row_id IS NOT DISTINCT FROM t._row_id AND ((JsonbAccessStr(t.j, 'k1':Varchar) = 'abc':Varchar) OR (JsonbAccessStr(t.j, 'k2':Varchar) = 'ABC':Varchar)), output: [t.j, t.v1, t.v2], lookup table: t } └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t._row_id) } └─BatchHashAgg { group_key: [idx1.t._row_id], aggs: [] } └─BatchExchange { order: [], dist: HashShard(idx1.t._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/intersect.yaml b/src/frontend/planner_test/tests/testdata/output/intersect.yaml index 5badfae8c9f7..81bb53b26ffb 100644 --- a/src/frontend/planner_test/tests/testdata/output/intersect.yaml +++ b/src/frontend/planner_test/tests/testdata/output/intersect.yaml @@ -115,7 +115,7 @@ batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchHashAgg { group_key: [t1.a], aggs: [internal_last_seen_value(t1.b), internal_last_seen_value(t1.c)] } - └─BatchLookupJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } + └─BatchLookupJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all, lookup table: t2 } └─BatchExchange { order: [], dist: UpstreamHashShard(t1.a) } └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: UpstreamHashShard(t1.a) } stream_plan: |- diff --git a/src/frontend/planner_test/tests/testdata/output/join.yaml b/src/frontend/planner_test/tests/testdata/output/join.yaml index 387862e92861..2ae639511f20 100644 --- a/src/frontend/planner_test/tests/testdata/output/join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/join.yaml @@ -154,7 +154,7 @@ select i.x as ix, ii.x as iix from i join i as ii on i.x=ii.x; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: i.x = i.x, output: all } + └─BatchLookupJoin { type: Inner, predicate: i.x = i.x, output: all, lookup table: i } └─BatchScan { table: i, columns: [i.x], distribution: UpstreamHashShard(i.x) } stream_plan: |- StreamMaterialize { columns: [ix, iix, i.t._row_id(hidden), i.t._row_id#1(hidden)], stream_key: [i.t._row_id, i.t._row_id#1, ix], pk_columns: [i.t._row_id, i.t._row_id#1, ix], pk_conflict: NoCheck } @@ -171,7 +171,7 @@ select i.x as ix, t.x as tx from i join t on i.x=t.x; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: i.x = i.x, output: all } + └─BatchLookupJoin { type: Inner, predicate: i.x = i.x, output: all, lookup table: i } └─BatchScan { table: i, columns: [i.x], distribution: UpstreamHashShard(i.x) } stream_plan: |- StreamMaterialize { columns: [ix, tx, i.t._row_id(hidden), t._row_id(hidden)], stream_key: [i.t._row_id, t._row_id, ix], pk_columns: [i.t._row_id, t._row_id, ix], pk_conflict: NoCheck } @@ -195,10 +195,10 @@ └─BatchProject { exprs: [Coalesce(i.x, i.x) as $expr1] } └─BatchHashJoin { type: FullOuter, predicate: i.x = i.x, output: all } ├─BatchExchange { order: [], dist: HashShard(i.x) } - │ └─BatchLookupJoin { type: Inner, predicate: i.x = i.x, output: [i.x] } + │ └─BatchLookupJoin { type: Inner, predicate: i.x = i.x, output: [i.x], lookup table: i } │ └─BatchScan { table: i, columns: [i.x], distribution: UpstreamHashShard(i.x) } └─BatchExchange { order: [], dist: HashShard(i.x) } - └─BatchLookupJoin { type: Inner, predicate: i.x = i.x, output: [i.x] } + └─BatchLookupJoin { type: Inner, predicate: i.x = i.x, output: [i.x], lookup table: i } └─BatchScan { table: i, columns: [i.x], distribution: UpstreamHashShard(i.x) } stream_plan: |- StreamMaterialize { columns: [x, i.t._row_id(hidden), i.t._row_id#1(hidden), i.x(hidden), i.t._row_id#2(hidden), i.t._row_id#3(hidden), i.x#1(hidden)], stream_key: [i.t._row_id, i.t._row_id#1, i.x, i.t._row_id#2, i.t._row_id#3, i.x#1], pk_columns: [i.t._row_id, i.t._row_id#1, i.x, i.t._row_id#2, i.t._row_id#3, i.x#1], pk_conflict: NoCheck } @@ -225,7 +225,7 @@ create materialized view t3 as select v1, count(v2) as v2 from t2 group by v1; select * from t1 cross join t3 where t1.v2 = t3.v1; batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: t1.v2 = t3.v1, output: all } + BatchLookupJoin { type: Inner, predicate: t1.v2 = t3.v1, output: all, lookup table: t3 } └─BatchExchange { order: [], dist: Single } └─BatchScan { table: t1, columns: [t1.v1, t1.v2], distribution: SomeShard } with_config_map: @@ -578,7 +578,7 @@ ├─LogicalScan { table: t1, columns: [t1.v1, t1.v2], predicate: (t1.v2 > 1:Int32) } └─LogicalScan { table: t3, columns: [t3.v1, t3.v2], predicate: (t3.v1 > 1:Int32) } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: t1.v2 = t3.v1 AND (t3.v1 > 1:Int32), output: all } + BatchLookupJoin { type: Inner, predicate: t1.v2 = t3.v1 AND (t3.v1 > 1:Int32), output: all, lookup table: t3 } └─BatchExchange { order: [], dist: Single } └─BatchFilter { predicate: (t1.v2 > 1:Int32) } └─BatchScan { table: t1, columns: [t1.v1, t1.v2], distribution: SomeShard } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index cd3fb179a43f..dad8d469dcba 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -174,7 +174,7 @@ A.category = 10 and (P.state = 'or' OR P.state = 'id' OR P.state = 'ca'); batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: auction.seller = person.id AND (((person.state = 'or':Varchar) OR (person.state = 'id':Varchar)) OR (person.state = 'ca':Varchar)), output: [person.name, person.city, person.state, auction.id] } + └─BatchLookupJoin { type: Inner, predicate: auction.seller = person.id AND (((person.state = 'or':Varchar) OR (person.state = 'id':Varchar)) OR (person.state = 'ca':Varchar)), output: [person.name, person.city, person.state, auction.id], lookup table: person } └─BatchExchange { order: [], dist: UpstreamHashShard(auction.seller) } └─BatchProject { exprs: [auction.id, auction.seller] } └─BatchFilter { predicate: (auction.category = 10:Int32) } @@ -1798,7 +1798,7 @@ WHERE A.category = 10; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: bid.auction = auction.id AND (auction.category = 10:Int32), output: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, auction.item_name, auction.description, auction.initial_bid, auction.reserve, auction.date_time, auction.expires, auction.seller, auction.category] } + └─BatchLookupJoin { type: Inner, predicate: bid.auction = auction.id AND (auction.category = 10:Int32), output: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, auction.item_name, auction.description, auction.initial_bid, auction.reserve, auction.date_time, auction.expires, auction.seller, auction.category], lookup table: auction } └─BatchExchange { order: [], dist: UpstreamHashShard(bid.auction) } └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time], distribution: SomeShard } stream_plan: |- diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml index 309892e692b0..812471bc5ec2 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml @@ -847,7 +847,7 @@ BatchHopWindow { time_col: t1.ts, slide: 00:10:00, size: 00:30:00, output: all } └─BatchExchange { order: [], dist: Single } └─BatchFilter { predicate: IsNotNull(t1.ts) } - └─BatchLookupJoin { type: Inner, predicate: 1:Int32 = t1.k AND IsNotNull(t1.ts), output: all } + └─BatchLookupJoin { type: Inner, predicate: 1:Int32 = t1.k AND IsNotNull(t1.ts), output: all, lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(1:Int32) } └─BatchHashAgg { group_key: [1:Int32], aggs: [] } └─BatchExchange { order: [], dist: HashShard(1:Int32) } diff --git a/src/frontend/planner_test/tests/testdata/output/time_window.yaml b/src/frontend/planner_test/tests/testdata/output/time_window.yaml index ea7d9c5fbe4d..de32fd358aa6 100644 --- a/src/frontend/planner_test/tests/testdata/output/time_window.yaml +++ b/src/frontend/planner_test/tests/testdata/output/time_window.yaml @@ -237,7 +237,7 @@ BatchHopWindow { time_col: idx_t1.c3, slide: 00:10:00, size: 00:20:00, output: all } └─BatchExchange { order: [], dist: Single } └─BatchFilter { predicate: IsNotNull(idx_t1.c3) } - └─BatchLookupJoin { type: Inner, predicate: t2.d1 = idx_t1.c2 AND IsNotNull(idx_t1.c3), output: all } + └─BatchLookupJoin { type: Inner, predicate: t2.d1 = idx_t1.c2 AND IsNotNull(idx_t1.c3), output: all, lookup table: idx_t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(t2.d1) } └─BatchScan { table: t2, columns: [t2.d1, t2.d2, t2.d3], distribution: SomeShard } - sql: | @@ -268,7 +268,7 @@ └─BatchFilter { predicate: IsNotNull(idx_t1.c3) } └─BatchHopWindow { time_col: t2.d3, slide: 00:10:00, size: 00:20:00, output: all } └─BatchFilter { predicate: IsNotNull(t2.d3) } - └─BatchLookupJoin { type: Inner, predicate: t2.d1 = idx_t1.c2 AND IsNotNull(idx_t1.c3), output: all } + └─BatchLookupJoin { type: Inner, predicate: t2.d1 = idx_t1.c2 AND IsNotNull(idx_t1.c3), output: all, lookup table: idx_t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(t2.d1) } └─BatchFilter { predicate: IsNotNull(t2.d3) } └─BatchScan { table: t2, columns: [t2.d1, t2.d2, t2.d3], distribution: SomeShard } diff --git a/src/frontend/planner_test/tests/testdata/output/tpch.yaml b/src/frontend/planner_test/tests/testdata/output/tpch.yaml index 1037300ef966..69ebe2213cb7 100644 --- a/src/frontend/planner_test/tests/testdata/output/tpch.yaml +++ b/src/frontend/planner_test/tests/testdata/output/tpch.yaml @@ -289,13 +289,13 @@ BatchTopN { order: [supplier.s_acctbal DESC, nation.n_name ASC, supplier.s_name ASC, part.p_partkey ASC], limit: 100, offset: 0 } └─BatchExchange { order: [], dist: Single } └─BatchTopN { order: [supplier.s_acctbal DESC, nation.n_name ASC, supplier.s_name ASC, part.p_partkey ASC], limit: 100, offset: 0 } - └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name = 'AFRICA':Varchar), output: [supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment] } + └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name = 'AFRICA':Varchar), output: [supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment], lookup table: region } └─BatchExchange { order: [], dist: UpstreamHashShard(nation.n_regionkey) } └─BatchHashJoin { type: Inner, predicate: part.p_partkey IS NOT DISTINCT FROM part.p_partkey AND partsupp.ps_supplycost = min(partsupp.ps_supplycost), output: [part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name, nation.n_regionkey] } ├─BatchExchange { order: [], dist: HashShard(part.p_partkey) } - │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [part.p_partkey, part.p_mfgr, partsupp.ps_supplycost, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name, nation.n_regionkey] } + │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [part.p_partkey, part.p_mfgr, partsupp.ps_supplycost, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, nation.n_name, nation.n_regionkey], lookup table: nation } │ └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } - │ └─BatchLookupJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [part.p_partkey, part.p_mfgr, partsupp.ps_supplycost, supplier.s_name, supplier.s_address, supplier.s_nationkey, supplier.s_phone, supplier.s_acctbal, supplier.s_comment] } + │ └─BatchLookupJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [part.p_partkey, part.p_mfgr, partsupp.ps_supplycost, supplier.s_name, supplier.s_address, supplier.s_nationkey, supplier.s_phone, supplier.s_acctbal, supplier.s_comment], lookup table: supplier } │ └─BatchExchange { order: [], dist: UpstreamHashShard(partsupp.ps_suppkey) } │ └─BatchHashJoin { type: Inner, predicate: part.p_partkey = partsupp.ps_partkey, output: [part.p_partkey, part.p_mfgr, partsupp.ps_suppkey, partsupp.ps_supplycost] } │ ├─BatchExchange { order: [], dist: HashShard(part.p_partkey) } @@ -312,11 +312,11 @@ │ └─BatchFilter { predicate: (part.p_size = 4:Int32) AND Like(part.p_type, '%TIN':Varchar) } │ └─BatchScan { table: part, columns: [part.p_partkey, part.p_type, part.p_size], distribution: UpstreamHashShard(part.p_partkey) } └─BatchExchange { order: [], dist: HashShard(partsupp.ps_partkey) } - └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name = 'AFRICA':Varchar), output: [partsupp.ps_partkey, partsupp.ps_supplycost] } + └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name = 'AFRICA':Varchar), output: [partsupp.ps_partkey, partsupp.ps_supplycost], lookup table: region } └─BatchExchange { order: [], dist: UpstreamHashShard(nation.n_regionkey) } - └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [partsupp.ps_partkey, partsupp.ps_supplycost, nation.n_regionkey] } + └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [partsupp.ps_partkey, partsupp.ps_supplycost, nation.n_regionkey], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } - └─BatchLookupJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [partsupp.ps_partkey, partsupp.ps_supplycost, supplier.s_nationkey] } + └─BatchLookupJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [partsupp.ps_partkey, partsupp.ps_supplycost, supplier.s_nationkey], lookup table: supplier } └─BatchExchange { order: [], dist: UpstreamHashShard(partsupp.ps_suppkey) } └─BatchFilter { predicate: IsNotNull(partsupp.ps_partkey) } └─BatchScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], distribution: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } @@ -971,9 +971,9 @@ └─BatchHashAgg { group_key: [nation.n_name], aggs: [sum($expr1)] } └─BatchExchange { order: [], dist: HashShard(nation.n_name) } └─BatchProject { exprs: [nation.n_name, (lineitem.l_extendedprice * (1:Decimal - lineitem.l_discount)) as $expr1] } - └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name = 'MIDDLE EAST':Varchar), output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name] } + └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name = 'MIDDLE EAST':Varchar), output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name], lookup table: region } └─BatchExchange { order: [], dist: UpstreamHashShard(nation.n_regionkey) } - └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, nation.n_regionkey] } + └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, nation.n_regionkey], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } └─BatchHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey AND supplier.s_suppkey = lineitem.l_suppkey, output: [supplier.s_nationkey, lineitem.l_extendedprice, lineitem.l_discount] } ├─BatchExchange { order: [], dist: HashShard(orders.o_orderkey, supplier.s_suppkey) } @@ -1326,13 +1326,13 @@ └─BatchHashAgg { group_key: [nation.n_name, nation.n_name, $expr1], aggs: [sum($expr2)] } └─BatchExchange { order: [], dist: HashShard(nation.n_name, nation.n_name, $expr1) } └─BatchProject { exprs: [nation.n_name, nation.n_name, Extract('YEAR':Varchar, lineitem.l_shipdate) as $expr1, (lineitem.l_extendedprice * (1:Decimal - lineitem.l_discount)) as $expr2] } - └─BatchLookupJoin { type: Inner, predicate: customer.c_nationkey = nation.n_nationkey AND (((nation.n_name = 'ROMANIA':Varchar) AND (nation.n_name = 'IRAN':Varchar)) OR ((nation.n_name = 'IRAN':Varchar) AND (nation.n_name = 'ROMANIA':Varchar))), output: [lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, nation.n_name, nation.n_name] } + └─BatchLookupJoin { type: Inner, predicate: customer.c_nationkey = nation.n_nationkey AND (((nation.n_name = 'ROMANIA':Varchar) AND (nation.n_name = 'IRAN':Varchar)) OR ((nation.n_name = 'IRAN':Varchar) AND (nation.n_name = 'ROMANIA':Varchar))), output: [lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, nation.n_name, nation.n_name], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(customer.c_nationkey) } - └─BatchLookupJoin { type: Inner, predicate: orders.o_custkey = customer.c_custkey, output: [lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, nation.n_name, customer.c_nationkey] } + └─BatchLookupJoin { type: Inner, predicate: orders.o_custkey = customer.c_custkey, output: [lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, nation.n_name, customer.c_nationkey], lookup table: customer } └─BatchExchange { order: [], dist: UpstreamHashShard(orders.o_custkey) } - └─BatchLookupJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, nation.n_name, orders.o_custkey] } + └─BatchLookupJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, nation.n_name, orders.o_custkey], lookup table: orders } └─BatchExchange { order: [], dist: UpstreamHashShard(lineitem.l_orderkey) } - └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, nation.n_name] } + └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, nation.n_name], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } └─BatchHashJoin { type: Inner, predicate: supplier.s_suppkey = lineitem.l_suppkey, output: [supplier.s_nationkey, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate] } ├─BatchExchange { order: [], dist: HashShard(supplier.s_suppkey) } @@ -1612,17 +1612,17 @@ └─BatchExchange { order: [], dist: HashShard($expr1) } └─BatchProject { exprs: [$expr1, Case((nation.n_name = 'IRAN':Varchar), $expr2, 0:Decimal) as $expr3, $expr2] } └─BatchProject { exprs: [Extract('YEAR':Varchar, orders.o_orderdate) as $expr1, (lineitem.l_extendedprice * (1:Decimal - lineitem.l_discount)) as $expr2, nation.n_name] } - └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name = 'ASIA':Varchar), output: [lineitem.l_extendedprice, lineitem.l_discount, orders.o_orderdate, nation.n_name] } + └─BatchLookupJoin { type: Inner, predicate: nation.n_regionkey = region.r_regionkey AND (region.r_name = 'ASIA':Varchar), output: [lineitem.l_extendedprice, lineitem.l_discount, orders.o_orderdate, nation.n_name], lookup table: region } └─BatchExchange { order: [], dist: UpstreamHashShard(nation.n_regionkey) } - └─BatchLookupJoin { type: Inner, predicate: customer.c_nationkey = nation.n_nationkey, output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, orders.o_orderdate, nation.n_regionkey] } + └─BatchLookupJoin { type: Inner, predicate: customer.c_nationkey = nation.n_nationkey, output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, orders.o_orderdate, nation.n_regionkey], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(customer.c_nationkey) } - └─BatchLookupJoin { type: Inner, predicate: orders.o_custkey = customer.c_custkey, output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, orders.o_orderdate, customer.c_nationkey] } + └─BatchLookupJoin { type: Inner, predicate: orders.o_custkey = customer.c_custkey, output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, orders.o_orderdate, customer.c_nationkey], lookup table: customer } └─BatchExchange { order: [], dist: UpstreamHashShard(orders.o_custkey) } - └─BatchLookupJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey AND (orders.o_orderdate >= '1995-01-01':Date) AND (orders.o_orderdate <= '1996-12-31':Date), output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, orders.o_custkey, orders.o_orderdate] } + └─BatchLookupJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey AND (orders.o_orderdate >= '1995-01-01':Date) AND (orders.o_orderdate <= '1996-12-31':Date), output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, orders.o_custkey, orders.o_orderdate], lookup table: orders } └─BatchExchange { order: [], dist: UpstreamHashShard(lineitem.l_orderkey) } - └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name] } + └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } - └─BatchLookupJoin { type: Inner, predicate: lineitem.l_suppkey = supplier.s_suppkey, output: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey] } + └─BatchLookupJoin { type: Inner, predicate: lineitem.l_suppkey = supplier.s_suppkey, output: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey], lookup table: supplier } └─BatchExchange { order: [], dist: UpstreamHashShard(lineitem.l_suppkey) } └─BatchHashJoin { type: Inner, predicate: part.p_partkey = lineitem.l_partkey, output: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount] } ├─BatchExchange { order: [], dist: HashShard(part.p_partkey) } @@ -1945,13 +1945,13 @@ └─BatchHashAgg { group_key: [nation.n_name, $expr1], aggs: [sum($expr2)] } └─BatchExchange { order: [], dist: HashShard(nation.n_name, $expr1) } └─BatchProject { exprs: [nation.n_name, Extract('YEAR':Varchar, orders.o_orderdate) as $expr1, ((lineitem.l_extendedprice * (1:Decimal - lineitem.l_discount)) - (partsupp.ps_supplycost * lineitem.l_quantity)) as $expr2] } - └─BatchLookupJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, partsupp.ps_supplycost, orders.o_orderdate, nation.n_name] } + └─BatchLookupJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, partsupp.ps_supplycost, orders.o_orderdate, nation.n_name], lookup table: orders } └─BatchExchange { order: [], dist: UpstreamHashShard(lineitem.l_orderkey) } - └─BatchLookupJoin { type: Inner, predicate: lineitem.l_partkey = partsupp.ps_partkey AND lineitem.l_suppkey = partsupp.ps_suppkey, output: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, partsupp.ps_supplycost] } + └─BatchLookupJoin { type: Inner, predicate: lineitem.l_partkey = partsupp.ps_partkey AND lineitem.l_suppkey = partsupp.ps_suppkey, output: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, partsupp.ps_supplycost], lookup table: partsupp } └─BatchExchange { order: [], dist: UpstreamHashShard(lineitem.l_partkey, lineitem.l_suppkey) } - └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name] } + └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } - └─BatchLookupJoin { type: Inner, predicate: lineitem.l_suppkey = supplier.s_suppkey, output: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey] } + └─BatchLookupJoin { type: Inner, predicate: lineitem.l_suppkey = supplier.s_suppkey, output: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey], lookup table: supplier } └─BatchExchange { order: [], dist: UpstreamHashShard(lineitem.l_suppkey) } └─BatchHashJoin { type: Inner, predicate: part.p_partkey = lineitem.l_partkey, output: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount] } ├─BatchExchange { order: [], dist: HashShard(part.p_partkey) } @@ -2203,7 +2203,7 @@ └─BatchProject { exprs: [customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment, (lineitem.l_extendedprice * (1.00:Decimal - lineitem.l_discount)) as $expr1] } └─BatchHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name] } ├─BatchExchange { order: [], dist: HashShard(orders.o_orderkey) } - │ └─BatchLookupJoin { type: Inner, predicate: customer.c_nationkey = nation.n_nationkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, orders.o_orderkey, nation.n_name] } + │ └─BatchLookupJoin { type: Inner, predicate: customer.c_nationkey = nation.n_nationkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, orders.o_orderkey, nation.n_name], lookup table: nation } │ └─BatchExchange { order: [], dist: UpstreamHashShard(customer.c_nationkey) } │ └─BatchHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, customer.c_address, customer.c_nationkey, customer.c_phone, customer.c_acctbal, customer.c_comment, orders.o_orderkey] } │ ├─BatchExchange { order: [], dist: HashShard(customer.c_custkey) } @@ -2428,9 +2428,9 @@ │ └─BatchHashAgg { group_key: [partsupp.ps_partkey], aggs: [sum($expr1)] } │ └─BatchExchange { order: [], dist: HashShard(partsupp.ps_partkey) } │ └─BatchProject { exprs: [partsupp.ps_partkey, (partsupp.ps_supplycost * partsupp.ps_availqty::Decimal) as $expr1] } - │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'ARGENTINA':Varchar), output: [partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost] } + │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'ARGENTINA':Varchar), output: [partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost], lookup table: nation } │ └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } - │ └─BatchLookupJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey] } + │ └─BatchLookupJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey], lookup table: supplier } │ └─BatchExchange { order: [], dist: UpstreamHashShard(partsupp.ps_suppkey) } │ └─BatchScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_availqty, partsupp.ps_supplycost], distribution: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } └─BatchProject { exprs: [(sum(sum($expr2)) * 0.0001000000:Decimal) as $expr3] } @@ -2438,9 +2438,9 @@ └─BatchExchange { order: [], dist: Single } └─BatchSimpleAgg { aggs: [sum($expr2)] } └─BatchProject { exprs: [(partsupp.ps_supplycost * partsupp.ps_availqty::Decimal) as $expr2] } - └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'ARGENTINA':Varchar), output: [partsupp.ps_availqty, partsupp.ps_supplycost] } + └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'ARGENTINA':Varchar), output: [partsupp.ps_availqty, partsupp.ps_supplycost], lookup table: nation } └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } - └─BatchLookupJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey] } + └─BatchLookupJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey], lookup table: supplier } └─BatchExchange { order: [], dist: UpstreamHashShard(partsupp.ps_suppkey) } └─BatchScan { table: partsupp, columns: [partsupp.ps_suppkey, partsupp.ps_availqty, partsupp.ps_supplycost], distribution: SomeShard } stream_plan: |- @@ -2947,7 +2947,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchSimpleAgg { aggs: [sum($expr1), sum($expr2)] } └─BatchProject { exprs: [Case(Like(part.p_type, 'PROMO%':Varchar), (lineitem.l_extendedprice * (1:Decimal - lineitem.l_discount)), 0:Decimal) as $expr1, (lineitem.l_extendedprice * (1:Decimal - lineitem.l_discount)) as $expr2] } - └─BatchLookupJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey, output: [lineitem.l_extendedprice, lineitem.l_discount, part.p_type] } + └─BatchLookupJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey, output: [lineitem.l_extendedprice, lineitem.l_discount, part.p_type], lookup table: part } └─BatchExchange { order: [], dist: UpstreamHashShard(lineitem.l_partkey) } └─BatchProject { exprs: [lineitem.l_partkey, lineitem.l_extendedprice, lineitem.l_discount] } └─BatchFilter { predicate: (lineitem.l_shipdate >= '1995-09-01':Date) AND (lineitem.l_shipdate < '1995-10-01 00:00:00':Timestamp) } @@ -3276,9 +3276,9 @@ └─BatchHashAgg { group_key: [part.p_brand, part.p_type, part.p_size], aggs: [count(partsupp.ps_suppkey)] } └─BatchExchange { order: [], dist: HashShard(part.p_brand, part.p_type, part.p_size) } └─BatchHashAgg { group_key: [part.p_brand, part.p_type, part.p_size, partsupp.ps_suppkey], aggs: [] } - └─BatchLookupJoin { type: LeftAnti, predicate: partsupp.ps_suppkey = supplier.s_suppkey AND Like(supplier.s_comment, '%Customer%Complaints%':Varchar), output: [part.p_brand, part.p_type, part.p_size, partsupp.ps_suppkey] } + └─BatchLookupJoin { type: LeftAnti, predicate: partsupp.ps_suppkey = supplier.s_suppkey AND Like(supplier.s_comment, '%Customer%Complaints%':Varchar), output: [part.p_brand, part.p_type, part.p_size, partsupp.ps_suppkey], lookup table: supplier } └─BatchExchange { order: [], dist: UpstreamHashShard(partsupp.ps_suppkey) } - └─BatchLookupJoin { type: Inner, predicate: partsupp.ps_partkey = part.p_partkey AND (part.p_brand <> 'Brand#45':Varchar) AND (Not((part.p_type >= 'SMALL PLATED':Varchar)) OR Not((part.p_type < 'SMALL PLATEE':Varchar))) AND In(part.p_size, 19:Int32, 17:Int32, 16:Int32, 23:Int32, 10:Int32, 4:Int32, 38:Int32, 11:Int32), output: [partsupp.ps_suppkey, part.p_brand, part.p_type, part.p_size] } + └─BatchLookupJoin { type: Inner, predicate: partsupp.ps_partkey = part.p_partkey AND (part.p_brand <> 'Brand#45':Varchar) AND (Not((part.p_type >= 'SMALL PLATED':Varchar)) OR Not((part.p_type < 'SMALL PLATEE':Varchar))) AND In(part.p_size, 19:Int32, 17:Int32, 16:Int32, 23:Int32, 10:Int32, 4:Int32, 38:Int32, 11:Int32), output: [partsupp.ps_suppkey, part.p_brand, part.p_type, part.p_size], lookup table: part } └─BatchExchange { order: [], dist: UpstreamHashShard(partsupp.ps_partkey) } └─BatchScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey], distribution: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } stream_plan: |- @@ -3483,7 +3483,7 @@ └─BatchSimpleAgg { aggs: [sum(lineitem.l_extendedprice)] } └─BatchHashJoin { type: Inner, predicate: part.p_partkey IS NOT DISTINCT FROM part.p_partkey AND (lineitem.l_quantity < $expr1), output: [lineitem.l_extendedprice] } ├─BatchExchange { order: [], dist: HashShard(part.p_partkey) } - │ └─BatchLookupJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey AND (part.p_brand = 'Brand#13':Varchar) AND (part.p_container = 'JUMBO PKG':Varchar), output: [lineitem.l_quantity, lineitem.l_extendedprice, part.p_partkey] } + │ └─BatchLookupJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey AND (part.p_brand = 'Brand#13':Varchar) AND (part.p_container = 'JUMBO PKG':Varchar), output: [lineitem.l_quantity, lineitem.l_extendedprice, part.p_partkey], lookup table: part } │ └─BatchExchange { order: [], dist: UpstreamHashShard(lineitem.l_partkey) } │ └─BatchScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice], distribution: SomeShard } └─BatchProject { exprs: [part.p_partkey, (0.2:Decimal * (sum(lineitem.l_quantity) / count(lineitem.l_quantity)::Decimal)) as $expr1] } @@ -3927,7 +3927,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchSimpleAgg { aggs: [sum($expr1)] } └─BatchProject { exprs: [(lineitem.l_extendedprice * (1:Decimal - lineitem.l_discount)) as $expr1] } - └─BatchLookupJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey AND (((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND (lineitem.l_quantity >= 1:Decimal)) AND (lineitem.l_quantity <= 11:Decimal)) AND (part.p_size <= 5:Int32)) OR (((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND (lineitem.l_quantity >= 30:Decimal)) AND (lineitem.l_quantity <= 40:Decimal)) AND (part.p_size <= 10:Int32))) OR (((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND (lineitem.l_quantity >= 10:Decimal)) AND (lineitem.l_quantity <= 20:Decimal)) AND (part.p_size <= 15:Int32))) AND (part.p_size >= 1:Int32), output: [lineitem.l_extendedprice, lineitem.l_discount] } + └─BatchLookupJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey AND (((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND (lineitem.l_quantity >= 1:Decimal)) AND (lineitem.l_quantity <= 11:Decimal)) AND (part.p_size <= 5:Int32)) OR (((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND (lineitem.l_quantity >= 30:Decimal)) AND (lineitem.l_quantity <= 40:Decimal)) AND (part.p_size <= 10:Int32))) OR (((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND (lineitem.l_quantity >= 10:Decimal)) AND (lineitem.l_quantity <= 20:Decimal)) AND (part.p_size <= 15:Int32))) AND (part.p_size >= 1:Int32), output: [lineitem.l_extendedprice, lineitem.l_discount], lookup table: part } └─BatchExchange { order: [], dist: UpstreamHashShard(lineitem.l_partkey) } └─BatchProject { exprs: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount] } └─BatchFilter { predicate: In(lineitem.l_shipmode, 'AIR':Varchar, 'AIR REG':Varchar) AND (lineitem.l_shipinstruct = 'DELIVER IN PERSON':Varchar) } @@ -4041,11 +4041,11 @@ └─BatchTopN { order: [supplier.s_name ASC], limit: 1, offset: 0 } └─BatchHashJoin { type: LeftSemi, predicate: supplier.s_suppkey = partsupp.ps_suppkey, output: [supplier.s_name, supplier.s_address] } ├─BatchExchange { order: [], dist: HashShard(supplier.s_suppkey) } - │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'KENYA':Varchar), output: [supplier.s_suppkey, supplier.s_name, supplier.s_address] } + │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'KENYA':Varchar), output: [supplier.s_suppkey, supplier.s_name, supplier.s_address], lookup table: nation } │ └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } │ └─BatchScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_nationkey], distribution: UpstreamHashShard(supplier.s_suppkey) } └─BatchExchange { order: [], dist: HashShard(partsupp.ps_suppkey) } - └─BatchLookupJoin { type: LeftSemi, predicate: partsupp.ps_partkey = part.p_partkey AND (part.p_name >= 'forest':Varchar) AND (part.p_name < 'foresu':Varchar), output: [partsupp.ps_suppkey] } + └─BatchLookupJoin { type: LeftSemi, predicate: partsupp.ps_partkey = part.p_partkey AND (part.p_name >= 'forest':Varchar) AND (part.p_name < 'foresu':Varchar), output: [partsupp.ps_suppkey], lookup table: part } └─BatchExchange { order: [], dist: UpstreamHashShard(partsupp.ps_partkey) } └─BatchHashJoin { type: Inner, predicate: partsupp.ps_partkey = lineitem.l_partkey AND partsupp.ps_suppkey = lineitem.l_suppkey AND ($expr1 > $expr2), output: [partsupp.ps_partkey, partsupp.ps_suppkey] } ├─BatchExchange { order: [], dist: HashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } @@ -4314,9 +4314,9 @@ └─BatchHashJoin { type: LeftAnti, predicate: lineitem.l_orderkey = lineitem.l_orderkey AND (lineitem.l_suppkey <> lineitem.l_suppkey), output: [supplier.s_name] } ├─BatchHashJoin { type: LeftSemi, predicate: lineitem.l_orderkey = lineitem.l_orderkey AND (lineitem.l_suppkey <> lineitem.l_suppkey), output: all } │ ├─BatchExchange { order: [], dist: HashShard(lineitem.l_orderkey) } - │ │ └─BatchLookupJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey AND (orders.o_orderstatus = 'F':Varchar), output: [supplier.s_name, lineitem.l_orderkey, lineitem.l_suppkey] } + │ │ └─BatchLookupJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey AND (orders.o_orderstatus = 'F':Varchar), output: [supplier.s_name, lineitem.l_orderkey, lineitem.l_suppkey], lookup table: orders } │ │ └─BatchExchange { order: [], dist: UpstreamHashShard(lineitem.l_orderkey) } - │ │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'GERMANY':Varchar), output: [supplier.s_name, lineitem.l_orderkey, lineitem.l_suppkey] } + │ │ └─BatchLookupJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey AND (nation.n_name = 'GERMANY':Varchar), output: [supplier.s_name, lineitem.l_orderkey, lineitem.l_suppkey], lookup table: nation } │ │ └─BatchExchange { order: [], dist: UpstreamHashShard(supplier.s_nationkey) } │ │ └─BatchHashJoin { type: Inner, predicate: supplier.s_suppkey = lineitem.l_suppkey, output: [supplier.s_name, supplier.s_nationkey, lineitem.l_orderkey, lineitem.l_suppkey] } │ │ ├─BatchExchange { order: [], dist: HashShard(supplier.s_suppkey) } diff --git a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs index b78bf314c127..2815ad70e08d 100644 --- a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs @@ -25,7 +25,7 @@ use super::ExprRewritable; use crate::expr::{Expr, ExprRewriter}; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{ - EqJoinPredicate, EqJoinPredicateDisplay, PlanBase, PlanTreeNodeUnary, ToBatchPb, + EqJoinPredicate, EqJoinPredicateDisplay, LogicalScan, PlanBase, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, ToLocalBatch, }; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -131,6 +131,11 @@ impl Distill for BatchLookupJoin { vec.push(("output", data)); } + if let Some(scan) = self.core.right.as_logical_scan() { + let scan: &LogicalScan = scan; + vec.push(("lookup table", Pretty::display(&scan.table_name()))); + } + childless_record("BatchLookupJoin", vec) } }