Skip to content

Commit

Permalink
feat(over window): frontend support for session frame (#17098)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Jun 13, 2024
1 parent 5f41f27 commit 679d90e
Show file tree
Hide file tree
Showing 10 changed files with 352 additions and 53 deletions.
1 change: 1 addition & 0 deletions e2e_test/batch/over_window/main.slt.part
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
include ./generated/main.slt.part
include ./session/mod.slt.part
61 changes: 61 additions & 0 deletions e2e_test/batch/over_window/session/mod.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Because currently general streaming version of session window is not supported yet,
# we only add e2e for batch mode.

statement ok
create table t (
tm timestamp,
foo int,
bar int
);

statement ok
create view v1 as
select
*,
first_value(tm) over (partition by bar order by tm session with gap '10 minutes') as window_start,
last_value(tm) over (partition by bar order by tm session with gap '10 minutes') as window_end
from t;

statement ok
insert into t values
('2023-05-06 16:51:00', 1, 100)
, ('2023-05-06 16:56:00', 8, 100)
, ('2023-05-06 17:30:00', 3, 200)
, ('2023-05-06 17:35:00', 5, 100)
, ('2023-05-06 17:59:00', 4, 100)
, ('2023-05-06 18:01:00', 6, 200)
;

query TiiTT
select * from v1 order by tm;
----
2023-05-06 16:51:00 1 100 2023-05-06 16:51:00 2023-05-06 16:56:00
2023-05-06 16:56:00 8 100 2023-05-06 16:51:00 2023-05-06 16:56:00
2023-05-06 17:30:00 3 200 2023-05-06 17:30:00 2023-05-06 17:30:00
2023-05-06 17:35:00 5 100 2023-05-06 17:35:00 2023-05-06 17:35:00
2023-05-06 17:59:00 4 100 2023-05-06 17:59:00 2023-05-06 17:59:00
2023-05-06 18:01:00 6 200 2023-05-06 18:01:00 2023-05-06 18:01:00

statement ok
insert into t values
('2023-05-06 18:08:00', 7, 100)
, ('2023-05-06 18:10:00', 9, 200)
;

query TiiTT
select * from v1 order by tm;
----
2023-05-06 16:51:00 1 100 2023-05-06 16:51:00 2023-05-06 16:56:00
2023-05-06 16:56:00 8 100 2023-05-06 16:51:00 2023-05-06 16:56:00
2023-05-06 17:30:00 3 200 2023-05-06 17:30:00 2023-05-06 17:30:00
2023-05-06 17:35:00 5 100 2023-05-06 17:35:00 2023-05-06 17:35:00
2023-05-06 17:59:00 4 100 2023-05-06 17:59:00 2023-05-06 18:08:00
2023-05-06 18:01:00 6 200 2023-05-06 18:01:00 2023-05-06 18:10:00
2023-05-06 18:08:00 7 100 2023-05-06 17:59:00 2023-05-06 18:08:00
2023-05-06 18:10:00 9 200 2023-05-06 18:01:00 2023-05-06 18:10:00

statement ok
drop view v1;

statement ok
drop table t;
26 changes: 26 additions & 0 deletions e2e_test/streaming/eowc/eowc_over_window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ select
from t
emit on window close;

statement ok
create materialized view mv4 as
select
*,
first_value(tm) over (partition by bar order by tm session with gap '10 minutes') as window_start,
last_value(tm) over (partition by bar order by tm session with gap '10 minutes') as window_end
from t
emit on window close;

statement ok
insert into t values
('2023-05-06 16:51:00', 1, 100)
Expand Down Expand Up @@ -71,6 +80,12 @@ select * from mv3 order by tm;
2023-05-06 17:30:00 3 200 1
2023-05-06 17:35:00 5 100 3

query TiiTT
select * from mv4 order by tm;
----
2023-05-06 16:51:00 1 100 2023-05-06 16:51:00 2023-05-06 16:56:00
2023-05-06 16:56:00 8 100 2023-05-06 16:51:00 2023-05-06 16:56:00

statement ok
insert into t values
('2023-05-06 18:10:00', 7, 100)
Expand Down Expand Up @@ -100,6 +115,14 @@ select * from mv3 order by tm;
2023-05-06 17:59:00 4 100 4
2023-05-06 18:01:00 6 200 2

query TiiTT
select * from mv4 order by tm;
----
2023-05-06 16:51:00 1 100 2023-05-06 16:51:00 2023-05-06 16:56:00
2023-05-06 16:56:00 8 100 2023-05-06 16:51:00 2023-05-06 16:56:00
2023-05-06 17:30:00 3 200 2023-05-06 17:30:00 2023-05-06 17:30:00
2023-05-06 17:35:00 5 100 2023-05-06 17:35:00 2023-05-06 17:35:00

statement ok
drop materialized view mv1;

Expand All @@ -109,5 +132,8 @@ drop materialized view mv2;
statement ok
drop materialized view mv3;

statement ok
drop materialized view mv4;

statement ok
drop table t;
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,53 @@
from t;
expected_outputs:
- binder_error

# Session frames
- sql: |
create table t (i int, bi bigint, d decimal, f float, da date, t time, ts timestamp, tstz timestamptz, itv interval);
select
first_value(i) over (partition by bi order by i session with gap 10) as col1,
first_value(bi) over (partition by i order by bi session with gap 10) as col2,
first_value(i) over (partition by bi order by d session with gap 1.5) as col3,
first_value(i) over (partition by bi order by f session with gap 1.5) as col4,
-- first_value(i) over (partition by bi order by da session with gap '1 day') as col5, -- `date` not supported yet
-- first_value(i) over (partition by bi order by t session with gap '1 min') as col6, -- `time` not supported yet
first_value(i) over (partition by bi order by ts session with gap '1 day 1 hour') as col7,
first_value(i) over (partition by bi order by tstz session with gap '1 min') as col8
from t;
expected_outputs:
- logical_plan
- optimized_logical_plan_for_stream
- stream_error # not supported yet
- batch_plan
- sql: |
create table t (i int, bi bigint, ts timestamp, watermark for ts as ts - interval '1 minute') append only;
select
first_value(i) over (partition by bi order by ts session with gap '10 minutes') as window_start,
last_value(i) over (partition by bi order by ts session with gap '10 minutes') as window_end
from t;
expected_outputs:
- logical_plan
- eowc_stream_plan
- batch_plan
- sql: |
create table t (i int, bi bigint, d decimal, f float, da date, t time, ts timestamp, tstz timestamptz, itv interval);
select
count(*) over (partition by 1::int order by da session with gap '1 day') -- `date` not supported yet
from t;
expected_outputs:
- binder_error
- sql: |
create table t (i int, bi bigint, d decimal, f float, da date, t time, ts timestamp, tstz timestamptz, itv interval);
select
count(*) over (partition by 1::int order by t session with gap '1 min') -- `time` not supported yet
from t;
expected_outputs:
- binder_error
- sql: |
create table t (i int, bi bigint, d decimal, f float, da date, t time, ts timestamp, tstz timestamptz, itv interval);
select
count(*) over (partition by 1::int order by tstz session with gap '1 day 1 hour') -- `timestamptz` +/- 'x month x day' not supported yet
from t;
expected_outputs:
- binder_error
Original file line number Diff line number Diff line change
Expand Up @@ -1235,3 +1235,115 @@
Caused by these errors (recent errors listed first):
1: Expr error
2: for frame order column of type `timestamptz`, offset should not have non-zero `month` and `day`
- sql: |
create table t (i int, bi bigint, d decimal, f float, da date, t time, ts timestamp, tstz timestamptz, itv interval);
select
first_value(i) over (partition by bi order by i session with gap 10) as col1,
first_value(bi) over (partition by i order by bi session with gap 10) as col2,
first_value(i) over (partition by bi order by d session with gap 1.5) as col3,
first_value(i) over (partition by bi order by f session with gap 1.5) as col4,
-- first_value(i) over (partition by bi order by da session with gap '1 day') as col5, -- `date` not supported yet
-- first_value(i) over (partition by bi order by t session with gap '1 min') as col6, -- `time` not supported yet
first_value(i) over (partition by bi order by ts session with gap '1 day 1 hour') as col7,
first_value(i) over (partition by bi order by tstz session with gap '1 min') as col8
from t;
logical_plan: |-
LogicalProject { exprs: [first_value, first_value, first_value, first_value, first_value, first_value] }
└─LogicalOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.i ASC SESSION WITH GAP 10), first_value(t.bi) OVER(PARTITION BY t.i ORDER BY t.bi ASC SESSION WITH GAP 10), first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.d ASC SESSION WITH GAP 1.5), first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.f ASC SESSION WITH GAP 1.5), first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.ts ASC SESSION WITH GAP 1 day 01:00:00), first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.tstz ASC SESSION WITH GAP 00:01:00)] }
└─LogicalProject { exprs: [t.i, t.bi, t.d, t.f, t.da, t.t, t.ts, t.tstz, t.itv, t._row_id] }
└─LogicalScan { table: t, columns: [t.i, t.bi, t.d, t.f, t.da, t.t, t.ts, t.tstz, t.itv, t._row_id] }
optimized_logical_plan_for_stream: |-
LogicalProject { exprs: [first_value, first_value, first_value, first_value, first_value, first_value] }
└─LogicalOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.tstz ASC SESSION WITH GAP 00:01:00)] }
└─LogicalProject { exprs: [t.i, t.bi, t.tstz, first_value, first_value, first_value, first_value, first_value] }
└─LogicalOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.ts ASC SESSION WITH GAP 1 day 01:00:00)] }
└─LogicalProject { exprs: [t.i, t.bi, t.ts, t.tstz, first_value, first_value, first_value, first_value] }
└─LogicalOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.f ASC SESSION WITH GAP 1.5)] }
└─LogicalProject { exprs: [t.i, t.bi, t.f, t.ts, t.tstz, first_value, first_value, first_value] }
└─LogicalOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.d ASC SESSION WITH GAP 1.5)] }
└─LogicalOverWindow { window_functions: [first_value(t.bi) OVER(PARTITION BY t.i ORDER BY t.bi ASC SESSION WITH GAP 10)] }
└─LogicalOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.i ASC SESSION WITH GAP 10)] }
└─LogicalScan { table: t, columns: [t.i, t.bi, t.d, t.f, t.ts, t.tstz] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [first_value, first_value, first_value, first_value, first_value, first_value] }
└─BatchOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.tstz ASC SESSION WITH GAP 00:01:00)] }
└─BatchSort { order: [t.bi ASC, t.tstz ASC] }
└─BatchProject { exprs: [t.i, t.bi, t.tstz, first_value, first_value, first_value, first_value, first_value] }
└─BatchOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.ts ASC SESSION WITH GAP 1 day 01:00:00)] }
└─BatchSort { order: [t.bi ASC, t.ts ASC] }
└─BatchProject { exprs: [t.i, t.bi, t.ts, t.tstz, first_value, first_value, first_value, first_value] }
└─BatchOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.f ASC SESSION WITH GAP 1.5)] }
└─BatchSort { order: [t.bi ASC, t.f ASC] }
└─BatchProject { exprs: [t.i, t.bi, t.f, t.ts, t.tstz, first_value, first_value, first_value] }
└─BatchOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.d ASC SESSION WITH GAP 1.5)] }
└─BatchExchange { order: [t.bi ASC, t.d ASC], dist: HashShard(t.bi) }
└─BatchSort { order: [t.bi ASC, t.d ASC] }
└─BatchOverWindow { window_functions: [first_value(t.bi) OVER(PARTITION BY t.i ORDER BY t.bi ASC SESSION WITH GAP 10)] }
└─BatchExchange { order: [t.i ASC, t.bi ASC], dist: HashShard(t.i) }
└─BatchSort { order: [t.i ASC, t.bi ASC] }
└─BatchOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.i ASC SESSION WITH GAP 10)] }
└─BatchExchange { order: [t.bi ASC, t.i ASC], dist: HashShard(t.bi) }
└─BatchSort { order: [t.bi ASC, t.i ASC] }
└─BatchScan { table: t, columns: [t.i, t.bi, t.d, t.f, t.ts, t.tstz], distribution: SomeShard }
stream_error: |-
Feature is not yet implemented: Session frame is not yet supported in general streaming mode. Please consider using Emit-On-Window-Close mode.
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
- sql: |
create table t (i int, bi bigint, ts timestamp, watermark for ts as ts - interval '1 minute') append only;
select
first_value(i) over (partition by bi order by ts session with gap '10 minutes') as window_start,
last_value(i) over (partition by bi order by ts session with gap '10 minutes') as window_end
from t;
logical_plan: |-
LogicalProject { exprs: [first_value, last_value] }
└─LogicalOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.ts ASC SESSION WITH GAP 00:10:00), last_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.ts ASC SESSION WITH GAP 00:10:00)] }
└─LogicalProject { exprs: [t.i, t.bi, t.ts, t._row_id] }
└─LogicalScan { table: t, columns: [t.i, t.bi, t.ts, t._row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [first_value, last_value] }
└─BatchOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.ts ASC SESSION WITH GAP 00:10:00), last_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.ts ASC SESSION WITH GAP 00:10:00)] }
└─BatchExchange { order: [t.bi ASC, t.ts ASC], dist: HashShard(t.bi) }
└─BatchSort { order: [t.bi ASC, t.ts ASC] }
└─BatchScan { table: t, columns: [t.i, t.bi, t.ts], distribution: SomeShard }
eowc_stream_plan: |-
StreamMaterialize { columns: [window_start, window_end, t._row_id(hidden), t.bi(hidden)], stream_key: [t._row_id, t.bi], pk_columns: [t._row_id, t.bi], pk_conflict: NoCheck }
└─StreamProject { exprs: [first_value, last_value, t._row_id, t.bi] }
└─StreamEowcOverWindow { window_functions: [first_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.ts ASC SESSION WITH GAP 00:10:00), last_value(t.i) OVER(PARTITION BY t.bi ORDER BY t.ts ASC SESSION WITH GAP 00:10:00)] }
└─StreamEowcSort { sort_column: t.ts }
└─StreamExchange { dist: HashShard(t.bi) }
└─StreamTableScan { table: t, columns: [t.i, t.bi, t.ts, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t (i int, bi bigint, d decimal, f float, da date, t time, ts timestamp, tstz timestamptz, itv interval);
select
count(*) over (partition by 1::int order by da session with gap '1 day') -- `date` not supported yet
from t;
binder_error: |
Failed to bind expression: count(*) OVER (PARTITION BY CAST(1 AS INT) ORDER BY da SESSION WITH GAP '1 day')
Caused by:
Feature is not yet implemented: `SESSION` frame with offset of type `date` is not implemented yet, please manually cast the `ORDER BY` column to `timestamp`
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
- sql: |
create table t (i int, bi bigint, d decimal, f float, da date, t time, ts timestamp, tstz timestamptz, itv interval);
select
count(*) over (partition by 1::int order by t session with gap '1 min') -- `time` not supported yet
from t;
binder_error: |
Failed to bind expression: count(*) OVER (PARTITION BY CAST(1 AS INT) ORDER BY t SESSION WITH GAP '1 min')
Caused by:
Feature is not yet implemented: `SESSION` frame with offset of type `time without time zone` is not implemented yet, please manually cast the `ORDER BY` column to `timestamp`
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
- sql: |
create table t (i int, bi bigint, d decimal, f float, da date, t time, ts timestamp, tstz timestamptz, itv interval);
select
count(*) over (partition by 1::int order by tstz session with gap '1 day 1 hour') -- `timestamptz` +/- 'x month x day' not supported yet
from t;
binder_error: |
Failed to bind expression: count(*) OVER (PARTITION BY CAST(1 AS INT) ORDER BY tstz SESSION WITH GAP '1 day 1 hour')
Caused by these errors (recent errors listed first):
1: Expr error
2: for session order column of type `timestamptz`, gap should not have non-zero `month` and `day`
Loading

0 comments on commit 679d90e

Please sign in to comment.