diff --git a/src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml index 8df9d78869f04..6bd62c1ce4d61 100644 --- a/src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml @@ -109,5 +109,11 @@ sql: | create table t1 (ts timestamp with time zone); select * from t1 where ts + interval '1 hour' > now() or ts > ' 2023-12-18 00:00:00+00:00' or ts is null; + expected_outputs: + - stream_plan +- name: Many Temporal filter with or predicate + sql: | + create table t (t timestamp with time zone, a int); + select * from t where (t > NOW() - INTERVAL '1 hour' OR t is NULL OR a < 1) AND (t < NOW() - INTERVAL '1 hour' OR a > 1); expected_outputs: - stream_plan \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml index a8799aba8a4f4..1f5934ce76378 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml @@ -218,56 +218,43 @@ create table t1 (ts timestamp with time zone); select * from t1 where ts < now() - interval '1 hour' and ts >= now() - interval '2 hour'; stream_plan: |- - StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck } - └─StreamDynamicFilter { predicate: (t1.ts < $expr2), output: [t1.ts, t1._row_id], condition_always_relax: true } - ├─StreamDynamicFilter { predicate: (t1.ts >= $expr1), output_watermarks: [t1.ts], output: [t1.ts, t1._row_id], cleaned_by_watermark: true } + StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck, watermark_columns: [ts] } + └─StreamDynamicFilter { predicate: (t1.ts >= $expr2), output_watermarks: [t1.ts], output: [t1.ts, t1._row_id], cleaned_by_watermark: true } + ├─StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true } │ ├─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } │ └─StreamExchange { dist: Broadcast } - │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '02:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } + │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } │ └─StreamNow { output: [now] } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } + └─StreamProject { exprs: [SubtractWithTimeZone(now, '02:00:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } └─StreamNow { output: [now] } stream_dist_plan: |+ Fragment 0 - StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck } + StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck, watermark_columns: [ts] } ├── materialized table: 4294967294 - └── StreamDynamicFilter { predicate: (t1.ts < $expr2), output: [t1.ts, t1._row_id], condition_always_relax: true } + └── StreamDynamicFilter { predicate: (t1.ts >= $expr2), output_watermarks: [t1.ts], output: [t1.ts, t1._row_id], cleaned_by_watermark: true } ├── left table: 0 ├── right table: 1 - ├── StreamDynamicFilter { predicate: (t1.ts >= $expr1), output_watermarks: [t1.ts], output: [t1.ts, t1._row_id], cleaned_by_watermark: true } - │ ├── left table: 2 - │ ├── right table: 3 - │ ├── StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - │ │ ├── state table: 4 + ├── StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true } { left table: 2, right table: 3 } + │ ├── StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } { state table: 4 } │ │ ├── Upstream │ │ └── BatchPlanNode │ └── StreamExchange Broadcast from 1 └── StreamExchange Broadcast from 2 Fragment 1 - StreamProject { exprs: [SubtractWithTimeZone(now, '02:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } + StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } └── StreamNow { output: [now] } { state table: 5 } Fragment 2 - StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } + StreamProject { exprs: [SubtractWithTimeZone(now, '02:00:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } └── StreamNow { output: [now] } { state table: 6 } - Table 0 - ├── columns: [ t1_ts, t1__row_id ] - ├── primary key: [ $0 ASC, $1 ASC ] - ├── value indices: [ 0, 1 ] - ├── distribution key: [ 1 ] - └── read pk prefix len hint: 1 + Table 0 { columns: [ t1_ts, t1__row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 1 ], read pk prefix len hint: 1 } Table 1 { columns: [ $expr2 ], primary key: [], value indices: [ 0 ], distribution key: [], read pk prefix len hint: 0 } - Table 2 - ├── columns: [ t1_ts, t1__row_id ] - ├── primary key: [ $0 ASC, $1 ASC ] - ├── value indices: [ 0, 1 ] - ├── distribution key: [ 1 ] - └── read pk prefix len hint: 1 + Table 2 { columns: [ t1_ts, t1__row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 1 ], read pk prefix len hint: 1 } Table 3 { columns: [ $expr1 ], primary key: [], value indices: [ 0 ], distribution key: [], read pk prefix len hint: 0 } @@ -283,12 +270,7 @@ Table 6 { columns: [ now ], primary key: [], value indices: [ 0 ], distribution key: [], read pk prefix len hint: 0 } - Table 4294967294 - ├── columns: [ ts, t1._row_id ] - ├── primary key: [ $1 ASC ] - ├── value indices: [ 0, 1 ] - ├── distribution key: [ 1 ] - └── read pk prefix len hint: 1 + Table 4294967294 { columns: [ ts, t1._row_id ], primary key: [ $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 1 ], read pk prefix len hint: 1 } - name: Temporal filter in on clause for inner join's left side sql: | @@ -300,14 +282,14 @@ └─StreamExchange { dist: HashShard(t1.a, t1._row_id, t2._row_id) } └─StreamHashJoin { type: Inner, predicate: t1.a = t2.b, output: [t1.a, t1.ta, t2.b, t2.tb, t1._row_id, t2._row_id] } ├─StreamExchange { dist: HashShard(t1.a) } - │ └─StreamDynamicFilter { predicate: (t1.ta < $expr2), output: [t1.a, t1.ta, t1._row_id], condition_always_relax: true } - │ ├─StreamDynamicFilter { predicate: (t1.ta >= $expr1), output_watermarks: [t1.ta], output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true } + │ └─StreamDynamicFilter { predicate: (t1.ta >= $expr2), output_watermarks: [t1.ta], output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true } + │ ├─StreamDynamicFilter { predicate: (t1.ta < $expr1), output: [t1.a, t1.ta, t1._row_id], condition_always_relax: true } │ │ ├─StreamTableScan { table: t1, columns: [t1.a, t1.ta, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } │ │ └─StreamExchange { dist: Broadcast } - │ │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '02:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } + │ │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } │ │ └─StreamNow { output: [now] } │ └─StreamExchange { dist: Broadcast } - │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } + │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '02:00:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } │ └─StreamNow { output: [now] } └─StreamExchange { dist: HashShard(t2.b) } └─StreamTableScan { table: t2, columns: [t2.b, t2.tb, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } @@ -331,14 +313,14 @@ ├─StreamExchange { dist: HashShard(t2.b) } │ └─StreamTableScan { table: t2, columns: [t2.b, t2.tb, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } └─StreamExchange { dist: HashShard(t1.a) } - └─StreamDynamicFilter { predicate: (t1.ta < $expr2), output: [t1.a, t1.ta, t1._row_id], condition_always_relax: true } - ├─StreamDynamicFilter { predicate: (t1.ta >= $expr1), output_watermarks: [t1.ta], output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true } + └─StreamDynamicFilter { predicate: (t1.ta >= $expr2), output_watermarks: [t1.ta], output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true } + ├─StreamDynamicFilter { predicate: (t1.ta < $expr1), output: [t1.a, t1.ta, t1._row_id], condition_always_relax: true } │ ├─StreamTableScan { table: t1, columns: [t1.a, t1.ta, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } │ └─StreamExchange { dist: Broadcast } - │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '02:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } + │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } │ └─StreamNow { output: [now] } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } + └─StreamProject { exprs: [SubtractWithTimeZone(now, '02:00:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } └─StreamNow { output: [now] } - name: Temporal filter in on clause for full join's left side sql: | @@ -360,14 +342,14 @@ ├─StreamExchange { dist: HashShard(t1.a) } │ └─StreamTableScan { table: t1, columns: [t1.a, t1.ta, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: HashShard(t2.b) } - └─StreamDynamicFilter { predicate: (t2.tb < $expr2), output: [t2.b, t2.tb, t2._row_id], condition_always_relax: true } - ├─StreamDynamicFilter { predicate: (t2.tb >= $expr1), output_watermarks: [t2.tb], output: [t2.b, t2.tb, t2._row_id], cleaned_by_watermark: true } + └─StreamDynamicFilter { predicate: (t2.tb >= $expr2), output_watermarks: [t2.tb], output: [t2.b, t2.tb, t2._row_id], cleaned_by_watermark: true } + ├─StreamDynamicFilter { predicate: (t2.tb < $expr1), output: [t2.b, t2.tb, t2._row_id], condition_always_relax: true } │ ├─StreamTableScan { table: t2, columns: [t2.b, t2.tb, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } │ └─StreamExchange { dist: Broadcast } - │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '02:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } + │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } │ └─StreamNow { output: [now] } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } + └─StreamProject { exprs: [SubtractWithTimeZone(now, '02:00:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } └─StreamNow { output: [now] } - name: Temporal filter in on clause for right join's right side sql: | @@ -462,3 +444,56 @@ └─StreamShare { id: 2 } └─StreamFilter { predicate: (((Not((t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) AND Not(IsNull(t1.ts))) OR (t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) OR IsNull(t1.ts)) } └─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } +- name: Many Temporal filter with or predicate + sql: | + create table t (t timestamp with time zone, a int); + select * from t where (t > NOW() - INTERVAL '1 hour' OR t is NULL OR a < 1) AND (t < NOW() - INTERVAL '1 hour' OR a > 1); + stream_plan: |- + StreamMaterialize { columns: [t, a, $src(hidden), t._row_id(hidden), $src#1(hidden)], stream_key: [t._row_id, $src, $src#1], pk_columns: [t._row_id, $src, $src#1], pk_conflict: NoCheck } + └─StreamUnion { all: true } + ├─StreamExchange { dist: HashShard(t._row_id, $src, 0:Int32) } + │ └─StreamProject { exprs: [t.t, t.a, $src, t._row_id, 0:Int32] } + │ └─StreamDynamicFilter { predicate: (t.t < $expr2), output: [t.t, t.a, t._row_id, $src], condition_always_relax: true } + │ ├─StreamFilter { predicate: Not((t.a > 1:Int32)) } + │ │ └─StreamShare { id: 13 } + │ │ └─StreamUnion { all: true } + │ │ ├─StreamExchange { dist: HashShard(t._row_id, 0:Int32) } + │ │ │ └─StreamProject { exprs: [t.t, t.a, t._row_id, 0:Int32], output_watermarks: [t.t] } + │ │ │ └─StreamDynamicFilter { predicate: (t.t > $expr1), output_watermarks: [t.t], output: [t.t, t.a, t._row_id], cleaned_by_watermark: true } + │ │ │ ├─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND Not(IsNull(t.t)) AND Not((t.a < 1:Int32)) } + │ │ │ │ └─StreamShare { id: 2 } + │ │ │ │ └─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) } + │ │ │ │ └─StreamTableScan { table: t, columns: [t.t, t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + │ │ │ └─StreamExchange { dist: Broadcast } + │ │ │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } + │ │ │ └─StreamNow { output: [now] } + │ │ └─StreamExchange { dist: HashShard(t._row_id, 1:Int32) } + │ │ └─StreamProject { exprs: [t.t, t.a, t._row_id, 1:Int32] } + │ │ └─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND (IsNull(t.t) OR (t.a < 1:Int32)) } + │ │ └─StreamShare { id: 2 } + │ │ └─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) } + │ │ └─StreamTableScan { table: t, columns: [t.t, t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + │ └─StreamExchange { dist: Broadcast } + │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } + │ └─StreamNow { output: [now] } + └─StreamExchange { dist: HashShard(t._row_id, $src, 1:Int32) } + └─StreamProject { exprs: [t.t, t.a, $src, t._row_id, 1:Int32] } + └─StreamFilter { predicate: (t.a > 1:Int32) } + └─StreamShare { id: 13 } + └─StreamUnion { all: true } + ├─StreamExchange { dist: HashShard(t._row_id, 0:Int32) } + │ └─StreamProject { exprs: [t.t, t.a, t._row_id, 0:Int32], output_watermarks: [t.t] } + │ └─StreamDynamicFilter { predicate: (t.t > $expr1), output_watermarks: [t.t], output: [t.t, t.a, t._row_id], cleaned_by_watermark: true } + │ ├─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND Not(IsNull(t.t)) AND Not((t.a < 1:Int32)) } + │ │ └─StreamShare { id: 2 } + │ │ └─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) } + │ │ └─StreamTableScan { table: t, columns: [t.t, t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + │ └─StreamExchange { dist: Broadcast } + │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } + │ └─StreamNow { output: [now] } + └─StreamExchange { dist: HashShard(t._row_id, 1:Int32) } + └─StreamProject { exprs: [t.t, t.a, t._row_id, 1:Int32] } + └─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND (IsNull(t.t) OR (t.a < 1:Int32)) } + └─StreamShare { id: 2 } + └─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) } + └─StreamTableScan { table: t, columns: [t.t, t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index b513d4904669c..db5dc8ceca7d2 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -240,7 +240,11 @@ static BUSHY_TREE_JOIN_ORDERING: LazyLock = LazyLock::new(|| static FILTER_WITH_NOW_TO_JOIN: LazyLock = LazyLock::new(|| { OptimizationStage::new( "Push down filter with now into a left semijoin", - vec![SplitNowOrRule::create(), FilterWithNowToJoinRule::create()], + vec![ + SplitNowAndRule::create(), + SplitNowOrRule::create(), + FilterWithNowToJoinRule::create(), + ], ApplyOrder::TopDown, ) }); diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index d59bde580b1d1..acde2f7b72eb6 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -90,6 +90,7 @@ pub use top_n_on_index_rule::*; mod stream; pub use stream::bushy_tree_join_ordering_rule::*; pub use stream::filter_with_now_to_join_rule::*; +pub use stream::split_now_and_rule::*; pub use stream::split_now_or_rule::*; pub use stream::stream_project_merge_rule::*; mod trivial_project_to_values_rule; @@ -190,6 +191,7 @@ macro_rules! for_all_rules { , { AggProjectMergeRule } , { UnionMergeRule } , { DagToTreeRule } + , { SplitNowAndRule } , { SplitNowOrRule } , { FilterWithNowToJoinRule } , { TopNOnIndexRule } diff --git a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs index b5f1d46d51743..498696589c81b 100644 --- a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs +++ b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs @@ -13,7 +13,6 @@ // limitations under the License. use risingwave_common::types::DataType; -use risingwave_pb::expr::expr_node::Type; use risingwave_pb::plan_common::JoinType; use crate::expr::{ @@ -55,11 +54,6 @@ impl Rule for FilterWithNowToJoinRule { } }); - // We want to put `input_expr >/>= now_expr` before `input_expr u8 { - match cmp { - Type::GreaterThan | Type::GreaterThanOrEqual => 0, - Type::LessThan | Type::LessThanOrEqual => 1, - _ => 2, - } -} - struct NowAsInputRef { index: usize, } diff --git a/src/frontend/src/optimizer/rule/stream/mod.rs b/src/frontend/src/optimizer/rule/stream/mod.rs index 3b088d93d64ec..cc86298e766e8 100644 --- a/src/frontend/src/optimizer/rule/stream/mod.rs +++ b/src/frontend/src/optimizer/rule/stream/mod.rs @@ -14,5 +14,6 @@ pub(crate) mod bushy_tree_join_ordering_rule; pub(crate) mod filter_with_now_to_join_rule; +pub(crate) mod split_now_and_rule; pub(crate) mod split_now_or_rule; pub(crate) mod stream_project_merge_rule; diff --git a/src/frontend/src/optimizer/rule/stream/split_now_and_rule.rs b/src/frontend/src/optimizer/rule/stream/split_now_and_rule.rs new file mode 100644 index 0000000000000..f82e4a8fdd304 --- /dev/null +++ b/src/frontend/src/optimizer/rule/stream/split_now_and_rule.rs @@ -0,0 +1,84 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::optimizer::plan_node::{LogicalFilter, PlanTreeNodeUnary}; +use crate::optimizer::rule::{BoxedRule, Rule}; +use crate::optimizer::PlanRef; +use crate::utils::Condition; + +/// Split `LogicalFilter` with many AND conjunctions with now into multiple `LogicalFilter`, prepared for `SplitNowOrRule` +/// +/// Before: +/// ```text +/// `LogicalFilter` +/// (now() or c11 or c12 ..) and (now() or c21 or c22 ...) and .. and other exprs +/// | +/// Input +/// ``` +/// +/// After: +/// ```text +/// `LogicalFilter`(now() or c11 or c12 ..) +/// | +/// `LogicalFilter`(now() or c21 or c22 ...) +/// | +/// ...... +/// | +/// `LogicalFilter` other exprs +/// | +/// Input +/// ``` +pub struct SplitNowAndRule {} +impl Rule for SplitNowAndRule { + fn apply(&self, plan: PlanRef) -> Option { + let filter: &LogicalFilter = plan.as_logical_filter()?; + let input = filter.input(); + if filter.predicate().conjunctions.len() == 1 { + return None; + } + + if filter + .predicate() + .conjunctions + .iter() + .all(|e| e.count_nows() == 0) + { + return None; + } + + let [with_now, others] = + filter + .predicate() + .clone() + .group_by::<_, 2>(|e| if e.count_nows() > 0 { 0 } else { 1 }); + + let mut plan = LogicalFilter::create(input, others); + for e in with_now { + plan = LogicalFilter::new( + plan, + Condition { + conjunctions: vec![e], + }, + ) + .into(); + } + Some(plan) + } +} + +impl SplitNowAndRule { + pub fn create() -> BoxedRule { + Box::new(SplitNowAndRule {}) + } +}