diff --git a/src/frontend/planner_test/tests/testdata/input/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/input/over_window_function.yaml
index 2a3b6744bbc19..4406089b273b8 100644
--- a/src/frontend/planner_test/tests/testdata/input/over_window_function.yaml
+++ b/src/frontend/planner_test/tests/testdata/input/over_window_function.yaml
@@ -135,16 +135,16 @@
     select x, y, max(x) over(PARTITION BY y ORDER BY x RANGE 100 PRECEDING) from t;
   expected_outputs:
   - logical_plan
-  - stream_error
-  - batch_error
+  - stream_plan
+  - batch_plan
 - id: aggregate with over clause, range frame definition with between
   sql: |
     create table t(x int, y int);
     select x, y, max(x) over(PARTITION BY y ORDER BY x RANGE BETWEEN 100 PRECEDING and UNBOUNDED FOLLOWING) from t;
   expected_outputs:
   - logical_plan
-  - stream_error
-  - batch_error
+  - stream_plan
+  - batch_plan
 - id: aggregate with over clause, unbounded range, with ORDER BY
   sql: |
     create table t(x int, y int);
@@ -556,20 +556,20 @@
 - sql: |
     create table t (i int, bi bigint, d decimal, f float, da date, t time, ts timestamp, tstz timestamptz, itv interval);
     select
-      count(*) over (partition by 1::int order by i range 1 preceding),
-      count(*) over (partition by 1::int order by bi range 1 preceding),
-      count(*) over (partition by 1::int order by d range 1.5 preceding),
-      count(*) over (partition by 1::int order by f range 1.5 preceding),
-      -- count(*) over (partition by 1::int order by da range '1 day' preceding), -- `date` not supported yet
-      -- count(*) over (partition by 1::int order by t range '1 min' preceding), -- `time` not supported yet
-      count(*) over (partition by 1::int order by ts range '1 day 1 hour' preceding),
-      count(*) over (partition by 1::int order by tstz range '1 min' preceding)
+      count(*) over (partition by 1::int order by i range 1 preceding) as col1,
+      count(*) over (partition by 1::int order by bi range 1 preceding) as col2,
+      count(*) over (partition by 1::int order by d range 1.5 preceding) as col3,
+      count(*) over (partition by 1::int order by f range 1.5 preceding) as col4,
+      -- count(*) over (partition by 1::int order by da range '1 day' preceding) as col5, -- `date` not supported yet
+      -- count(*) over (partition by 1::int order by t range '1 min' preceding) as col6, -- `time` not supported yet
+      count(*) over (partition by 1::int order by ts range '1 day 1 hour' preceding) as col7,
+      count(*) over (partition by 1::int order by tstz range '1 min' preceding) as col8
     from t;
   expected_outputs:
   - logical_plan
   - optimized_logical_plan_for_stream
-  - stream_error
-  - batch_error
+  - stream_plan
+  - batch_plan
 - sql: |
     create table t (i int, bi bigint, d decimal, f float, da date, t time, ts timestamp, tstz timestamptz, itv interval);
     select
diff --git a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml
index 43dd513527484..6999cdeededfc 100644
--- a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml
@@ -264,12 +264,17 @@
     └─LogicalOverWindow { window_functions: [max(t.x) OVER(PARTITION BY t.y ORDER BY t.x ASC RANGE BETWEEN 100 PRECEDING AND CURRENT ROW)] }
       └─LogicalProject { exprs: [t.x, t.y, t._row_id] }
         └─LogicalScan { table: t, columns: [t.x, t.y, t._row_id] }
-  batch_error: |-
-    Feature is not yet implemented: Window function with `RANGE` frame is not supported yet
-    No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
-  stream_error: |-
-    Feature is not yet implemented: Window function with `RANGE` frame is not supported yet
-    No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
+  batch_plan: |-
+    BatchExchange { order: [], dist: Single }
+    └─BatchOverWindow { window_functions: [max(t.x) OVER(PARTITION BY t.y ORDER BY t.x ASC RANGE BETWEEN 100 PRECEDING AND CURRENT ROW)] }
+      └─BatchExchange { order: [t.y ASC, t.x ASC], dist: HashShard(t.y) }
+        └─BatchSort { order: [t.y ASC, t.x ASC] }
+          └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard }
+  stream_plan: |-
+    StreamMaterialize { columns: [x, y, t._row_id(hidden), max], stream_key: [t._row_id, y], pk_columns: [t._row_id, y], pk_conflict: NoCheck }
+    └─StreamOverWindow { window_functions: [max(t.x) OVER(PARTITION BY t.y ORDER BY t.x ASC RANGE BETWEEN 100 PRECEDING AND CURRENT ROW)] }
+      └─StreamExchange { dist: HashShard(t.y) }
+        └─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
 - id: aggregate with over clause, range frame definition with between
   sql: |
     create table t(x int, y int);
