Skip to content

Commit

Permalink
fix(over window): fix over window predicate pushdown (#13662)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Feb 1, 2024
1 parent 8766b07 commit fdc26bf
Show file tree
Hide file tree
Showing 16 changed files with 252 additions and 46 deletions.
1 change: 1 addition & 0 deletions e2e_test/over_window/generated/batch/main.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ include ./rank_func/mod.slt.part
include ./expr_in_win_func/mod.slt.part
include ./agg_in_win_func/mod.slt.part
include ./opt_agg_then_join/mod.slt.part
include ./with_filter/mod.slt.part
51 changes: 51 additions & 0 deletions e2e_test/over_window/generated/batch/with_filter/mod.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# This file is generated by `gen.py`. Do not edit it manually!

# Test window functions together with filters.
# https://github.com/risingwavelabs/risingwave/issues/13653

statement ok
create table t (id int, cat varchar, rule varchar, at timestamptz);

statement ok
insert into t values
(1, 'foo', 'A', '2023-11-23T12:00:42Z')
, (2, 'foo', 'B', '2023-11-23T12:01:15Z');

statement ok
create view v1 as
select rule, lag(rule) over (partition by cat order by at) from t where rule = 'B';

statement ok
create view v2 as
select * from (select rule, lag(rule) over (partition by cat order by at) as prev_rule from t) where rule = 'B';

statement ok
create view v3 as
select * from (select rule, at, row_number() over (partition by cat order by at) as rank from t) where at = '2023-11-23T12:01:15Z'::timestamptz;

query TT
select * from v1;
----
B NULL

query TT
select * from v2;
----
B A

query TT
select * from v3;
----
B 2023-11-23 12:01:15+00:00 2

statement ok
drop view v1;

statement ok
drop view v2;

statement ok
drop view v3;

statement ok
drop table t;
1 change: 1 addition & 0 deletions e2e_test/over_window/generated/streaming/main.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ include ./rank_func/mod.slt.part
include ./expr_in_win_func/mod.slt.part
include ./agg_in_win_func/mod.slt.part
include ./opt_agg_then_join/mod.slt.part
include ./with_filter/mod.slt.part
51 changes: 51 additions & 0 deletions e2e_test/over_window/generated/streaming/with_filter/mod.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# This file is generated by `gen.py`. Do not edit it manually!

# Test window functions together with filters.
# https://github.com/risingwavelabs/risingwave/issues/13653

statement ok
create table t (id int, cat varchar, rule varchar, at timestamptz);

statement ok
insert into t values
(1, 'foo', 'A', '2023-11-23T12:00:42Z')
, (2, 'foo', 'B', '2023-11-23T12:01:15Z');

statement ok
create materialized view v1 as
select rule, lag(rule) over (partition by cat order by at) from t where rule = 'B';

statement ok
create materialized view v2 as
select * from (select rule, lag(rule) over (partition by cat order by at) as prev_rule from t) where rule = 'B';

statement ok
create materialized view v3 as
select * from (select rule, at, row_number() over (partition by cat order by at) as rank from t) where at = '2023-11-23T12:01:15Z'::timestamptz;

query TT
select * from v1;
----
B NULL

query TT
select * from v2;
----
B A

query TT
select * from v3;
----
B 2023-11-23 12:01:15+00:00 2

statement ok
drop materialized view v1;

statement ok
drop materialized view v2;

statement ok
drop materialized view v3;

statement ok
drop table t;
1 change: 1 addition & 0 deletions e2e_test/over_window/templates/main.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ include ./rank_func/mod.slt.part
include ./expr_in_win_func/mod.slt.part
include ./agg_in_win_func/mod.slt.part
include ./opt_agg_then_join/mod.slt.part
include ./with_filter/mod.slt.part
49 changes: 49 additions & 0 deletions e2e_test/over_window/templates/with_filter/mod.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Test window functions together with filters.
# https://github.com/risingwavelabs/risingwave/issues/13653

statement ok
create table t (id int, cat varchar, rule varchar, at timestamptz);

statement ok
insert into t values
(1, 'foo', 'A', '2023-11-23T12:00:42Z')
, (2, 'foo', 'B', '2023-11-23T12:01:15Z');

statement ok
create $view_type v1 as
select rule, lag(rule) over (partition by cat order by at) from t where rule = 'B';

statement ok
create $view_type v2 as
select * from (select rule, lag(rule) over (partition by cat order by at) as prev_rule from t) where rule = 'B';

statement ok
create $view_type v3 as
select * from (select rule, at, row_number() over (partition by cat order by at) as rank from t) where at = '2023-11-23T12:01:15Z'::timestamptz;

query TT
select * from v1;
----
B NULL

query TT
select * from v2;
----
B A

query TT
select * from v3;
----
B 2023-11-23 12:01:15+00:00 2

statement ok
drop $view_type v1;

statement ok
drop $view_type v2;

statement ok
drop $view_type v3;

statement ok
drop table t;
1 change: 0 additions & 1 deletion src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#![feature(if_let_guard)]
#![feature(iterator_try_collect)]
#![feature(try_blocks)]
#![feature(let_else)]

use std::time::Duration;

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/kinesis/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ mod tests {
},

scan_startup_mode: None,
timestamp_offset: Some(123456789098765432),
timestamp_offset: Some("123456789098765432".to_string()),
};
let client = KinesisSplitReader::new(
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,3 +474,19 @@
- stream_plan
- optimized_logical_plan_for_batch
- batch_plan

# With filter
- sql: |
create table t (id int, cat varchar, rule varchar, at timestamptz);
select * from (select cat, rule, at, lag(rule) over (partition by cat order by at) as prev_rule from t) as with_prev
where rule = 'B' and cat is not null and at = '2023-11-23T12:00:42Z'::timestamptz;
expected_outputs:
- optimized_logical_plan_for_stream
- optimized_logical_plan_for_batch
- sql: |
create table t (id int, cat varchar, rule varchar, at timestamptz);
select cat, rule, at, lag(rule) over (partition by cat order by at) as prev_rule from t
where rule = 'B' and cat is not null and at = '2023-11-23T12:00:42Z'::timestamptz;
expected_outputs:
- optimized_logical_plan_for_stream
- optimized_logical_plan_for_batch
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,10 @@
optimized_logical_plan_for_batch: |-
LogicalProject { exprs: [row_number] }
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalTopN { order: [t.y ASC], limit: 3, offset: 0, group_key: [t.x] }
└─LogicalScan { table: t, output_columns: [t.x, t.y], required_columns: [t.x, t.y, t.z], predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) AND (t.x > 0:Int32) }
└─LogicalProject { exprs: [t.x, t.y] }
└─LogicalFilter { predicate: (t.z > 0:Int32) AND (t.y > 0:Int32) }
└─LogicalTopN { order: [t.y ASC], limit: 3, offset: 0, group_key: [t.x] }
└─LogicalScan { table: t, columns: [t.x, t.y, t.z], predicate: (t.x > 0:Int32) }
- name: mixed
sql: |
create table t (v1 bigint, v2 double precision, v3 int);
Expand Down
Loading

0 comments on commit fdc26bf

Please sign in to comment.