diff --git a/src/frontend/planner_test/tests/testdata/output/bushy_join.yaml b/src/frontend/planner_test/tests/testdata/output/bushy_join.yaml index 6bd212dce06b8..4c0208990fe50 100644 --- a/src/frontend/planner_test/tests/testdata/output/bushy_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/bushy_join.yaml @@ -18,31 +18,31 @@ │ ├─StreamExchange { dist: HashShard(t.id) } │ │ └─StreamHashJoin { type: Inner, predicate: t.id = t.id, output: [t.id, t.id, t._row_id, t._row_id] } │ │ ├─StreamExchange { dist: HashShard(t.id) } - │ │ │ └─StreamFilter { predicate: (t.id = t.id) } + │ │ │ └─StreamFilter { predicate: IsNotNull(t.id) } │ │ │ └─StreamTableScan { table: t, columns: [t.id, t._row_id], stream_scan_type: ArrangementBackfill, pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } │ │ └─StreamExchange { dist: HashShard(t.id) } - │ │ └─StreamFilter { predicate: (t.id = t.id) } + │ │ └─StreamFilter { predicate: IsNotNull(t.id) } │ │ └─StreamTableScan { table: t, columns: [t.id, t._row_id], stream_scan_type: ArrangementBackfill, pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } │ └─StreamHashJoin { type: Inner, predicate: t.id = t.id, output: [t.id, t.id, t._row_id, t._row_id] } │ ├─StreamExchange { dist: HashShard(t.id) } - │ │ └─StreamFilter { predicate: (t.id = t.id) } + │ │ └─StreamFilter { predicate: IsNotNull(t.id) } │ │ └─StreamTableScan { table: t, columns: [t.id, t._row_id], stream_scan_type: ArrangementBackfill, pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } │ └─StreamExchange { dist: HashShard(t.id) } - │ └─StreamFilter { predicate: (t.id = t.id) } + │ └─StreamFilter { predicate: IsNotNull(t.id) } │ └─StreamTableScan { table: t, columns: [t.id, t._row_id], stream_scan_type: ArrangementBackfill, pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } └─StreamHashJoin { type: Inner, predicate: t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id, output: [t.id, t.id, t.id, t.id, t._row_id, t._row_id, t._row_id, t._row_id] } ├─StreamExchange { dist: HashShard(t.id) } │ └─StreamHashJoin { type: Inner, predicate: t.id = t.id, output: [t.id, t.id, t._row_id, t._row_id] } │ ├─StreamExchange { dist: HashShard(t.id) } - │ │ └─StreamFilter { predicate: (t.id = t.id) } + │ │ └─StreamFilter { predicate: IsNotNull(t.id) } │ │ └─StreamTableScan { table: t, columns: [t.id, t._row_id], stream_scan_type: ArrangementBackfill, pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } │ └─StreamExchange { dist: HashShard(t.id) } - │ └─StreamFilter { predicate: (t.id = t.id) } + │ └─StreamFilter { predicate: IsNotNull(t.id) } │ └─StreamTableScan { table: t, columns: [t.id, t._row_id], stream_scan_type: ArrangementBackfill, pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } └─StreamHashJoin { type: Inner, predicate: t.id = t.id, output: [t.id, t.id, t._row_id, t._row_id] } ├─StreamExchange { dist: HashShard(t.id) } - │ └─StreamFilter { predicate: (t.id = t.id) } + │ └─StreamFilter { predicate: IsNotNull(t.id) } │ └─StreamTableScan { table: t, columns: [t.id, t._row_id], stream_scan_type: ArrangementBackfill, pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } └─StreamExchange { dist: HashShard(t.id) } - └─StreamFilter { predicate: (t.id = t.id) } + └─StreamFilter { predicate: IsNotNull(t.id) } └─StreamTableScan { table: t, columns: [t.id, t._row_id], stream_scan_type: ArrangementBackfill, pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml b/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml index 6cc4b1f08d0fd..414a82379c53b 100644 --- a/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml @@ -157,7 +157,7 @@ │ │ │ └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_regionkey], stream_scan_type: ArrangementBackfill, pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } │ │ └─StreamExchange { dist: HashShard($expr1) } │ │ └─StreamProject { exprs: [stock.s_i_id, stock.s_quantity, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr1, stock.s_w_id] } - │ │ └─StreamFilter { predicate: (stock.s_i_id = stock.s_i_id) } + │ │ └─StreamFilter { predicate: IsNotNull(stock.s_i_id) } │ │ └─StreamTableScan { table: stock, columns: [stock.s_i_id, stock.s_w_id, stock.s_quantity], stream_scan_type: ArrangementBackfill, pk: [stock.s_w_id, stock.s_i_id], dist: UpstreamHashShard(stock.s_w_id, stock.s_i_id) } │ └─StreamHashJoin { type: Inner, predicate: item.i_id = stock.s_i_id, output: all } │ ├─StreamExchange { dist: HashShard(item.i_id) } @@ -233,7 +233,7 @@ Fragment 9 StreamProject { exprs: [stock.s_i_id, stock.s_quantity, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr1, stock.s_w_id] } - └── StreamFilter { predicate: (stock.s_i_id = stock.s_i_id) } + └── StreamFilter { predicate: IsNotNull(stock.s_i_id) } └── StreamTableScan { table: stock, columns: [stock.s_i_id, stock.s_w_id, stock.s_quantity], stream_scan_type: ArrangementBackfill, pk: [stock.s_w_id, stock.s_i_id], dist: UpstreamHashShard(stock.s_w_id, stock.s_i_id) } { tables: [ StreamScan: 25 ] } ├── Upstream └── BatchPlanNode @@ -421,17 +421,17 @@ │ └─StreamHashJoin { type: Inner, predicate: customer.c_d_id = new_order.no_d_id AND customer.c_w_id = new_order.no_w_id, output: all } │ ├─StreamExchange { dist: HashShard(customer.c_d_id, customer.c_w_id) } │ │ └─StreamProject { exprs: [customer.c_id, customer.c_d_id, customer.c_w_id] } - │ │ └─StreamFilter { predicate: (customer.c_w_id = customer.c_w_id) AND (customer.c_d_id = customer.c_d_id) AND Like(customer.c_state, 'a%':Varchar) } + │ │ └─StreamFilter { predicate: IsNotNull(customer.c_w_id) AND IsNotNull(customer.c_d_id) AND Like(customer.c_state, 'a%':Varchar) } │ │ └─StreamTableScan { table: customer, columns: [customer.c_id, customer.c_d_id, customer.c_w_id, customer.c_state], stream_scan_type: ArrangementBackfill, pk: [customer.c_w_id, customer.c_d_id, customer.c_id], dist: UpstreamHashShard(customer.c_w_id, customer.c_d_id, customer.c_id) } │ └─StreamExchange { dist: HashShard(new_order.no_d_id, new_order.no_w_id) } - │ └─StreamFilter { predicate: (new_order.no_w_id = new_order.no_w_id) AND (new_order.no_d_id = new_order.no_d_id) AND (new_order.no_o_id = new_order.no_o_id) } + │ └─StreamFilter { predicate: IsNotNull(new_order.no_w_id) AND IsNotNull(new_order.no_d_id) AND IsNotNull(new_order.no_o_id) } │ └─StreamTableScan { table: new_order, columns: [new_order.no_o_id, new_order.no_d_id, new_order.no_w_id], stream_scan_type: ArrangementBackfill, pk: [new_order.no_w_id, new_order.no_d_id, new_order.no_o_id], dist: UpstreamHashShard(new_order.no_w_id, new_order.no_d_id, new_order.no_o_id) } └─StreamHashJoin { type: Inner, predicate: orders.o_w_id = order_line.ol_w_id AND orders.o_d_id = order_line.ol_d_id AND orders.o_id = order_line.ol_o_id, output: all } ├─StreamExchange { dist: HashShard(orders.o_id, orders.o_d_id, orders.o_w_id) } - │ └─StreamFilter { predicate: (orders.o_d_id = orders.o_d_id) AND (orders.o_w_id = orders.o_w_id) AND (orders.o_entry_d > '2007-01-02 00:00:00':Timestamp) } + │ └─StreamFilter { predicate: IsNotNull(orders.o_d_id) AND IsNotNull(orders.o_w_id) AND (orders.o_entry_d > '2007-01-02 00:00:00':Timestamp) } │ └─StreamTableScan { table: orders, columns: [orders.o_id, orders.o_d_id, orders.o_w_id, orders.o_c_id, orders.o_entry_d], stream_scan_type: ArrangementBackfill, pk: [orders.o_w_id, orders.o_d_id, orders.o_id], dist: UpstreamHashShard(orders.o_w_id, orders.o_d_id, orders.o_id) } └─StreamExchange { dist: HashShard(order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id) } - └─StreamFilter { predicate: (order_line.ol_d_id = order_line.ol_d_id) AND (order_line.ol_w_id = order_line.ol_w_id) } + └─StreamFilter { predicate: IsNotNull(order_line.ol_d_id) AND IsNotNull(order_line.ol_w_id) } └─StreamTableScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_amount, order_line.ol_number], stream_scan_type: ArrangementBackfill, pk: [order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number], dist: UpstreamHashShard(order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number) } stream_dist_plan: |+ Fragment 0 @@ -456,25 +456,25 @@ Fragment 3 StreamProject { exprs: [customer.c_id, customer.c_d_id, customer.c_w_id] } - └── StreamFilter { predicate: (customer.c_w_id = customer.c_w_id) AND (customer.c_d_id = customer.c_d_id) AND Like(customer.c_state, 'a%':Varchar) } + └── StreamFilter { predicate: IsNotNull(customer.c_w_id) AND IsNotNull(customer.c_d_id) AND Like(customer.c_state, 'a%':Varchar) } └── StreamTableScan { table: customer, columns: [customer.c_id, customer.c_d_id, customer.c_w_id, customer.c_state], stream_scan_type: ArrangementBackfill, pk: [customer.c_w_id, customer.c_d_id, customer.c_id], dist: UpstreamHashShard(customer.c_w_id, customer.c_d_id, customer.c_id) } { tables: [ StreamScan: 9 ] } ├── Upstream └── BatchPlanNode Fragment 4 - StreamFilter { predicate: (new_order.no_w_id = new_order.no_w_id) AND (new_order.no_d_id = new_order.no_d_id) AND (new_order.no_o_id = new_order.no_o_id) } + StreamFilter { predicate: IsNotNull(new_order.no_w_id) AND IsNotNull(new_order.no_d_id) AND IsNotNull(new_order.no_o_id) } └── StreamTableScan { table: new_order, columns: [new_order.no_o_id, new_order.no_d_id, new_order.no_w_id], stream_scan_type: ArrangementBackfill, pk: [new_order.no_w_id, new_order.no_d_id, new_order.no_o_id], dist: UpstreamHashShard(new_order.no_w_id, new_order.no_d_id, new_order.no_o_id) } { tables: [ StreamScan: 10 ] } ├── Upstream └── BatchPlanNode Fragment 5 - StreamFilter { predicate: (orders.o_d_id = orders.o_d_id) AND (orders.o_w_id = orders.o_w_id) AND (orders.o_entry_d > '2007-01-02 00:00:00':Timestamp) } + StreamFilter { predicate: IsNotNull(orders.o_d_id) AND IsNotNull(orders.o_w_id) AND (orders.o_entry_d > '2007-01-02 00:00:00':Timestamp) } └── StreamTableScan { table: orders, columns: [orders.o_id, orders.o_d_id, orders.o_w_id, orders.o_c_id, orders.o_entry_d], stream_scan_type: ArrangementBackfill, pk: [orders.o_w_id, orders.o_d_id, orders.o_id], dist: UpstreamHashShard(orders.o_w_id, orders.o_d_id, orders.o_id) } { tables: [ StreamScan: 15 ] } ├── Upstream └── BatchPlanNode Fragment 6 - StreamFilter { predicate: (order_line.ol_d_id = order_line.ol_d_id) AND (order_line.ol_w_id = order_line.ol_w_id) } + StreamFilter { predicate: IsNotNull(order_line.ol_d_id) AND IsNotNull(order_line.ol_w_id) } └── StreamTableScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_amount, order_line.ol_number], stream_scan_type: ArrangementBackfill, pk: [order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number], dist: UpstreamHashShard(order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number) } { tables: [ StreamScan: 16 ] } ├── Upstream └── BatchPlanNode @@ -683,7 +683,7 @@ │ └─StreamExchange { dist: HashShard(order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, stock.s_w_id) } │ └─StreamHashJoin { type: Inner, predicate: order_line.ol_w_id = stock.s_w_id AND order_line.ol_i_id = stock.s_i_id, output: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_amount, stock.s_i_id, stock.s_w_id, order_line.ol_number, order_line.ol_i_id] } │ ├─StreamExchange { dist: HashShard(order_line.ol_w_id, order_line.ol_i_id) } - │ │ └─StreamFilter { predicate: (order_line.ol_d_id = order_line.ol_d_id) } + │ │ └─StreamFilter { predicate: IsNotNull(order_line.ol_d_id) } │ │ └─StreamTableScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_amount, order_line.ol_number], stream_scan_type: ArrangementBackfill, pk: [order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number], dist: UpstreamHashShard(order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number) } │ └─StreamExchange { dist: HashShard(stock.s_w_id, stock.s_i_id) } │ └─StreamTableScan { table: stock, columns: [stock.s_i_id, stock.s_w_id], stream_scan_type: ArrangementBackfill, pk: [stock.s_w_id, stock.s_i_id], dist: UpstreamHashShard(stock.s_w_id, stock.s_i_id) } @@ -744,7 +744,7 @@ └── StreamExchange Hash([1, 0]) from 8 Fragment 7 - StreamFilter { predicate: (order_line.ol_d_id = order_line.ol_d_id) } + StreamFilter { predicate: IsNotNull(order_line.ol_d_id) } └── StreamTableScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_amount, order_line.ol_number], stream_scan_type: ArrangementBackfill, pk: [order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number], dist: UpstreamHashShard(order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number) } { tables: [ StreamScan: 19 ] } ├── Upstream └── BatchPlanNode @@ -989,7 +989,7 @@ │ │ │ │ └─StreamTableScan { table: stock, columns: [stock.s_i_id, stock.s_w_id], stream_scan_type: ArrangementBackfill, pk: [stock.s_w_id, stock.s_i_id], dist: UpstreamHashShard(stock.s_w_id, stock.s_i_id) } │ │ │ └─StreamExchange { dist: HashShard(order_line.ol_i_id, order_line.ol_supply_w_id) } │ │ │ └─StreamProject { exprs: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_supply_w_id, order_line.ol_amount, order_line.ol_number] } - │ │ │ └─StreamFilter { predicate: (order_line.ol_w_id = order_line.ol_w_id) AND (order_line.ol_d_id = order_line.ol_d_id) AND (order_line.ol_delivery_d >= '2007-01-02 00:00:00':Timestamp) AND (order_line.ol_delivery_d <= '2032-01-02 00:00:00':Timestamp) } + │ │ │ └─StreamFilter { predicate: IsNotNull(order_line.ol_w_id) AND IsNotNull(order_line.ol_d_id) AND (order_line.ol_delivery_d >= '2007-01-02 00:00:00':Timestamp) AND (order_line.ol_delivery_d <= '2032-01-02 00:00:00':Timestamp) } │ │ │ └─StreamTableScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_supply_w_id, order_line.ol_amount, order_line.ol_number, order_line.ol_delivery_d], stream_scan_type: ArrangementBackfill, pk: [order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number], dist: UpstreamHashShard(order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number) } │ │ └─StreamExchange { dist: HashShard(orders.o_id, orders.o_d_id, orders.o_w_id, customer.c_d_id, customer.c_w_id) } │ │ └─StreamHashJoin { type: Inner, predicate: orders.o_c_id = customer.c_id AND orders.o_w_id = customer.c_w_id AND orders.o_d_id = customer.c_d_id, output: [orders.o_id, orders.o_d_id, orders.o_w_id, orders.o_entry_d, customer.c_d_id, customer.c_w_id, customer.c_state, orders.o_c_id, customer.c_id] } @@ -1046,7 +1046,7 @@ Fragment 6 StreamProject { exprs: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_supply_w_id, order_line.ol_amount, order_line.ol_number] } - └── StreamFilter { predicate: (order_line.ol_w_id = order_line.ol_w_id) AND (order_line.ol_d_id = order_line.ol_d_id) AND (order_line.ol_delivery_d >= '2007-01-02 00:00:00':Timestamp) AND (order_line.ol_delivery_d <= '2032-01-02 00:00:00':Timestamp) } + └── StreamFilter { predicate: IsNotNull(order_line.ol_w_id) AND IsNotNull(order_line.ol_d_id) AND (order_line.ol_delivery_d >= '2007-01-02 00:00:00':Timestamp) AND (order_line.ol_delivery_d <= '2032-01-02 00:00:00':Timestamp) } └── StreamTableScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_supply_w_id, order_line.ol_amount, order_line.ol_number, order_line.ol_delivery_d], stream_scan_type: ArrangementBackfill, pk: [order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number], dist: UpstreamHashShard(order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number) } ├── tables: [ StreamScan: 18 ] ├── Upstream @@ -1195,7 +1195,7 @@ │ │ │ │ │ │ ├─LogicalJoin { type: Inner, on: (item.i_id = stock.s_i_id), output: all } │ │ │ │ │ │ │ ├─LogicalScan { table: item, columns: [item.i_id], predicate: (item.i_id < 1000:Int32) } │ │ │ │ │ │ │ └─LogicalScan { table: stock, columns: [stock.s_i_id, stock.s_w_id], predicate: (stock.s_i_id < 1000:Int32) } - │ │ │ │ │ │ └─LogicalScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_supply_w_id, order_line.ol_amount], predicate: (order_line.ol_i_id = order_line.ol_i_id) AND (order_line.ol_i_id < 1000:Int32) } + │ │ │ │ │ │ └─LogicalScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_supply_w_id, order_line.ol_amount], predicate: IsNotNull(order_line.ol_i_id) AND (order_line.ol_i_id < 1000:Int32) } │ │ │ │ │ └─LogicalScan { table: orders, columns: [orders.o_id, orders.o_d_id, orders.o_w_id, orders.o_c_id, orders.o_entry_d], predicate: (orders.o_entry_d >= '2007-01-02 00:00:00':Timestamp) AND (orders.o_entry_d <= '2032-01-02 00:00:00':Timestamp) } │ │ │ │ └─LogicalScan { table: customer, columns: [customer.c_id, customer.c_d_id, customer.c_w_id, customer.c_state] } │ │ │ └─LogicalScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey] } @@ -1232,7 +1232,7 @@ │ └─BatchFilter { predicate: (stock.s_i_id < 1000:Int32) } │ └─BatchScan { table: stock, columns: [stock.s_i_id, stock.s_w_id], distribution: UpstreamHashShard(stock.s_w_id, stock.s_i_id) } └─BatchExchange { order: [], dist: HashShard(order_line.ol_i_id, order_line.ol_supply_w_id) } - └─BatchFilter { predicate: (order_line.ol_i_id = order_line.ol_i_id) AND (order_line.ol_i_id < 1000:Int32) } + └─BatchFilter { predicate: IsNotNull(order_line.ol_i_id) AND (order_line.ol_i_id < 1000:Int32) } └─BatchScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_supply_w_id, order_line.ol_amount], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [l_year, mkt_share], stream_key: [l_year], pk_columns: [l_year], pk_conflict: NoCheck } @@ -1265,7 +1265,7 @@ │ │ │ └─StreamFilter { predicate: (stock.s_i_id < 1000:Int32) } │ │ │ └─StreamTableScan { table: stock, columns: [stock.s_i_id, stock.s_w_id], stream_scan_type: ArrangementBackfill, pk: [stock.s_w_id, stock.s_i_id], dist: UpstreamHashShard(stock.s_w_id, stock.s_i_id) } │ │ └─StreamExchange { dist: HashShard(order_line.ol_i_id, order_line.ol_supply_w_id) } - │ │ └─StreamFilter { predicate: (order_line.ol_w_id = order_line.ol_w_id) AND (order_line.ol_d_id = order_line.ol_d_id) AND (order_line.ol_i_id = order_line.ol_i_id) AND (order_line.ol_i_id < 1000:Int32) } + │ │ └─StreamFilter { predicate: IsNotNull(order_line.ol_w_id) AND IsNotNull(order_line.ol_d_id) AND IsNotNull(order_line.ol_i_id) AND (order_line.ol_i_id < 1000:Int32) } │ │ └─StreamTableScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_supply_w_id, order_line.ol_amount, order_line.ol_number], stream_scan_type: ArrangementBackfill, pk: [order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number], dist: UpstreamHashShard(order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number) } │ └─StreamExchange { dist: HashShard(supplier.s_suppkey) } │ └─StreamHashJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [supplier.s_suppkey, nation.n_name, supplier.s_nationkey, nation.n_nationkey] } @@ -1350,7 +1350,7 @@ └── BatchPlanNode Fragment 11 - StreamFilter { predicate: (order_line.ol_w_id = order_line.ol_w_id) AND (order_line.ol_d_id = order_line.ol_d_id) AND (order_line.ol_i_id = order_line.ol_i_id) AND (order_line.ol_i_id < 1000:Int32) } + StreamFilter { predicate: IsNotNull(order_line.ol_w_id) AND IsNotNull(order_line.ol_d_id) AND IsNotNull(order_line.ol_i_id) AND (order_line.ol_i_id < 1000:Int32) } └── StreamTableScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_supply_w_id, order_line.ol_amount, order_line.ol_number], stream_scan_type: ArrangementBackfill, pk: [order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number], dist: UpstreamHashShard(order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number) } { tables: [ StreamScan: 29 ] } ├── Upstream └── BatchPlanNode @@ -1558,7 +1558,7 @@ │ ├─StreamExchange { dist: HashShard(orders.o_id, orders.o_d_id, orders.o_w_id) } │ │ └─StreamTableScan { table: orders, columns: [orders.o_id, orders.o_d_id, orders.o_w_id, orders.o_entry_d], stream_scan_type: ArrangementBackfill, pk: [orders.o_w_id, orders.o_d_id, orders.o_id], dist: UpstreamHashShard(orders.o_w_id, orders.o_d_id, orders.o_id) } │ └─StreamExchange { dist: HashShard(order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id) } - │ └─StreamFilter { predicate: (order_line.ol_i_id = order_line.ol_i_id) } + │ └─StreamFilter { predicate: IsNotNull(order_line.ol_i_id) } │ └─StreamTableScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_supply_w_id, order_line.ol_amount, order_line.ol_number], stream_scan_type: ArrangementBackfill, pk: [order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number], dist: UpstreamHashShard(order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number) } └─StreamExchange { dist: HashShard(supplier.s_suppkey) } └─StreamHashJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: [supplier.s_suppkey, nation.n_name, supplier.s_nationkey, nation.n_nationkey] } @@ -1617,7 +1617,7 @@ └── BatchPlanNode Fragment 8 - StreamFilter { predicate: (order_line.ol_i_id = order_line.ol_i_id) } + StreamFilter { predicate: IsNotNull(order_line.ol_i_id) } └── StreamTableScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_supply_w_id, order_line.ol_amount, order_line.ol_number], stream_scan_type: ArrangementBackfill, pk: [order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number], dist: UpstreamHashShard(order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number) } ├── tables: [ StreamScan: 20 ] ├── Upstream @@ -1751,7 +1751,7 @@ │ └─StreamFilter { predicate: (orders.o_entry_d <= order_line.ol_delivery_d) } │ └─StreamHashJoin { type: Inner, predicate: order_line.ol_w_id = orders.o_w_id AND order_line.ol_d_id = orders.o_d_id AND order_line.ol_o_id = orders.o_id AND order_line.ol_d_id = customer.c_d_id AND order_line.ol_w_id = customer.c_w_id, output: all } │ ├─StreamExchange { dist: HashShard(order_line.ol_d_id, order_line.ol_w_id, order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id) } - │ │ └─StreamFilter { predicate: (order_line.ol_w_id = order_line.ol_w_id) AND (order_line.ol_d_id = order_line.ol_d_id) } + │ │ └─StreamFilter { predicate: IsNotNull(order_line.ol_w_id) AND IsNotNull(order_line.ol_d_id) } │ │ └─StreamTableScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_delivery_d, order_line.ol_amount, order_line.ol_number], stream_scan_type: ArrangementBackfill, pk: [order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number], dist: UpstreamHashShard(order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number) } │ └─StreamExchange { dist: HashShard(customer.c_d_id, customer.c_w_id, orders.o_id, orders.o_d_id, orders.o_w_id) } │ └─StreamHashJoin { type: Inner, predicate: customer.c_id = orders.o_c_id AND customer.c_w_id = orders.o_w_id AND customer.c_d_id = orders.o_d_id, output: [customer.c_id, customer.c_d_id, customer.c_w_id, customer.c_last, customer.c_city, customer.c_state, customer.c_phone, orders.o_id, orders.o_d_id, orders.o_w_id, orders.o_entry_d] } @@ -1785,7 +1785,7 @@ └── StreamExchange Hash([1, 2, 7, 8, 9]) from 4 Fragment 3 - StreamFilter { predicate: (order_line.ol_w_id = order_line.ol_w_id) AND (order_line.ol_d_id = order_line.ol_d_id) } + StreamFilter { predicate: IsNotNull(order_line.ol_w_id) AND IsNotNull(order_line.ol_d_id) } └── StreamTableScan { table: order_line, columns: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_delivery_d, order_line.ol_amount, order_line.ol_number], stream_scan_type: ArrangementBackfill, pk: [order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number], dist: UpstreamHashShard(order_line.ol_w_id, order_line.ol_d_id, order_line.ol_o_id, order_line.ol_number) } ├── tables: [ StreamScan: 9 ] ├── Upstream @@ -3204,7 +3204,7 @@ │ │ └─StreamProject { exprs: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_delivery_d, ((stock.s_w_id * stock.s_i_id) % 10000:Int32)::Int64 as $expr1, stock.s_w_id, stock.s_i_id, order_line.ol_number] } │ │ └─StreamHashJoin { type: Inner, predicate: stock.s_w_id = order_line.ol_w_id AND stock.s_i_id = order_line.ol_i_id AND stock.s_w_id = orders.o_w_id, output: [stock.s_i_id, stock.s_w_id, order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_delivery_d, order_line.ol_number] } │ │ ├─StreamExchange { dist: HashShard(stock.s_w_id, stock.s_i_id, stock.s_w_id) } - │ │ │ └─StreamFilter { predicate: (stock.s_w_id = stock.s_w_id) } + │ │ │ └─StreamFilter { predicate: IsNotNull(stock.s_w_id) } │ │ │ └─StreamTableScan { table: stock, columns: [stock.s_i_id, stock.s_w_id], stream_scan_type: ArrangementBackfill, pk: [stock.s_w_id, stock.s_i_id], dist: UpstreamHashShard(stock.s_w_id, stock.s_i_id) } │ │ └─StreamExchange { dist: HashShard(order_line.ol_w_id, order_line.ol_i_id, orders.o_w_id) } │ │ └─StreamProject { exprs: [order_line.ol_o_id, order_line.ol_d_id, order_line.ol_w_id, order_line.ol_i_id, order_line.ol_delivery_d, orders.o_w_id, order_line.ol_number, orders.o_d_id, orders.o_id] } @@ -3250,7 +3250,7 @@ └── StreamExchange Hash([2, 3, 5]) from 5 Fragment 4 - StreamFilter { predicate: (stock.s_w_id = stock.s_w_id) } + StreamFilter { predicate: IsNotNull(stock.s_w_id) } └── StreamTableScan { table: stock, columns: [stock.s_i_id, stock.s_w_id], stream_scan_type: ArrangementBackfill, pk: [stock.s_w_id, stock.s_i_id], dist: UpstreamHashShard(stock.s_w_id, stock.s_i_id) } { tables: [ StreamScan: 13 ] } ├── Upstream └── BatchPlanNode diff --git a/src/frontend/planner_test/tests/testdata/output/tpch.yaml b/src/frontend/planner_test/tests/testdata/output/tpch.yaml index f45f6623b00ec..8e350d4bcffed 100644 --- a/src/frontend/planner_test/tests/testdata/output/tpch.yaml +++ b/src/frontend/planner_test/tests/testdata/output/tpch.yaml @@ -356,7 +356,7 @@ │ └─StreamExchange { dist: HashShard(supplier.s_nationkey) } │ └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_nationkey, supplier.s_phone, supplier.s_acctbal, supplier.s_comment], stream_scan_type: ArrangementBackfill, pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } └─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } - └─StreamFilter { predicate: (partsupp.ps_partkey = partsupp.ps_partkey) } + └─StreamFilter { predicate: IsNotNull(partsupp.ps_partkey) } └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], stream_scan_type: ArrangementBackfill, pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } stream_dist_plan: |+ Fragment 0 @@ -457,7 +457,7 @@ └── BatchPlanNode Fragment 17 - StreamFilter { predicate: (partsupp.ps_partkey = partsupp.ps_partkey) } + StreamFilter { predicate: IsNotNull(partsupp.ps_partkey) } └── StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], stream_scan_type: ArrangementBackfill, pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } { tables: [ StreamScan: 44 ] } ├── Upstream └── BatchPlanNode @@ -963,7 +963,7 @@ │ │ └─StreamFilter { predicate: (region.r_name = 'MIDDLE EAST':Varchar) } │ │ └─StreamTableScan { table: region, columns: [region.r_regionkey, region.r_name], stream_scan_type: ArrangementBackfill, pk: [region.r_regionkey], dist: UpstreamHashShard(region.r_regionkey) } │ └─StreamExchange { dist: HashShard(nation.n_regionkey) } - │ └─StreamFilter { predicate: (nation.n_nationkey = nation.n_nationkey) } + │ └─StreamFilter { predicate: IsNotNull(nation.n_nationkey) } │ └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name, nation.n_regionkey], stream_scan_type: ArrangementBackfill, pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } └─StreamExchange { dist: HashShard(customer.c_nationkey, supplier.s_nationkey) } └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey AND customer.c_nationkey = supplier.s_nationkey, output: [customer.c_nationkey, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey, orders.o_orderkey, orders.o_custkey, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_suppkey] } @@ -1017,7 +1017,7 @@ └── BatchPlanNode Fragment 5 - StreamFilter { predicate: (nation.n_nationkey = nation.n_nationkey) } + StreamFilter { predicate: IsNotNull(nation.n_nationkey) } └── StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name, nation.n_regionkey], stream_scan_type: ArrangementBackfill, pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } { tables: [ StreamScan: 12 ] } ├── Upstream └── BatchPlanNode @@ -1907,7 +1907,7 @@ │ │ └─StreamFilter { predicate: Like(part.p_name, '%yellow%':Varchar) } │ │ └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_name], stream_scan_type: ArrangementBackfill, pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } │ └─StreamExchange { dist: HashShard(partsupp.ps_partkey) } - │ └─StreamFilter { predicate: (partsupp.ps_suppkey = partsupp.ps_suppkey) } + │ └─StreamFilter { predicate: IsNotNull(partsupp.ps_suppkey) } │ └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], stream_scan_type: ArrangementBackfill, pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } └─StreamHashJoin { type: Inner, predicate: supplier.s_suppkey = lineitem.l_suppkey, output: [nation.n_name, supplier.s_suppkey, orders.o_orderdate, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, nation.n_nationkey, orders.o_orderkey, lineitem.l_linenumber] } ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } @@ -1921,7 +1921,7 @@ ├─StreamExchange { dist: HashShard(orders.o_orderkey) } │ └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_orderdate], stream_scan_type: ArrangementBackfill, pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - └─StreamFilter { predicate: (lineitem.l_partkey = lineitem.l_partkey) } + └─StreamFilter { predicate: IsNotNull(lineitem.l_partkey) } └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber], stream_scan_type: ArrangementBackfill, pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } stream_dist_plan: |+ Fragment 0 @@ -1960,7 +1960,7 @@ └── BatchPlanNode Fragment 5 - StreamFilter { predicate: (partsupp.ps_suppkey = partsupp.ps_suppkey) } + StreamFilter { predicate: IsNotNull(partsupp.ps_suppkey) } └── StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], stream_scan_type: ArrangementBackfill, pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } { tables: [ StreamScan: 12 ] } ├── Upstream └── BatchPlanNode @@ -1991,7 +1991,7 @@ └── BatchPlanNode Fragment 11 - StreamFilter { predicate: (lineitem.l_partkey = lineitem.l_partkey) } + StreamFilter { predicate: IsNotNull(lineitem.l_partkey) } └── StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber], stream_scan_type: ArrangementBackfill, pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } { tables: [ StreamScan: 28 ] } ├── Upstream └── BatchPlanNode diff --git a/src/frontend/planner_test/tests/testdata/output/tpch_variant.yaml b/src/frontend/planner_test/tests/testdata/output/tpch_variant.yaml index 00f96df8f7aa8..89e884f962235 100644 --- a/src/frontend/planner_test/tests/testdata/output/tpch_variant.yaml +++ b/src/frontend/planner_test/tests/testdata/output/tpch_variant.yaml @@ -266,7 +266,7 @@ │ └─StreamRowIdGen { row_id_index: 7 } │ └─StreamSource { source: supplier, columns: [s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, _row_id] } └─StreamExchange { dist: HashShard(ps_suppkey) } - └─StreamFilter { predicate: (ps_partkey = ps_partkey) } + └─StreamFilter { predicate: IsNotNull(ps_partkey) } └─StreamShare { id: 5 } └─StreamProject { exprs: [ps_partkey, ps_suppkey, ps_supplycost, _row_id] } └─StreamRowIdGen { row_id_index: 5 } @@ -372,7 +372,7 @@ └── StreamExchange NoShuffle from 9 Fragment 21 - StreamFilter { predicate: (ps_partkey = ps_partkey) } + StreamFilter { predicate: IsNotNull(ps_partkey) } └── StreamExchange NoShuffle from 7 Table 0 { columns: [ p_partkey, p_mfgr, ps_partkey, min(ps_supplycost), _row_id ], primary key: [ $0 ASC, $3 ASC, $2 ASC, $4 ASC ], value indices: [ 0, 1, 2, 3, 4 ], distribution key: [ 2, 3 ], read pk prefix len hint: 3 } @@ -548,7 +548,7 @@ │ │ └─StreamRowIdGen { row_id_index: 3 } │ │ └─StreamSource { source: region, columns: [r_regionkey, r_name, r_comment, _row_id] } │ └─StreamExchange { dist: HashShard(n_regionkey) } - │ └─StreamFilter { predicate: (n_nationkey = n_nationkey) } + │ └─StreamFilter { predicate: IsNotNull(n_nationkey) } │ └─StreamRowIdGen { row_id_index: 4 } │ └─StreamSource { source: nation, columns: [n_nationkey, n_name, n_regionkey, n_comment, _row_id] } └─StreamExchange { dist: HashShard(c_nationkey, s_nationkey) } @@ -594,7 +594,7 @@ └── StreamSource { source: region, columns: [r_regionkey, r_name, r_comment, _row_id] } { tables: [ Source: 9 ] } Fragment 4 - StreamFilter { predicate: (n_nationkey = n_nationkey) } + StreamFilter { predicate: IsNotNull(n_nationkey) } └── StreamRowIdGen { row_id_index: 4 } └── StreamSource { source: nation, columns: [n_nationkey, n_name, n_regionkey, n_comment, _row_id] } { tables: [ Source: 10 ] } @@ -1421,7 +1421,7 @@ │ │ └─StreamRowIdGen { row_id_index: 9 } │ │ └─StreamSource { source: part, columns: [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, _row_id] } │ └─StreamExchange { dist: HashShard(ps_partkey) } - │ └─StreamFilter { predicate: (ps_suppkey = ps_suppkey) } + │ └─StreamFilter { predicate: IsNotNull(ps_suppkey) } │ └─StreamRowIdGen { row_id_index: 5 } │ └─StreamSource { source: partsupp, columns: [ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment, _row_id] } └─StreamHashJoin [append_only] { type: Inner, predicate: s_suppkey = l_suppkey, output: [n_name, s_suppkey, o_orderdate, l_partkey, l_suppkey, l_quantity, l_extendedprice, l_discount, _row_id, _row_id, n_nationkey, _row_id, _row_id, o_orderkey] } @@ -1439,7 +1439,7 @@ │ └─StreamRowIdGen { row_id_index: 9 } │ └─StreamSource { source: orders, columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _row_id] } └─StreamExchange { dist: HashShard(l_orderkey) } - └─StreamFilter { predicate: (l_partkey = l_partkey) } + └─StreamFilter { predicate: IsNotNull(l_partkey) } └─StreamRowIdGen { row_id_index: 16 } └─StreamSource { source: lineitem, columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _row_id] } stream_dist_plan: |+ @@ -1469,7 +1469,7 @@ └── StreamSource { source: part, columns: [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, _row_id] } { tables: [ Source: 9 ] } Fragment 4 - StreamFilter { predicate: (ps_suppkey = ps_suppkey) } + StreamFilter { predicate: IsNotNull(ps_suppkey) } └── StreamRowIdGen { row_id_index: 5 } └── StreamSource { source: partsupp, columns: [ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment, _row_id] } { tables: [ Source: 10 ] } @@ -1496,7 +1496,7 @@ └── StreamSource { source: orders, columns: [o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, _row_id] } { tables: [ Source: 25 ] } Fragment 10 - StreamFilter { predicate: (l_partkey = l_partkey) } + StreamFilter { predicate: IsNotNull(l_partkey) } └── StreamRowIdGen { row_id_index: 16 } └── StreamSource { source: lineitem, columns: [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, _row_id] } { tables: [ Source: 26 ] } diff --git a/src/frontend/src/expr/utils.rs b/src/frontend/src/expr/utils.rs index 9db25b3dc554e..54d0521b3f8ef 100644 --- a/src/frontend/src/expr/utils.rs +++ b/src/frontend/src/expr/utils.rs @@ -69,6 +69,85 @@ pub fn fold_boolean_constant(expr: ExprImpl) -> ExprImpl { rewriter.rewrite_expr(expr) } +/// check `ColumnSelfEqualRewriter`'s comment below. +pub fn column_self_eq_eliminate(expr: ExprImpl) -> ExprImpl { + ColumnSelfEqualRewriter::rewrite(expr) +} + +/// for every `(col) == (col)`, +/// transform to `IsNotNull(col)` +/// since in the boolean context, `null = (...)` will always +/// be treated as false. +/// note: as always, only for *single column*. +pub struct ColumnSelfEqualRewriter {} + +impl ColumnSelfEqualRewriter { + /// the exact copy from `logical_filter_expression_simplify_rule` + fn extract_column(expr: ExprImpl, columns: &mut Vec) { + match expr.clone() { + ExprImpl::FunctionCall(func_call) => { + // the functions that *never* return null will be ignored + if Self::is_not_null(func_call.func_type()) { + return; + } + for sub_expr in func_call.inputs() { + Self::extract_column(sub_expr.clone(), columns); + } + } + ExprImpl::InputRef(_) => { + if !columns.contains(&expr) { + // only add the column if not exists + columns.push(expr); + } + } + _ => (), + } + } + + /// the exact copy from `logical_filter_expression_simplify_rule` + fn is_not_null(func_type: ExprType) -> bool { + func_type == ExprType::IsNull + || func_type == ExprType::IsNotNull + || func_type == ExprType::IsTrue + || func_type == ExprType::IsFalse + || func_type == ExprType::IsNotTrue + || func_type == ExprType::IsNotFalse + } + + pub fn rewrite(expr: ExprImpl) -> ExprImpl { + let mut columns = vec![]; + Self::extract_column(expr.clone(), &mut columns); + if columns.len() > 1 { + // leave it intact + return expr; + } + + // extract the equal inputs with sanity check + let ExprImpl::FunctionCall(func_call) = expr.clone() else { + return expr; + }; + if func_call.func_type() != ExprType::Equal || func_call.inputs().len() != 2 { + return expr; + } + assert_eq!(func_call.return_type(), DataType::Boolean); + let inputs = func_call.inputs(); + let e1 = inputs[0].clone(); + let e2 = inputs[1].clone(); + + if e1 == e2 { + if columns.is_empty() { + return ExprImpl::literal_bool(true); + } + let Ok(ret) = FunctionCall::new(ExprType::IsNotNull, vec![columns[0].clone()]) else { + return expr; + }; + ret.into() + } else { + expr + } + } +} + /// Fold boolean constants in a expr struct BooleanConstantFolding {} diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 5c5f70e99a8c6..2ab2c8de8f747 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -466,7 +466,7 @@ impl PlanRef { for c in merge_predicate.conjunctions { let c = Condition::with_expr(expr_rewriter.rewrite_cond(c)); - + // rebuild the conjunctions new_predicate = new_predicate.and(c); } diff --git a/src/frontend/src/optimizer/rule/logical_filter_expression_simplify_rule.rs b/src/frontend/src/optimizer/rule/logical_filter_expression_simplify_rule.rs index 271f796aae629..ebc724c200481 100644 --- a/src/frontend/src/optimizer/rule/logical_filter_expression_simplify_rule.rs +++ b/src/frontend/src/optimizer/rule/logical_filter_expression_simplify_rule.rs @@ -42,6 +42,8 @@ impl Rule for LogicalFilterExpressionSimplifyRule { // i.e., the specific optimization that will apply to the entire condition // e.g., `e and not(e)` let predicate = ConditionRewriter::rewrite(filter.predicate().clone()); + + // construct the new filter after predicate rewriting let filter = LogicalFilter::create(filter.input(), predicate); // then rewrite single expression via `rewrite_exprs` diff --git a/src/frontend/src/utils/condition.rs b/src/frontend/src/utils/condition.rs index b1db9d7846c49..3d856939f72eb 100644 --- a/src/frontend/src/utils/condition.rs +++ b/src/frontend/src/utils/condition.rs @@ -26,9 +26,9 @@ use risingwave_common::util::scan_range::{is_full_range, ScanRange}; use crate::error::Result; use crate::expr::{ - collect_input_refs, factorization_expr, fold_boolean_constant, push_down_not, to_conjunctions, - try_get_bool_constant, ExprDisplay, ExprImpl, ExprMutator, ExprRewriter, ExprType, ExprVisitor, - FunctionCall, InequalityInputPair, InputRef, + collect_input_refs, column_self_eq_eliminate, factorization_expr, fold_boolean_constant, + push_down_not, to_conjunctions, try_get_bool_constant, ExprDisplay, ExprImpl, ExprMutator, + ExprRewriter, ExprType, ExprVisitor, FunctionCall, InequalityInputPair, InputRef, }; use crate::utils::condition::cast_compare::{ResultForCmp, ResultForEq}; @@ -849,6 +849,7 @@ impl Condition { .into_iter() .map(push_down_not) .map(fold_boolean_constant) + .map(column_self_eq_eliminate) .flat_map(to_conjunctions) .collect(); let mut res: Vec = Vec::new(); @@ -1030,8 +1031,9 @@ mod tests { let ty = DataType::Int32; let mut rng = rand::thread_rng(); + let left: ExprImpl = FunctionCall::new( - ExprType::Equal, + ExprType::LessThanOrEqual, vec![ InputRef::new(rng.gen_range(0..left_col_num), ty.clone()).into(), InputRef::new(rng.gen_range(0..left_col_num), ty.clone()).into(), @@ -1039,6 +1041,7 @@ mod tests { ) .unwrap() .into(); + let right: ExprImpl = FunctionCall::new( ExprType::LessThan, vec![ @@ -1056,6 +1059,7 @@ mod tests { ) .unwrap() .into(); + let other: ExprImpl = FunctionCall::new( ExprType::GreaterThan, vec![ @@ -1073,7 +1077,9 @@ mod tests { let cond = Condition::with_expr(other.clone()) .and(Condition::with_expr(right.clone())) .and(Condition::with_expr(left.clone())); + let res = cond.split(left_col_num, right_col_num); + assert_eq!(res.0.conjunctions, vec![left]); assert_eq!(res.1.conjunctions, vec![right]); assert_eq!(res.2.conjunctions, vec![other]);