@@ -279,12 +284,17 @@
     └─LogicalOverWindow { window_functions: [max(t.x) OVER(PARTITION BY t.y ORDER BY t.x ASC RANGE BETWEEN 100 PRECEDING AND UNBOUNDED FOLLOWING)] }
       └─LogicalProject { exprs: [t.x, t.y, t._row_id] }
         └─LogicalScan { table: t, columns: [t.x, t.y, t._row_id] }
-  batch_error: |-
-    Feature is not yet implemented: Window function with `RANGE` frame is not supported yet
-    No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
-  stream_error: |-
-    Feature is not yet implemented: Window function with `RANGE` frame is not supported yet
-    No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
+  batch_plan: |-
+    BatchExchange { order: [], dist: Single }
+    └─BatchOverWindow { window_functions: [max(t.x) OVER(PARTITION BY t.y ORDER BY t.x ASC RANGE BETWEEN 100 PRECEDING AND UNBOUNDED FOLLOWING)] }
+      └─BatchExchange { order: [t.y ASC, t.x ASC], dist: HashShard(t.y) }
+        └─BatchSort { order: [t.y ASC, t.x ASC] }
+          └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard }
+  stream_plan: |-
+    StreamMaterialize { columns: [x, y, t._row_id(hidden), max], stream_key: [t._row_id, y], pk_columns: [t._row_id, y], pk_conflict: NoCheck }
+    └─StreamOverWindow { window_functions: [max(t.x) OVER(PARTITION BY t.y ORDER BY t.x ASC RANGE BETWEEN 100 PRECEDING AND UNBOUNDED FOLLOWING)] }
+      └─StreamExchange { dist: HashShard(t.y) }
+        └─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
 - id: aggregate with over clause, unbounded range, with ORDER BY
   sql: |
     create table t(x int, y int);
@@ -1128,14 +1138,14 @@
 - sql: |
     create table t (i int, bi bigint, d decimal, f float, da date, t time, ts timestamp, tstz timestamptz, itv interval);
     select
-      count(*) over (partition by 1::int order by i range 1 preceding),
-      count(*) over (partition by 1::int order by bi range 1 preceding),
-      count(*) over (partition by 1::int order by d range 1.5 preceding),
-      count(*) over (partition by 1::int order by f range 1.5 preceding),
-      -- count(*) over (partition by 1::int order by da range '1 day' preceding), -- `date` not supported yet
-      -- count(*) over (partition by 1::int order by t range '1 min' preceding), -- `time` not supported yet
-      count(*) over (partition by 1::int order by ts range '1 day 1 hour' preceding),
-      count(*) over (partition by 1::int order by tstz range '1 min' preceding)
+      count(*) over (partition by 1::int order by i range 1 preceding) as col1,
+      count(*) over (partition by 1::int order by bi range 1 preceding) as col2,
+      count(*) over (partition by 1::int order by d range 1.5 preceding) as col3,
+      count(*) over (partition by 1::int order by f range 1.5 preceding) as col4,
+      -- count(*) over (partition by 1::int order by da range '1 day' preceding) as col5, -- `date` not supported yet
+      -- count(*) over (partition by 1::int order by t range '1 min' preceding) as col6, -- `time` not supported yet
+      count(*) over (partition by 1::int order by ts range '1 day 1 hour' preceding) as col7,
+      count(*) over (partition by 1::int order by tstz range '1 min' preceding) as col8
     from t;
   logical_plan: |-
     LogicalProject { exprs: [count, count, count, count, count, count] }
@@ -1157,12 +1167,46 @@
                         └─LogicalOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.i ASC RANGE BETWEEN 1 PRECEDING AND CURRENT ROW)] }
                           └─LogicalProject { exprs: [t.i, t.bi, t.d, t.f, t.ts, t.tstz, 1:Int32] }
                             └─LogicalScan { table: t, columns: [t.i, t.bi, t.d, t.f, t.ts, t.tstz] }
