diff --git a/e2e_test/over_window/generated/batch/main.slt.part b/e2e_test/over_window/generated/batch/main.slt.part index 9f0ad1baeffe..9ef787964faf 100644 --- a/e2e_test/over_window/generated/batch/main.slt.part +++ b/e2e_test/over_window/generated/batch/main.slt.part @@ -8,3 +8,4 @@ include ./rank_func/mod.slt.part include ./expr_in_win_func/mod.slt.part include ./agg_in_win_func/mod.slt.part include ./opt_agg_then_join/mod.slt.part +include ./with_filter/mod.slt.part diff --git a/e2e_test/over_window/generated/batch/opt_agg_then_join.slt.part/mod.slt.part b/e2e_test/over_window/generated/batch/opt_agg_then_join/mod.slt.part similarity index 100% rename from e2e_test/over_window/generated/batch/opt_agg_then_join.slt.part/mod.slt.part rename to e2e_test/over_window/generated/batch/opt_agg_then_join/mod.slt.part diff --git a/e2e_test/over_window/generated/batch/with_filter/mod.slt.part b/e2e_test/over_window/generated/batch/with_filter/mod.slt.part new file mode 100644 index 000000000000..93c253e821c4 --- /dev/null +++ b/e2e_test/over_window/generated/batch/with_filter/mod.slt.part @@ -0,0 +1,51 @@ +# This file is generated by `gen.py`. Do not edit it manually! + +# Test window functions together with filters. +# https://github.com/risingwavelabs/risingwave/issues/13653 + +statement ok +create table t (id int, cat varchar, rule varchar, at timestamptz); + +statement ok +insert into t values + (1, 'foo', 'A', '2023-11-23T12:00:42Z') +, (2, 'foo', 'B', '2023-11-23T12:01:15Z'); + +statement ok +create view v1 as +select rule, lag(rule) over (partition by cat order by at) from t where rule = 'B'; + +statement ok +create view v2 as +select * from (select rule, lag(rule) over (partition by cat order by at) as prev_rule from t) where rule = 'B'; + +statement ok +create view v3 as +select * from (select rule, at, row_number() over (partition by cat order by at) as rank from t) where at = '2023-11-23T12:01:15Z'::timestamptz; + +query TT +select * from v1; +---- +B NULL + +query TT +select * from v2; +---- +B A + +query TT +select * from v3; +---- +B 2023-11-23 12:01:15+00:00 2 + +statement ok +drop view v1; + +statement ok +drop view v2; + +statement ok +drop view v3; + +statement ok +drop table t; diff --git a/e2e_test/over_window/generated/streaming/main.slt.part b/e2e_test/over_window/generated/streaming/main.slt.part index 9f0ad1baeffe..9ef787964faf 100644 --- a/e2e_test/over_window/generated/streaming/main.slt.part +++ b/e2e_test/over_window/generated/streaming/main.slt.part @@ -8,3 +8,4 @@ include ./rank_func/mod.slt.part include ./expr_in_win_func/mod.slt.part include ./agg_in_win_func/mod.slt.part include ./opt_agg_then_join/mod.slt.part +include ./with_filter/mod.slt.part diff --git a/e2e_test/over_window/generated/streaming/opt_agg_then_join.slt.part/mod.slt.part b/e2e_test/over_window/generated/streaming/opt_agg_then_join/mod.slt.part similarity index 100% rename from e2e_test/over_window/generated/streaming/opt_agg_then_join.slt.part/mod.slt.part rename to e2e_test/over_window/generated/streaming/opt_agg_then_join/mod.slt.part diff --git a/e2e_test/over_window/generated/streaming/with_filter/mod.slt.part b/e2e_test/over_window/generated/streaming/with_filter/mod.slt.part new file mode 100644 index 000000000000..03a9f9fffae6 --- /dev/null +++ b/e2e_test/over_window/generated/streaming/with_filter/mod.slt.part @@ -0,0 +1,51 @@ +# This file is generated by `gen.py`. Do not edit it manually! + +# Test window functions together with filters. +# https://github.com/risingwavelabs/risingwave/issues/13653 + +statement ok +create table t (id int, cat varchar, rule varchar, at timestamptz); + +statement ok +insert into t values + (1, 'foo', 'A', '2023-11-23T12:00:42Z') +, (2, 'foo', 'B', '2023-11-23T12:01:15Z'); + +statement ok +create materialized view v1 as +select rule, lag(rule) over (partition by cat order by at) from t where rule = 'B'; + +statement ok +create materialized view v2 as +select * from (select rule, lag(rule) over (partition by cat order by at) as prev_rule from t) where rule = 'B'; + +statement ok +create materialized view v3 as +select * from (select rule, at, row_number() over (partition by cat order by at) as rank from t) where at = '2023-11-23T12:01:15Z'::timestamptz; + +query TT +select * from v1; +---- +B NULL + +query TT +select * from v2; +---- +B A + +query TT +select * from v3; +---- +B 2023-11-23 12:01:15+00:00 2 + +statement ok +drop materialized view v1; + +statement ok +drop materialized view v2; + +statement ok +drop materialized view v3; + +statement ok +drop table t; diff --git a/e2e_test/over_window/templates/main.slt.part b/e2e_test/over_window/templates/main.slt.part index 00dfac5101ee..5cd945b123ea 100644 --- a/e2e_test/over_window/templates/main.slt.part +++ b/e2e_test/over_window/templates/main.slt.part @@ -6,3 +6,4 @@ include ./rank_func/mod.slt.part include ./expr_in_win_func/mod.slt.part include ./agg_in_win_func/mod.slt.part include ./opt_agg_then_join/mod.slt.part +include ./with_filter/mod.slt.part diff --git a/e2e_test/over_window/templates/opt_agg_then_join.slt.part/mod.slt.part b/e2e_test/over_window/templates/opt_agg_then_join/mod.slt.part similarity index 100% rename from e2e_test/over_window/templates/opt_agg_then_join.slt.part/mod.slt.part rename to e2e_test/over_window/templates/opt_agg_then_join/mod.slt.part diff --git a/e2e_test/over_window/templates/with_filter/mod.slt.part b/e2e_test/over_window/templates/with_filter/mod.slt.part new file mode 100644 index 000000000000..0ed38ab34458 --- /dev/null +++ b/e2e_test/over_window/templates/with_filter/mod.slt.part @@ -0,0 +1,49 @@ +# Test window functions together with filters. +# https://github.com/risingwavelabs/risingwave/issues/13653 + +statement ok +create table t (id int, cat varchar, rule varchar, at timestamptz); + +statement ok +insert into t values + (1, 'foo', 'A', '2023-11-23T12:00:42Z') +, (2, 'foo', 'B', '2023-11-23T12:01:15Z'); + +statement ok +create $view_type v1 as +select rule, lag(rule) over (partition by cat order by at) from t where rule = 'B'; + +statement ok +create $view_type v2 as +select * from (select rule, lag(rule) over (partition by cat order by at) as prev_rule from t) where rule = 'B'; + +statement ok +create $view_type v3 as +select * from (select rule, at, row_number() over (partition by cat order by at) as rank from t) where at = '2023-11-23T12:01:15Z'::timestamptz; + +query TT +select * from v1; +---- +B NULL + +query TT +select * from v2; +---- +B A + +query TT +select * from v3; +---- +B 2023-11-23 12:01:15+00:00 2 + +statement ok +drop $view_type v1; + +statement ok +drop $view_type v2; + +statement ok +drop $view_type v3; + +statement ok +drop table t; diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 2f3c14d34a08..8aa465b6e29c 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -31,7 +31,6 @@ #![feature(if_let_guard)] #![feature(iterator_try_collect)] #![feature(try_blocks)] -#![feature(let_else)] use std::time::Duration; diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index ad7fa10ea61b..3adf915a6241 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -326,7 +326,7 @@ mod tests { }, scan_startup_mode: None, - timestamp_offset: Some(123456789098765432), + timestamp_offset: Some("123456789098765432".to_string()), }; let client = KinesisSplitReader::new( properties, diff --git a/src/frontend/planner_test/tests/testdata/input/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/input/over_window_function.yaml index 33c00048abc6..999c96f5379e 100644 --- a/src/frontend/planner_test/tests/testdata/input/over_window_function.yaml +++ b/src/frontend/planner_test/tests/testdata/input/over_window_function.yaml @@ -474,3 +474,19 @@ - stream_plan - optimized_logical_plan_for_batch - batch_plan + +# With filter +- sql: | + create table t (id int, cat varchar, rule varchar, at timestamptz); + select * from (select cat, rule, at, lag(rule) over (partition by cat order by at) as prev_rule from t) as with_prev + where rule = 'B' and cat is not null and at = '2023-11-23T12:00:42Z'::timestamptz; + expected_outputs: + - optimized_logical_plan_for_stream + - optimized_logical_plan_for_batch +- sql: | + create table t (id int, cat varchar, rule varchar, at timestamptz); + select cat, rule, at, lag(rule) over (partition by cat order by at) as prev_rule from t + where rule = 'B' and cat is not null and at = '2023-11-23T12:00:42Z'::timestamptz; + expected_outputs: + - optimized_logical_plan_for_stream + - optimized_logical_plan_for_batch diff --git a/src/frontend/planner_test/tests/testdata/output/column_pruning.yaml b/src/frontend/planner_test/tests/testdata/output/column_pruning.yaml index fc9a53ba2351..47c343d33d69 100644 --- a/src/frontend/planner_test/tests/testdata/output/column_pruning.yaml +++ b/src/frontend/planner_test/tests/testdata/output/column_pruning.yaml @@ -151,8 +151,10 @@ optimized_logical_plan_for_batch: |- LogicalProject { exprs: [row_number] } └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─LogicalTopN { order: [t.y ASC], limit: 3, offset: 0, group_key: [t.x] } - └─LogicalScan { table: t, output_columns: [t.x, t.y], required_columns: [t.x, t.y, t.z], predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) } + └─LogicalProject { exprs: [t.x, t.y] } + └─LogicalFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) } + └─LogicalTopN { order: [t.y ASC], limit: 3, offset: 0, group_key: [t.x] } + └─LogicalScan { table: t, columns: [t.x, t.y, t.z], predicate: (t.x > 0:Int32) } - name: mixed sql: | create table t (v1 bigint, v2 double precision, v3 int); diff --git a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml index c54f2a9458e4..6028c0c9dfed 100644 --- a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml +++ b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml @@ -311,24 +311,24 @@ BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [t.x, t.y, t.z, Sqrt(((sum::Decimal - (($expr4 * $expr4) / $expr5)) / $expr5)) as $expr6, Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) as $expr7] } └─BatchProject { exprs: [t.x, t.y, t.z, $expr2, $expr1, $expr3, sum, sum, count, sum, sum, count, sum::Decimal as $expr4, count::Decimal as $expr5] } - └─BatchFilter { predicate: (Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / count::Decimal)) <= 3.0:Decimal) AND (Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) > 1.0:Decimal) } + └─BatchFilter { predicate: (t.y > 0:Int32) AND (t.x > 0:Int32) AND (Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / count::Decimal)) <= 3.0:Decimal) AND (Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) > 1.0:Decimal) } └─BatchOverWindow { window_functions: [sum($expr2) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr3) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─BatchExchange { order: [t.z ASC, t.x ASC], dist: HashShard(t.z) } └─BatchSort { order: [t.z ASC, t.x ASC] } └─BatchProject { exprs: [t.x, t.y, t.z, ($expr1 * $expr1) as $expr2, $expr1, (t.x * t.x) as $expr3] } └─BatchProject { exprs: [t.x, t.y, t.z, (t.x - t.y) as $expr1] } - └─BatchFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) } + └─BatchFilter { predicate: (t.z > 0:Int32) } └─BatchScan { table: t, columns: [t.x, t.y, t.z], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [x, y, z, res0, res1, t._row_id(hidden)], stream_key: [t._row_id, z], pk_columns: [t._row_id, z], pk_conflict: NoCheck } └─StreamProject { exprs: [t.x, t.y, t.z, Sqrt(((sum::Decimal - (($expr4 * $expr4) / $expr5)) / $expr5)) as $expr6, Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) as $expr7, t._row_id] } └─StreamProject { exprs: [t.x, t.y, t.z, $expr2, $expr1, $expr3, sum, sum, count, sum, sum, count, sum::Decimal as $expr4, count::Decimal as $expr5, t._row_id] } - └─StreamFilter { predicate: (Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / count::Decimal)) <= 3.0:Decimal) AND (Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) > 1.0:Decimal) } + └─StreamFilter { predicate: (t.y > 0:Int32) AND (t.x > 0:Int32) AND (Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / count::Decimal)) <= 3.0:Decimal) AND (Case((count <= 1:Int64), null:Decimal, Sqrt(((sum::Decimal - ((sum::Decimal * sum::Decimal) / count::Decimal)) / (count - 1:Int64)::Decimal))) > 1.0:Decimal) } └─StreamOverWindow { window_functions: [sum($expr2) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum($expr3) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), sum(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count(t.x) OVER(PARTITION BY t.z ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─StreamExchange { dist: HashShard(t.z) } └─StreamProject { exprs: [t.x, t.y, t.z, ($expr1 * $expr1) as $expr2, $expr1, (t.x * t.x) as $expr3, t._row_id] } └─StreamProject { exprs: [t.x, t.y, t.z, (t.x - t.y) as $expr1, t._row_id] } - └─StreamFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) } + └─StreamFilter { predicate: (t.z > 0:Int32) } └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - id: aggregate with expression in func arguments and over clause sql: | @@ -346,29 +346,27 @@ └─LogicalScan { table: t, columns: [t.x, t.y, t.z, t.w, t._row_id] } optimized_logical_plan_for_batch: |- LogicalProject { exprs: [t.x, t.y, t.z, (sum::Decimal / count::Decimal) as $expr4] } - └─LogicalFilter { predicate: ((sum::Decimal / count::Decimal) <= 3.0:Decimal) } + └─LogicalFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) AND ((sum::Decimal / count::Decimal) <= 3.0:Decimal) } └─LogicalOverWindow { window_functions: [sum($expr1) OVER(PARTITION BY $expr2 ORDER BY $expr3 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY $expr2 ORDER BY $expr3 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─LogicalProject { exprs: [t.x, t.y, t.z, (t.z * t.z) as $expr1, (t.y + 1:Int32) as $expr2, Abs(t.w) as $expr3] } - └─LogicalScan { table: t, columns: [t.x, t.y, t.z, t.w], predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) } + └─LogicalScan { table: t, columns: [t.x, t.y, t.z, t.w] } batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [t.x, t.y, t.z, (sum::Decimal / count::Decimal) as $expr4] } - └─BatchFilter { predicate: ((sum::Decimal / count::Decimal) <= 3.0:Decimal) } + └─BatchFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) AND ((sum::Decimal / count::Decimal) <= 3.0:Decimal) } └─BatchOverWindow { window_functions: [sum($expr1) OVER(PARTITION BY $expr2 ORDER BY $expr3 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY $expr2 ORDER BY $expr3 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─BatchExchange { order: [$expr2 ASC, $expr3 ASC], dist: HashShard($expr2) } └─BatchSort { order: [$expr2 ASC, $expr3 ASC] } └─BatchProject { exprs: [t.x, t.y, t.z, (t.z * t.z) as $expr1, (t.y + 1:Int32) as $expr2, Abs(t.w) as $expr3] } - └─BatchFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) } - └─BatchScan { table: t, columns: [t.x, t.y, t.z, t.w], distribution: SomeShard } + └─BatchScan { table: t, columns: [t.x, t.y, t.z, t.w], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [x, y, z, res, t._row_id(hidden), $expr2(hidden)], stream_key: [t._row_id, $expr2], pk_columns: [t._row_id, $expr2], pk_conflict: NoCheck } └─StreamProject { exprs: [t.x, t.y, t.z, (sum::Decimal / count::Decimal) as $expr4, t._row_id, $expr2] } - └─StreamFilter { predicate: ((sum::Decimal / count::Decimal) <= 3.0:Decimal) } + └─StreamFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) AND ((sum::Decimal / count::Decimal) <= 3.0:Decimal) } └─StreamOverWindow { window_functions: [sum($expr1) OVER(PARTITION BY $expr2 ORDER BY $expr3 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), count($expr1) OVER(PARTITION BY $expr2 ORDER BY $expr3 ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─StreamExchange { dist: HashShard($expr2) } └─StreamProject { exprs: [t.x, t.y, t.z, (t.z * t.z) as $expr1, (t.y + 1:Int32) as $expr2, Abs(t.w) as $expr3, t._row_id] } - └─StreamFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) } - └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t.w, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t.w, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - id: row_number with empty over clause sql: | create table t(x int); @@ -494,19 +492,20 @@ └─LogicalProject { exprs: [t.x, t.y, t._row_id] } └─LogicalScan { table: t, columns: [t.x, t.y, t._row_id] } optimized_logical_plan_for_batch: |- - LogicalTopN { order: [t.x ASC], limit: 2, offset: 0, group_key: [t.y] } - └─LogicalScan { table: t, columns: [t.x, t.y], predicate: (t.x > t.y) } + LogicalFilter { predicate: (t.x > t.y) } + └─LogicalTopN { order: [t.x ASC], limit: 2, offset: 0, group_key: [t.y] } + └─LogicalScan { table: t, columns: [t.x, t.y] } batch_plan: |- BatchExchange { order: [], dist: Single } - └─BatchGroupTopN { order: [t.x ASC], limit: 2, offset: 0, group_key: [t.y] } - └─BatchExchange { order: [], dist: HashShard(t.y) } - └─BatchFilter { predicate: (t.x > t.y) } + └─BatchFilter { predicate: (t.x > t.y) } + └─BatchGroupTopN { order: [t.x ASC], limit: 2, offset: 0, group_key: [t.y] } + └─BatchExchange { order: [], dist: HashShard(t.y) } └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [y, t._row_id], pk_columns: [y, t._row_id], pk_conflict: NoCheck } - └─StreamGroupTopN { order: [t.x ASC], limit: 2, offset: 0, group_key: [t.y] } - └─StreamExchange { dist: HashShard(t.y) } - └─StreamFilter { predicate: (t.x > t.y) } + └─StreamFilter { predicate: (t.x > t.y) } + └─StreamGroupTopN { order: [t.x ASC], limit: 2, offset: 0, group_key: [t.y] } + └─StreamExchange { dist: HashShard(t.y) } └─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - id: TopN by rank without rank output sql: | @@ -775,30 +774,31 @@ └─LogicalScan { table: t, columns: [t.x, t.y, t.z, t._row_id] } optimized_logical_plan_for_stream: |- LogicalProject { exprs: [row_number] } - └─LogicalTopN { order: [t.z ASC], limit: 9, offset: 0, with_ties: true, group_key: [t.x, t.y] } - └─LogicalTopN { order: [t.z ASC], limit: 9, offset: 0, group_key: [t.x, t.y] } - └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─LogicalTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } + └─LogicalFilter { predicate: (row_number < 10:Int32) AND (row_number < 10:Int32) } + └─LogicalTopN { order: [t.z ASC], limit: 9, offset: 0, with_ties: true, group_key: [t.x, t.y] } + └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─LogicalScan { table: t, columns: [t.x, t.y, t.z] } batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [row_number] } - └─BatchGroupTopN { order: [t.z ASC], limit: 9, offset: 0, with_ties: true, group_key: [t.x, t.y] } - └─BatchGroupTopN { order: [t.z ASC], limit: 9, offset: 0, group_key: [t.x, t.y] } + └─BatchFilter { predicate: (row_number < 10:Int32) AND (row_number < 10:Int32) } + └─BatchGroupTopN { order: [t.z ASC], limit: 9, offset: 0, with_ties: true, group_key: [t.x, t.y] } └─BatchExchange { order: [], dist: HashShard(t.x, t.y) } - └─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─BatchSort { order: [t.x ASC, t.y ASC] } - └─BatchGroupTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } - └─BatchExchange { order: [], dist: HashShard(t.x) } - └─BatchScan { table: t, columns: [t.x, t.y, t.z], distribution: SomeShard } + └─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─BatchSort { order: [t.x ASC, t.y ASC, t.z ASC] } + └─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─BatchExchange { order: [t.x ASC, t.y ASC], dist: HashShard(t.x) } + └─BatchSort { order: [t.x ASC, t.y ASC] } + └─BatchScan { table: t, columns: [t.x, t.y, t.z], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [r1, t.x(hidden), t.y(hidden), t._row_id(hidden)], stream_key: [t.x, t.y, t._row_id], pk_columns: [t.x, t.y, t._row_id], pk_conflict: NoCheck } └─StreamProject { exprs: [row_number, t.x, t.y, t._row_id] } - └─StreamGroupTopN { order: [t.z ASC], limit: 9, offset: 0, with_ties: true, group_key: [t.x, t.y] } - └─StreamGroupTopN { order: [t.z ASC], limit: 9, offset: 0, group_key: [t.x, t.y] } + └─StreamFilter { predicate: (row_number < 10:Int32) AND (row_number < 10:Int32) } + └─StreamGroupTopN { order: [t.z ASC], limit: 9, offset: 0, with_ties: true, group_key: [t.x, t.y] } └─StreamExchange { dist: HashShard(t.x, t.y) } - └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─StreamGroupTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } + └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─StreamExchange { dist: HashShard(t.x) } └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - id: create_bid @@ -989,3 +989,25 @@ └─StreamOverWindow { window_functions: [rank() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } └─StreamExchange { dist: HashShard(t.x) } └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t (id int, cat varchar, rule varchar, at timestamptz); + select * from (select cat, rule, at, lag(rule) over (partition by cat order by at) as prev_rule from t) as with_prev + where rule = 'B' and cat is not null and at = '2023-11-23T12:00:42Z'::timestamptz; + optimized_logical_plan_for_batch: |- + LogicalFilter { predicate: (t.rule = 'B':Varchar) AND (t.at = '2023-11-23 12:00:42+00:00':Timestamptz) } + └─LogicalOverWindow { window_functions: [first_value(t.rule) OVER(PARTITION BY t.cat ORDER BY t.at ASC ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING)] } + └─LogicalScan { table: t, columns: [t.cat, t.rule, t.at], predicate: IsNotNull(t.cat) } + optimized_logical_plan_for_stream: |- + LogicalFilter { predicate: (t.rule = 'B':Varchar) AND (t.at = '2023-11-23 12:00:42+00:00':Timestamptz) } + └─LogicalOverWindow { window_functions: [first_value(t.rule) OVER(PARTITION BY t.cat ORDER BY t.at ASC ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING)] } + └─LogicalScan { table: t, columns: [t.cat, t.rule, t.at], predicate: IsNotNull(t.cat) } +- sql: | + create table t (id int, cat varchar, rule varchar, at timestamptz); + select cat, rule, at, lag(rule) over (partition by cat order by at) as prev_rule from t + where rule = 'B' and cat is not null and at = '2023-11-23T12:00:42Z'::timestamptz; + optimized_logical_plan_for_batch: |- + LogicalOverWindow { window_functions: [first_value(t.rule) OVER(PARTITION BY t.cat ORDER BY t.at ASC ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING)] } + └─LogicalScan { table: t, columns: [t.cat, t.rule, t.at], predicate: (t.rule = 'B':Varchar) AND IsNotNull(t.cat) AND (t.at = '2023-11-23 12:00:42+00:00':Timestamptz) } + optimized_logical_plan_for_stream: |- + LogicalOverWindow { window_functions: [first_value(t.rule) OVER(PARTITION BY t.cat ORDER BY t.at ASC ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING)] } + └─LogicalScan { table: t, columns: [t.cat, t.rule, t.at], predicate: (t.rule = 'B':Varchar) AND IsNotNull(t.cat) AND (t.at = '2023-11-23 12:00:42+00:00':Timestamptz) } diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index b9e58f9c9d6e..709e81b06107 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -23,9 +23,9 @@ use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncKind}; use super::generic::{GenericPlanRef, OverWindow, PlanWindowFunction, ProjectBuilder}; use super::utils::impl_distill_by_unit; use super::{ - gen_filter_and_pushdown, BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalProject, - PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamEowcOverWindow, StreamEowcSort, - StreamOverWindow, ToBatch, ToStream, + gen_filter_and_pushdown, BatchOverWindow, ColPrunable, ExprRewritable, Logical, LogicalFilter, + LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamEowcOverWindow, + StreamEowcSort, StreamOverWindow, ToBatch, ToStream, }; use crate::expr::{ Expr, ExprImpl, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InputRef, WindowFunction, @@ -691,10 +691,19 @@ impl PredicatePushdown for LogicalOverWindow { predicate: Condition, ctx: &mut PredicatePushdownContext, ) -> PlanRef { - let mut window_col = FixedBitSet::with_capacity(self.schema().len()); - window_col.insert_range(self.core.input.schema().len()..self.schema().len()); - let (window_pred, other_pred) = predicate.split_disjoint(&window_col); - gen_filter_and_pushdown(self, window_pred, other_pred, ctx) + if !self.core.funcs_have_same_partition_and_order() { + // Window function calls with different PARTITION BY and ORDER BY clauses are not split yet. + return LogicalFilter::create(self.clone().into(), predicate); + } + + let all_out_cols: FixedBitSet = (0..self.schema().len()).collect(); + let mut remain_cols: FixedBitSet = all_out_cols + .difference(&self.core.partition_key_indices().into_iter().collect()) + .collect(); + remain_cols.grow(self.schema().len()); + + let (remain_pred, pushed_pred) = predicate.split_disjoint(&remain_cols); + gen_filter_and_pushdown(self, remain_pred, pushed_pred, ctx) } } diff --git a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs index 8da05e9fc758..1a889e8ffaf6 100644 --- a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs +++ b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs @@ -139,6 +139,10 @@ impl Rule for OverWindowToTopNRule { /// Returns `None` if the conditions are too complex or invalid. `Some((limit, offset))` otherwise. fn handle_rank_preds(rank_preds: &[ExprImpl], window_func_pos: usize) -> Option<(u64, u64)> { + if rank_preds.is_empty() { + return None; + } + // rank >= lb let mut lb: Option = None; // rank <= ub