diff --git a/Cargo.lock b/Cargo.lock index 53fec8155e20e..ede6a6fd9bdd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13555,9 +13555,9 @@ dependencies = [ [[package]] name = "sqllogictest" -version = "0.20.4" +version = "0.20.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f20de090b0fde4dcd53b330e7ad2772140304eb34f5e9a99c8963fc4c052f149" +checksum = "ab17edbf3a80a891d1a9650c0ceca0c0e8931d8e087a22d04d7645aed5fbb86a" dependencies = [ "async-trait", "educe", diff --git a/e2e_test/streaming/bug_fixes/stack_overflow_17342.slt b/e2e_test/streaming/bug_fixes/stack_overflow_17342.slt new file mode 100644 index 0000000000000..01197a299736f --- /dev/null +++ b/e2e_test/streaming/bug_fixes/stack_overflow_17342.slt @@ -0,0 +1,120 @@ +statement ok +SET streaming_parallelism TO 1; + +statement ok +CREATE TABLE t (v int); + +# This query used to overflow the stack during optimization as it generated a left-deep tree +# of `OR xx IS NOT NULL` expression in the filter after each full outer join. +skipif madsim +statement ok +CREATE MATERIALIZED VIEW mv AS +SELECT + count(*) +FROM + t +FULL OUTER JOIN t t1 USING (v) +FULL OUTER JOIN t t2 USING (v) +FULL OUTER JOIN t t3 USING (v) +FULL OUTER JOIN t t4 USING (v) +FULL OUTER JOIN t t5 USING (v) +FULL OUTER JOIN t t6 USING (v) +FULL OUTER JOIN t t7 USING (v) +FULL OUTER JOIN t t8 USING (v) +FULL OUTER JOIN t t9 USING (v) +FULL OUTER JOIN t t10 USING (v) +FULL OUTER JOIN t t11 USING (v) +FULL OUTER JOIN t t12 USING (v) +FULL OUTER JOIN t t13 USING (v) +FULL OUTER JOIN t t14 USING (v) +FULL OUTER JOIN t t15 USING (v) +FULL OUTER JOIN t t16 USING (v) +FULL OUTER JOIN t t17 USING (v) +FULL OUTER JOIN t t18 USING (v) +FULL OUTER JOIN t t19 USING (v) +FULL OUTER JOIN t t20 USING (v) +FULL OUTER JOIN t t21 USING (v) +FULL OUTER JOIN t t22 USING (v) +FULL OUTER JOIN t t23 USING (v) +FULL OUTER JOIN t t24 USING (v) +FULL OUTER JOIN t t25 USING (v) +FULL OUTER JOIN t t26 USING (v) +FULL OUTER JOIN t t27 USING (v) +FULL OUTER JOIN t t28 USING (v) +FULL OUTER JOIN t t29 USING (v) +FULL OUTER JOIN t t30 USING (v) +FULL OUTER JOIN t t31 USING (v) +FULL OUTER JOIN t t32 USING (v) +FULL OUTER JOIN t t33 USING (v) +FULL OUTER JOIN t t34 USING (v) +FULL OUTER JOIN t t35 USING (v) +FULL OUTER JOIN t t36 USING (v) +FULL OUTER JOIN t t37 USING (v) +FULL OUTER JOIN t t38 USING (v) +FULL OUTER JOIN t t39 USING (v) +FULL OUTER JOIN t t40 USING (v) +FULL OUTER JOIN t t41 USING (v) +FULL OUTER JOIN t t42 USING (v) +FULL OUTER JOIN t t43 USING (v) +FULL OUTER JOIN t t44 USING (v) +FULL OUTER JOIN t t45 USING (v) +FULL OUTER JOIN t t46 USING (v) +FULL OUTER JOIN t t47 USING (v) +FULL OUTER JOIN t t48 USING (v) +FULL OUTER JOIN t t49 USING (v) +FULL OUTER JOIN t t50 USING (v) +FULL OUTER JOIN t t51 USING (v) +FULL OUTER JOIN t t52 USING (v) +FULL OUTER JOIN t t53 USING (v) +FULL OUTER JOIN t t54 USING (v) +FULL OUTER JOIN t t55 USING (v) +FULL OUTER JOIN t t56 USING (v) +FULL OUTER JOIN t t57 USING (v) +FULL OUTER JOIN t t58 USING (v) +FULL OUTER JOIN t t59 USING (v) +FULL OUTER JOIN t t60 USING (v) +FULL OUTER JOIN t t61 USING (v) +FULL OUTER JOIN t t62 USING (v) +FULL OUTER JOIN t t63 USING (v) +FULL OUTER JOIN t t64 USING (v) +FULL OUTER JOIN t t65 USING (v) +FULL OUTER JOIN t t66 USING (v) +FULL OUTER JOIN t t67 USING (v) +FULL OUTER JOIN t t68 USING (v) +FULL OUTER JOIN t t69 USING (v) +FULL OUTER JOIN t t70 USING (v) +FULL OUTER JOIN t t71 USING (v) +FULL OUTER JOIN t t72 USING (v) +FULL OUTER JOIN t t73 USING (v) +FULL OUTER JOIN t t74 USING (v) +FULL OUTER JOIN t t75 USING (v) +FULL OUTER JOIN t t76 USING (v) +FULL OUTER JOIN t t77 USING (v) +FULL OUTER JOIN t t78 USING (v) +FULL OUTER JOIN t t79 USING (v) +FULL OUTER JOIN t t80 USING (v) +FULL OUTER JOIN t t81 USING (v) +FULL OUTER JOIN t t82 USING (v) +FULL OUTER JOIN t t83 USING (v) +FULL OUTER JOIN t t84 USING (v) +FULL OUTER JOIN t t85 USING (v) +FULL OUTER JOIN t t86 USING (v) +FULL OUTER JOIN t t87 USING (v) +FULL OUTER JOIN t t88 USING (v) +FULL OUTER JOIN t t89 USING (v) +FULL OUTER JOIN t t90 USING (v) +FULL OUTER JOIN t t91 USING (v) +FULL OUTER JOIN t t92 USING (v) +FULL OUTER JOIN t t93 USING (v) +FULL OUTER JOIN t t94 USING (v) +FULL OUTER JOIN t t95 USING (v) +FULL OUTER JOIN t t96 USING (v) +FULL OUTER JOIN t t97 USING (v) +FULL OUTER JOIN t t98 USING (v) +; + +statement ok +DROP TABLE t CASCADE; + +statement ok +SET streaming_parallelism TO DEFAULT; diff --git a/src/frontend/planner_test/tests/testdata/output/index_selection.yaml b/src/frontend/planner_test/tests/testdata/output/index_selection.yaml index 82c2a5bbf7ec6..67a89e0878f9b 100644 --- a/src/frontend/planner_test/tests/testdata/output/index_selection.yaml +++ b/src/frontend/planner_test/tests/testdata/output/index_selection.yaml @@ -332,7 +332,7 @@ select * from t1 where a = 1 or b = 2 or c = 3 or p = 4 or a = 5 batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR (t1.c = 3:Int32)) OR (t1.p = 4:Int32)) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } + └─BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR ((t1.c = 3:Int32) OR (t1.p = 4:Int32))) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx1.t1._row_id) } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchExchange { order: [], dist: HashShard(idx1.t1._row_id) } @@ -346,7 +346,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx4, columns: [idx4.t1._row_id], scan_ranges: [idx4.p = Int32(4)], distribution: SomeShard } batch_local_plan: |- - BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND (((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR (t1.c = 3:Int32)) OR (t1.p = 4:Int32)) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } + BatchLookupJoin { type: Inner, predicate: idx1.t1._row_id IS NOT DISTINCT FROM t1._row_id AND ((((t1.a = 1:Int32) OR (t1.b = 2:Decimal)) OR ((t1.c = 3:Int32) OR (t1.p = 4:Int32))) OR (t1.a = 5:Int32)), output: [t1.a, t1.b, t1.c, t1.p], lookup table: t1 } └─BatchHashAgg { group_key: [idx1.t1._row_id], aggs: [] } └─BatchUnion { all: true } ├─BatchExchange { order: [], dist: Single } diff --git a/src/frontend/planner_test/tests/testdata/output/join.yaml b/src/frontend/planner_test/tests/testdata/output/join.yaml index 2db3b8cc3994a..a1617d04e7d2e 100644 --- a/src/frontend/planner_test/tests/testdata/output/join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/join.yaml @@ -204,7 +204,7 @@ StreamMaterialize { columns: [x, i.t._row_id(hidden), i.t._row_id#1(hidden), i.x(hidden), i.t._row_id#2(hidden), i.t._row_id#3(hidden), i.x#1(hidden)], stream_key: [i.t._row_id, i.t._row_id#1, i.x, i.t._row_id#2, i.t._row_id#3, i.x#1], pk_columns: [i.t._row_id, i.t._row_id#1, i.x, i.t._row_id#2, i.t._row_id#3, i.x#1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x) } └─StreamProject { exprs: [Coalesce(i.x, i.x) as $expr1, i.t._row_id, i.t._row_id, i.x, i.t._row_id, i.t._row_id, i.x] } - └─StreamFilter { predicate: (((((IsNotNull(i.t._row_id) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.t._row_id)) OR IsNotNull(i.x)) } + └─StreamFilter { predicate: (((IsNotNull(i.t._row_id) OR IsNotNull(i.t._row_id)) OR (IsNotNull(i.x) OR IsNotNull(i.t._row_id))) OR (IsNotNull(i.t._row_id) OR IsNotNull(i.x))) } └─StreamHashJoin { type: FullOuter, predicate: i.x = i.x, output: [i.x, i.x, i.t._row_id, i.t._row_id, i.t._row_id, i.t._row_id] } ├─StreamShare { id: 4 } │ └─StreamHashJoin { type: Inner, predicate: i.x = i.x, output: [i.x, i.t._row_id, i.t._row_id] } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index 298653450f659..d6b90da0a8c1a 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -107,20 +107,20 @@ sql: SELECT auction, price FROM bid WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) } + └─BatchFilter { predicate: ((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR ((bid.auction = 2001:Int32) OR (bid.auction = 2019:Int32))) OR (bid.auction = 2087:Int32)) } └─BatchScan { table: bid, columns: [bid.auction, bid.price], distribution: SomeShard } sink_plan: |- StreamSink { type: append-only, columns: [auction, price, bid._row_id(hidden)] } - └─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) } + └─StreamFilter { predicate: ((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR ((bid.auction = 2001:Int32) OR (bid.auction = 2019:Int32))) OR (bid.auction = 2087:Int32)) } └─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) } stream_plan: |- StreamMaterialize { columns: [auction, price, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: NoCheck } - └─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) } + └─StreamFilter { predicate: ((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR ((bid.auction = 2001:Int32) OR (bid.auction = 2019:Int32))) OR (bid.auction = 2087:Int32)) } └─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [auction, price, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) } + └── StreamFilter { predicate: ((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR ((bid.auction = 2001:Int32) OR (bid.auction = 2019:Int32))) OR (bid.auction = 2087:Int32)) } └── StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) } ├── tables: [ StreamScan: 0 ] ├── Upstream diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml index 823fa85459df7..35713c9682a35 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml @@ -116,13 +116,13 @@ sql: SELECT auction, price FROM bid WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087; batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchFilter { predicate: (((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR (auction = 2001:Int32)) OR (auction = 2019:Int32)) OR (auction = 2087:Int32)) } + └─BatchFilter { predicate: ((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR ((auction = 2001:Int32) OR (auction = 2019:Int32))) OR (auction = 2087:Int32)) } └─BatchProject { exprs: [auction, price] } └─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } stream_plan: |- StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } └─StreamProject { exprs: [auction, price, _row_id] } - └─StreamFilter { predicate: (((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR (auction = 2001:Int32)) OR (auction = 2019:Int32)) OR (auction = 2087:Int32)) } + └─StreamFilter { predicate: ((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR ((auction = 2001:Int32) OR (auction = 2019:Int32))) OR (auction = 2087:Int32)) } └─StreamRowIdGen { row_id_index: 7 } └─StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } stream_dist_plan: |+ @@ -130,7 +130,7 @@ StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] └── StreamProject { exprs: [auction, price, _row_id] } - └── StreamFilter { predicate: (((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR (auction = 2001:Int32)) OR (auction = 2019:Int32)) OR (auction = 2087:Int32)) } + └── StreamFilter { predicate: ((((auction = 1007:Int32) OR (auction = 1020:Int32)) OR ((auction = 2001:Int32) OR (auction = 2019:Int32))) OR (auction = 2087:Int32)) } └── StreamRowIdGen { row_id_index: 7 } └── StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } { tables: [ Source: 0 ] } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml index f77e975780c8a..d5d948e5b507c 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml @@ -116,7 +116,7 @@ StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr3, Field(bid, 2:Int32) as $expr4, _row_id] } └─StreamDynamicFilter { predicate: ($expr1 > $expr2), output_watermarks: [$expr1], output: [event_type, person, auction, bid, $expr1, _row_id], cleaned_by_watermark: true } - ├─StreamFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) } + ├─StreamFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) } │ └─StreamRowIdGen { row_id_index: 5 } │ └─StreamProject { exprs: [event_type, person, auction, bid, Proctime as $expr1, _row_id], output_watermarks: [$expr1] } │ └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] } @@ -128,7 +128,7 @@ StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } └── StreamProject { exprs: [Field(bid, 0:Int32) as $expr3, Field(bid, 2:Int32) as $expr4, _row_id] } └── StreamDynamicFilter { predicate: ($expr1 > $expr2), output_watermarks: [$expr1], output: [event_type, person, auction, bid, $expr1, _row_id], cleaned_by_watermark: true } { tables: [ DynamicFilterLeft: 0, DynamicFilterRight: 1 ] } - ├── StreamFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) } + ├── StreamFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) } │ └── StreamRowIdGen { row_id_index: 5 } │ └── StreamProject { exprs: [event_type, person, auction, bid, Proctime as $expr1, _row_id], output_watermarks: [$expr1] } │ └── StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] } { tables: [ Source: 2 ] } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml index 6dd731cffffb0..f065ba33c252d 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml @@ -77,13 +77,13 @@ batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [Field(bid, 0:Int32) as $expr2, Field(bid, 2:Int32) as $expr3] } - └─BatchFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) } + └─BatchFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) } └─BatchProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] } └─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] } stream_plan: |- StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr2, Field(bid, 2:Int32) as $expr3, _row_id] } - └─StreamFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) } + └─StreamFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) } └─StreamRowIdGen { row_id_index: 5 } └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] } └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] } @@ -92,7 +92,7 @@ Fragment 0 StreamMaterialize { columns: [auction, price, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } └── StreamProject { exprs: [Field(bid, 0:Int32) as $expr2, Field(bid, 2:Int32) as $expr3, _row_id] } - └── StreamFilter { predicate: (((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR (Field(bid, 0:Int32) = 2001:Int32)) OR (Field(bid, 0:Int32) = 2019:Int32)) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) } + └── StreamFilter { predicate: ((((Field(bid, 0:Int32) = 1007:Int32) OR (Field(bid, 0:Int32) = 1020:Int32)) OR ((Field(bid, 0:Int32) = 2001:Int32) OR (Field(bid, 0:Int32) = 2019:Int32))) OR (Field(bid, 0:Int32) = 2087:Int32)) AND (event_type = 2:Int32) } └── StreamRowIdGen { row_id_index: 5 } └── StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] } { tables: [ WatermarkFilter: 0 ] } └── StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id] } diff --git a/src/frontend/planner_test/tests/testdata/output/tpch.yaml b/src/frontend/planner_test/tests/testdata/output/tpch.yaml index dddddff210409..3c43faa8d2494 100644 --- a/src/frontend/planner_test/tests/testdata/output/tpch.yaml +++ b/src/frontend/planner_test/tests/testdata/output/tpch.yaml @@ -3650,14 +3650,14 @@ LogicalProject { exprs: [sum($expr1)] } └─LogicalAgg { aggs: [sum($expr1)] } └─LogicalProject { exprs: [(lineitem.l_extendedprice * (1:Int32::Decimal - lineitem.l_discount)) as $expr1] } - └─LogicalFilter { predicate: (part.p_partkey = lineitem.l_partkey) AND (part.p_size >= 1:Int32) AND In(lineitem.l_shipmode, 'AIR':Varchar, 'AIR REG':Varchar) AND (lineitem.l_shipinstruct = 'DELIVER IN PERSON':Varchar) AND (((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND (lineitem.l_quantity >= 1:Int32::Decimal)) AND (lineitem.l_quantity <= 11:Int32::Decimal)) AND (part.p_size <= 5:Int32)) OR (((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND (lineitem.l_quantity >= 30:Int32::Decimal)) AND (lineitem.l_quantity <= 40:Int32::Decimal)) AND (part.p_size <= 10:Int32))) OR (((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND (lineitem.l_quantity >= 10:Int32::Decimal)) AND (lineitem.l_quantity <= 20:Int32::Decimal)) AND (part.p_size <= 15:Int32))) } + └─LogicalFilter { predicate: (part.p_partkey = lineitem.l_partkey) AND (part.p_size >= 1:Int32) AND In(lineitem.l_shipmode, 'AIR':Varchar, 'AIR REG':Varchar) AND (lineitem.l_shipinstruct = 'DELIVER IN PERSON':Varchar) AND ((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND ((lineitem.l_quantity >= 1:Int32::Decimal) AND (lineitem.l_quantity <= 11:Int32::Decimal))) AND (part.p_size <= 5:Int32)) OR ((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND ((lineitem.l_quantity >= 30:Int32::Decimal) AND (lineitem.l_quantity <= 40:Int32::Decimal))) AND (part.p_size <= 10:Int32))) OR ((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND ((lineitem.l_quantity >= 10:Int32::Decimal) AND (lineitem.l_quantity <= 20:Int32::Decimal))) AND (part.p_size <= 15:Int32))) } └─LogicalJoin { type: Inner, on: true, output: all } ├─LogicalScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_linenumber, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus, lineitem.l_shipdate, lineitem.l_commitdate, lineitem.l_receiptdate, lineitem.l_shipinstruct, lineitem.l_shipmode, lineitem.l_comment] } └─LogicalScan { table: part, columns: [part.p_partkey, part.p_name, part.p_mfgr, part.p_brand, part.p_type, part.p_size, part.p_container, part.p_retailprice, part.p_comment] } optimized_logical_plan_for_batch: |- LogicalAgg { aggs: [sum($expr1)] } └─LogicalProject { exprs: [(lineitem.l_extendedprice * (1:Int32::Decimal - lineitem.l_discount)) as $expr1] } - └─LogicalJoin { type: Inner, on: (part.p_partkey = lineitem.l_partkey) AND (((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND (lineitem.l_quantity >= 1:Int32::Decimal)) AND (lineitem.l_quantity <= 11:Int32::Decimal)) AND (part.p_size <= 5:Int32)) OR (((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND (lineitem.l_quantity >= 30:Int32::Decimal)) AND (lineitem.l_quantity <= 40:Int32::Decimal)) AND (part.p_size <= 10:Int32))) OR (((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND (lineitem.l_quantity >= 10:Int32::Decimal)) AND (lineitem.l_quantity <= 20:Int32::Decimal)) AND (part.p_size <= 15:Int32))), output: [lineitem.l_extendedprice, lineitem.l_discount] } + └─LogicalJoin { type: Inner, on: (part.p_partkey = lineitem.l_partkey) AND ((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND ((lineitem.l_quantity >= 1:Int32::Decimal) AND (lineitem.l_quantity <= 11:Int32::Decimal))) AND (part.p_size <= 5:Int32)) OR ((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND ((lineitem.l_quantity >= 30:Int32::Decimal) AND (lineitem.l_quantity <= 40:Int32::Decimal))) AND (part.p_size <= 10:Int32))) OR ((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND ((lineitem.l_quantity >= 10:Int32::Decimal) AND (lineitem.l_quantity <= 20:Int32::Decimal))) AND (part.p_size <= 15:Int32))), output: [lineitem.l_extendedprice, lineitem.l_discount] } ├─LogicalScan { table: lineitem, output_columns: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount], required_columns: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipinstruct, lineitem.l_shipmode], predicate: In(lineitem.l_shipmode, 'AIR':Varchar, 'AIR REG':Varchar) AND (lineitem.l_shipinstruct = 'DELIVER IN PERSON':Varchar) } └─LogicalScan { table: part, columns: [part.p_partkey, part.p_brand, part.p_size, part.p_container], predicate: (part.p_size >= 1:Int32) } batch_plan: |- @@ -3665,7 +3665,7 @@ └─BatchExchange { order: [], dist: Single } └─BatchSimpleAgg { aggs: [sum($expr1)] } └─BatchProject { exprs: [(lineitem.l_extendedprice * (1:Decimal - lineitem.l_discount)) as $expr1] } - └─BatchLookupJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey AND (((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND (lineitem.l_quantity >= 1:Decimal)) AND (lineitem.l_quantity <= 11:Decimal)) AND (part.p_size <= 5:Int32)) OR (((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND (lineitem.l_quantity >= 30:Decimal)) AND (lineitem.l_quantity <= 40:Decimal)) AND (part.p_size <= 10:Int32))) OR (((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND (lineitem.l_quantity >= 10:Decimal)) AND (lineitem.l_quantity <= 20:Decimal)) AND (part.p_size <= 15:Int32))) AND (part.p_size >= 1:Int32), output: [lineitem.l_extendedprice, lineitem.l_discount], lookup table: part } + └─BatchLookupJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey AND ((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND ((lineitem.l_quantity >= 1:Decimal) AND (lineitem.l_quantity <= 11:Decimal))) AND (part.p_size <= 5:Int32)) OR ((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND ((lineitem.l_quantity >= 30:Decimal) AND (lineitem.l_quantity <= 40:Decimal))) AND (part.p_size <= 10:Int32))) OR ((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND ((lineitem.l_quantity >= 10:Decimal) AND (lineitem.l_quantity <= 20:Decimal))) AND (part.p_size <= 15:Int32))) AND (part.p_size >= 1:Int32), output: [lineitem.l_extendedprice, lineitem.l_discount], lookup table: part } └─BatchExchange { order: [], dist: UpstreamHashShard(lineitem.l_partkey) } └─BatchProject { exprs: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount] } └─BatchFilter { predicate: In(lineitem.l_shipmode, 'AIR':Varchar, 'AIR REG':Varchar) AND (lineitem.l_shipinstruct = 'DELIVER IN PERSON':Varchar) } @@ -3677,7 +3677,7 @@ └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [sum($expr1)] } └─StreamProject { exprs: [(lineitem.l_extendedprice * (1:Decimal - lineitem.l_discount)) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_partkey] } - └─StreamFilter { predicate: (((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND (lineitem.l_quantity >= 1:Decimal)) AND (lineitem.l_quantity <= 11:Decimal)) AND (part.p_size <= 5:Int32)) OR (((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND (lineitem.l_quantity >= 30:Decimal)) AND (lineitem.l_quantity <= 40:Decimal)) AND (part.p_size <= 10:Int32))) OR (((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND (lineitem.l_quantity >= 10:Decimal)) AND (lineitem.l_quantity <= 20:Decimal)) AND (part.p_size <= 15:Int32))) } + └─StreamFilter { predicate: ((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND ((lineitem.l_quantity >= 1:Decimal) AND (lineitem.l_quantity <= 11:Decimal))) AND (part.p_size <= 5:Int32)) OR ((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND ((lineitem.l_quantity >= 30:Decimal) AND (lineitem.l_quantity <= 40:Decimal))) AND (part.p_size <= 10:Int32))) OR ((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND ((lineitem.l_quantity >= 10:Decimal) AND (lineitem.l_quantity <= 20:Decimal))) AND (part.p_size <= 15:Int32))) } └─StreamHashJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey, output: all } ├─StreamExchange { dist: HashShard(lineitem.l_partkey) } │ └─StreamProject { exprs: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber] } @@ -3697,7 +3697,7 @@ Fragment 1 StreamStatelessSimpleAgg { aggs: [sum($expr1)] } └── StreamProject { exprs: [(lineitem.l_extendedprice * (1:Decimal - lineitem.l_discount)) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_partkey] } - └── StreamFilter { predicate: (((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND (lineitem.l_quantity >= 1:Decimal)) AND (lineitem.l_quantity <= 11:Decimal)) AND (part.p_size <= 5:Int32)) OR (((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND (lineitem.l_quantity >= 30:Decimal)) AND (lineitem.l_quantity <= 40:Decimal)) AND (part.p_size <= 10:Int32))) OR (((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND (lineitem.l_quantity >= 10:Decimal)) AND (lineitem.l_quantity <= 20:Decimal)) AND (part.p_size <= 15:Int32))) } + └── StreamFilter { predicate: ((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND ((lineitem.l_quantity >= 1:Decimal) AND (lineitem.l_quantity <= 11:Decimal))) AND (part.p_size <= 5:Int32)) OR ((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND ((lineitem.l_quantity >= 30:Decimal) AND (lineitem.l_quantity <= 40:Decimal))) AND (part.p_size <= 10:Int32))) OR ((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND ((lineitem.l_quantity >= 10:Decimal) AND (lineitem.l_quantity <= 20:Decimal))) AND (part.p_size <= 15:Int32))) } └── StreamHashJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey, output: all } { tables: [ HashJoinLeft: 1, HashJoinDegreeLeft: 2, HashJoinRight: 3, HashJoinDegreeRight: 4 ] } ├── StreamExchange Hash([0]) from 2 └── StreamExchange Hash([0]) from 3 diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index d14d99766bcc4..9dd5f7be1d53d 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -206,6 +206,20 @@ impl ExprImpl { .into() } + /// Create a new expression by merging the given expressions by `And`. + /// + /// If `exprs` is empty, return a literal `true`. + pub fn and(exprs: impl IntoIterator) -> Self { + merge_expr_by_logical(exprs, ExprType::And, ExprImpl::literal_bool(true)) + } + + /// Create a new expression by merging the given expressions by `Or`. + /// + /// If `exprs` is empty, return a literal `false`. + pub fn or(exprs: impl IntoIterator) -> Self { + merge_expr_by_logical(exprs, ExprType::Or, ExprImpl::literal_bool(false)) + } + /// Collect all `InputRef`s' indexes in the expression. /// /// # Panics @@ -1040,11 +1054,7 @@ impl ExprImpl { impl From for ExprImpl { fn from(c: Condition) -> Self { - merge_expr_by_binary( - c.conjunctions.into_iter(), - ExprType::And, - ExprImpl::literal_bool(true), - ) + ExprImpl::and(c.conjunctions) } } diff --git a/src/frontend/src/expr/utils.rs b/src/frontend/src/expr/utils.rs index 54d0521b3f8ef..8ab2ae0e16d7f 100644 --- a/src/frontend/src/expr/utils.rs +++ b/src/frontend/src/expr/utils.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::VecDeque; + use fixedbitset::FixedBitSet; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_pb::expr::expr_node::Type; @@ -31,19 +33,30 @@ fn split_expr_by(expr: ExprImpl, op: ExprType, rets: &mut Vec) { } } -pub fn merge_expr_by_binary(mut exprs: I, op: ExprType, identity_elem: ExprImpl) -> ExprImpl +/// Merge the given expressions by the logical operation. +/// +/// The `op` must be commutative and associative, typically `And` or `Or`. +pub(super) fn merge_expr_by_logical(exprs: I, op: ExprType, identity_elem: ExprImpl) -> ExprImpl where - I: Iterator, + I: IntoIterator, { - if let Some(e) = exprs.next() { - let mut ret = e; - for expr in exprs { - ret = FunctionCall::new(op, vec![ret, expr]).unwrap().into(); + let mut exprs: VecDeque<_> = exprs.into_iter().map(|e| (0usize, e)).collect(); + + while exprs.len() > 1 { + let (level, lhs) = exprs.pop_front().unwrap(); + let rhs_level = exprs.front().unwrap().0; + + // If there's one element left in the current level, move it to the end of the next level. + if level < rhs_level { + exprs.push_back((level, lhs)); + } else { + let rhs = exprs.pop_front().unwrap().1; + let new_expr = FunctionCall::new(op, vec![lhs, rhs]).unwrap().into(); + exprs.push_back((level + 1, new_expr)); } - ret - } else { - identity_elem } + + exprs.pop_front().map(|(_, e)| e).unwrap_or(identity_elem) } /// Transform a bool expression to Conjunctive form. e.g. given expression is @@ -393,17 +406,7 @@ pub fn factorization_expr(expr: ExprImpl) -> Vec { disjunction.retain(|factor| !greatest_common_divider.contains(factor)); } // now disjunctions == [[A, B], [B], [E]] - let remaining = merge_expr_by_binary( - disjunctions.into_iter().map(|conjunction| { - merge_expr_by_binary( - conjunction.into_iter(), - ExprType::And, - ExprImpl::literal_bool(true), - ) - }), - ExprType::Or, - ExprImpl::literal_bool(false), - ); + let remaining = ExprImpl::or(disjunctions.into_iter().map(ExprImpl::and)); // now remaining is (A & B) | (B) | (E) // the result is C & D & ((A & B) | (B) | (E)) greatest_common_divider diff --git a/src/frontend/src/optimizer/plan_node/logical_filter.rs b/src/frontend/src/optimizer/plan_node/logical_filter.rs index 4ea9adf7aacac..04cc2cb12a689 100644 --- a/src/frontend/src/optimizer/plan_node/logical_filter.rs +++ b/src/frontend/src/optimizer/plan_node/logical_filter.rs @@ -64,27 +64,17 @@ impl LogicalFilter { } } - /// Create a `LogicalFilter` to filter the rows with all keys are null. - pub fn filter_if_keys_all_null(input: PlanRef, key: &[usize]) -> PlanRef { + /// Create a `LogicalFilter` to filter out rows where all keys are null. + pub fn filter_out_all_null_keys(input: PlanRef, key: &[usize]) -> PlanRef { let schema = input.schema(); - let cond = key.iter().fold(ExprImpl::literal_bool(false), |expr, i| { - ExprImpl::FunctionCall( - FunctionCall::new_unchecked( - ExprType::Or, - vec![ - expr, - FunctionCall::new_unchecked( - ExprType::IsNotNull, - vec![InputRef::new(*i, schema.fields()[*i].data_type.clone()).into()], - DataType::Boolean, - ) - .into(), - ], - DataType::Boolean, - ) - .into(), + let cond = ExprImpl::or(key.iter().unique().map(|&i| { + FunctionCall::new_unchecked( + ExprType::IsNotNull, + vec![InputRef::new(i, schema.fields()[i].data_type.clone()).into()], + DataType::Boolean, ) - }); + .into() + })); LogicalFilter::create_with_expr(input, cond) } diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index e9dac0de38b5a..a8a832407ba68 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -1451,7 +1451,7 @@ impl ToStream for LogicalJoin { ) .collect_vec(); let plan: PlanRef = join_with_pk.into(); - LogicalFilter::filter_if_keys_all_null(plan, &left_right_stream_keys) + LogicalFilter::filter_out_all_null_keys(plan, &left_right_stream_keys) } else { join_with_pk.into() }; diff --git a/src/frontend/src/optimizer/rule/intersect_to_semi_join_rule.rs b/src/frontend/src/optimizer/rule/intersect_to_semi_join_rule.rs index 29ccbc066ec36..1d7d385aec627 100644 --- a/src/frontend/src/optimizer/rule/intersect_to_semi_join_rule.rs +++ b/src/frontend/src/optimizer/rule/intersect_to_semi_join_rule.rs @@ -50,14 +50,14 @@ impl Rule for IntersectToSemiJoinRule { impl IntersectToSemiJoinRule { pub(crate) fn gen_null_safe_equal(left: PlanRef, right: PlanRef) -> ExprImpl { - (left + let arms = (left .schema() .fields() .iter() .zip_eq_debug(right.schema().fields()) .enumerate()) - .fold(None, |expr, (i, (left_field, right_field))| { - let equal = ExprImpl::FunctionCall(Box::new(FunctionCall::new_unchecked( + .map(|(i, (left_field, right_field))| { + ExprImpl::FunctionCall(Box::new(FunctionCall::new_unchecked( ExprType::IsNotDistinctFrom, vec![ ExprImpl::InputRef(Box::new(InputRef::new(i, left_field.data_type()))), @@ -67,16 +67,9 @@ impl IntersectToSemiJoinRule { ))), ], Boolean, - ))); - - match expr { - None => Some(equal), - Some(expr) => Some(ExprImpl::FunctionCall(Box::new( - FunctionCall::new_unchecked(ExprType::And, vec![expr, equal], Boolean), - ))), - } - }) - .unwrap() + ))) + }); + ExprImpl::and(arms) } } diff --git a/src/frontend/src/optimizer/rule/stream/split_now_or_rule.rs b/src/frontend/src/optimizer/rule/stream/split_now_or_rule.rs index 36c88211848a1..ea63119980f90 100644 --- a/src/frontend/src/optimizer/rule/stream/split_now_or_rule.rs +++ b/src/frontend/src/optimizer/rule/stream/split_now_or_rule.rs @@ -57,7 +57,7 @@ impl Rule for SplitNowOrRule { return None; } - let (mut now, others): (Vec, Vec) = + let (now, others): (Vec, Vec) = disjunctions.into_iter().partition(|x| x.count_nows() != 0); // Only support now in one arm of disjunctions @@ -70,22 +70,10 @@ impl Rule for SplitNowOrRule { // + A & !B & !C ... &!Z // + B | C ... | Z - let mut arm1 = now.pop().unwrap(); - for pred in &others { - let not_pred: ExprImpl = - FunctionCall::new_unchecked(ExprType::Not, vec![pred.clone()], DataType::Boolean) - .into(); - arm1 = - FunctionCall::new_unchecked(ExprType::And, vec![arm1, not_pred], DataType::Boolean) - .into(); - } - - let arm2 = others - .into_iter() - .reduce(|a, b| { - FunctionCall::new_unchecked(ExprType::Or, vec![a, b], DataType::Boolean).into() - }) - .unwrap(); + let arm1 = ExprImpl::and(now.into_iter().chain(others.iter().map(|pred| { + FunctionCall::new_unchecked(ExprType::Not, vec![pred.clone()], DataType::Boolean).into() + }))); + let arm2 = ExprImpl::or(others); let share = LogicalShare::create(input); let filter1 = LogicalFilter::create_with_expr(share.clone(), arm1); diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 5fbfc0e19f6fd..b08af43583eed 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -50,7 +50,7 @@ risingwave_sqlsmith = { workspace = true } serde = "1.0.188" serde_derive = "1.0.188" serde_json = "1.0.107" -sqllogictest = "0.20" +sqllogictest = "0.20.5" tempfile = "3" tikv-jemallocator = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio" } diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs index 89e05b974e275..e2fdfb6b54bb8 100644 --- a/src/tests/simulation/src/slt.rs +++ b/src/tests/simulation/src/slt.rs @@ -208,6 +208,7 @@ pub async fn run_slt_task( // use a session per file let mut tester = sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into())); + tester.add_label("madsim"); let file = file.unwrap(); let path = file.as_path(); @@ -461,6 +462,8 @@ pub async fn run_slt_task( pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> { let mut tester = sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into())); + tester.add_label("madsim"); + tester .run_parallel_async( glob,