-  batch_error: |-
-    Feature is not yet implemented: Window function with `RANGE` frame is not supported yet
-    No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
-  stream_error: |-
-    Feature is not yet implemented: Window function with `RANGE` frame is not supported yet
-    No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
+  batch_plan: |-
+    BatchExchange { order: [], dist: Single }
+    └─BatchProject { exprs: [count, count, count, count, count, count] }
+      └─BatchOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.tstz ASC RANGE BETWEEN 00:01:00 PRECEDING AND CURRENT ROW)] }
+        └─BatchSort { order: [1:Int32 ASC, t.tstz ASC] }
+          └─BatchProject { exprs: [t.tstz, 1:Int32, count, count, count, count, count] }
+            └─BatchOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.ts ASC RANGE BETWEEN 1 day 01:00:00 PRECEDING AND CURRENT ROW)] }
+              └─BatchSort { order: [1:Int32 ASC, t.ts ASC] }
+                └─BatchProject { exprs: [t.ts, t.tstz, 1:Int32, count, count, count, count] }
+                  └─BatchOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.f ASC RANGE BETWEEN 1.5 PRECEDING AND CURRENT ROW)] }
+                    └─BatchSort { order: [1:Int32 ASC, t.f ASC] }
+                      └─BatchProject { exprs: [t.f, t.ts, t.tstz, 1:Int32, count, count, count] }
+                        └─BatchOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.d ASC RANGE BETWEEN 1.5 PRECEDING AND CURRENT ROW)] }
+                          └─BatchSort { order: [1:Int32 ASC, t.d ASC] }
+                            └─BatchProject { exprs: [t.d, t.f, t.ts, t.tstz, 1:Int32, count, count] }
+                              └─BatchOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.bi ASC RANGE BETWEEN 1 PRECEDING AND CURRENT ROW)] }
+                                └─BatchSort { order: [1:Int32 ASC, t.bi ASC] }
+                                  └─BatchProject { exprs: [t.bi, t.d, t.f, t.ts, t.tstz, 1:Int32, count] }
+                                    └─BatchOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.i ASC RANGE BETWEEN 1 PRECEDING AND CURRENT ROW)] }
+                                      └─BatchExchange { order: [1:Int32 ASC, t.i ASC], dist: HashShard(1:Int32) }
+                                        └─BatchSort { order: [1:Int32 ASC, t.i ASC] }
+                                          └─BatchProject { exprs: [t.i, t.bi, t.d, t.f, t.ts, t.tstz, 1:Int32] }
+                                            └─BatchScan { table: t, columns: [t.i, t.bi, t.d, t.f, t.ts, t.tstz], distribution: SomeShard }
+  stream_plan: |-
+    StreamMaterialize { columns: [col1, col2, col3, col4, col7, col8, t._row_id(hidden), 1:Int32(hidden)], stream_key: [t._row_id, 1:Int32], pk_columns: [t._row_id, 1:Int32], pk_conflict: NoCheck }
+    └─StreamProject { exprs: [count, count, count, count, count, count, t._row_id, 1:Int32] }
+      └─StreamOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.tstz ASC RANGE BETWEEN 00:01:00 PRECEDING AND CURRENT ROW)] }
+        └─StreamProject { exprs: [t.tstz, 1:Int32, count, count, count, count, count, t._row_id] }
+          └─StreamOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.ts ASC RANGE BETWEEN 1 day 01:00:00 PRECEDING AND CURRENT ROW)] }
+            └─StreamProject { exprs: [t.ts, t.tstz, 1:Int32, count, count, count, count, t._row_id] }
+              └─StreamOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.f ASC RANGE BETWEEN 1.5 PRECEDING AND CURRENT ROW)] }
+                └─StreamProject { exprs: [t.f, t.ts, t.tstz, 1:Int32, count, count, count, t._row_id] }
+                  └─StreamOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.d ASC RANGE BETWEEN 1.5 PRECEDING AND CURRENT ROW)] }
+                    └─StreamProject { exprs: [t.d, t.f, t.ts, t.tstz, 1:Int32, count, count, t._row_id] }
+                      └─StreamOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.bi ASC RANGE BETWEEN 1 PRECEDING AND CURRENT ROW)] }
+                        └─StreamProject { exprs: [t.bi, t.d, t.f, t.ts, t.tstz, 1:Int32, count, t._row_id] }
+                          └─StreamOverWindow { window_functions: [count() OVER(PARTITION BY 1:Int32 ORDER BY t.i ASC RANGE BETWEEN 1 PRECEDING AND CURRENT ROW)] }
+                            └─StreamExchange { dist: HashShard(1:Int32) }
+                              └─StreamProject { exprs: [t.i, t.bi, t.d, t.f, t.ts, t.tstz, 1:Int32, t._row_id] }
+                                └─StreamTableScan { table: t, columns: [t.i, t.bi, t.d, t.f, t.ts, t.tstz, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
 - sql: |
     create table t (i int, bi bigint, d decimal, f float, da date, t time, ts timestamp, tstz timestamptz, itv interval);
     select