diff --git a/src/frontend/planner_test/tests/testdata/emit_on_window_close.yaml b/src/frontend/planner_test/tests/testdata/emit_on_window_close.yaml index d2b0b802427a5..ca24c39743fef 100644 --- a/src/frontend/planner_test/tests/testdata/emit_on_window_close.yaml +++ b/src/frontend/planner_test/tests/testdata/emit_on_window_close.yaml @@ -122,9 +122,49 @@ - sql: | create source t (a int, b int, tm timestamp, watermark for tm as tm - interval '5 minutes') with (connector = 'kinesis') ROW FORMAT JSON; select lag(a, 2) over (partition by b order by tm) from t; + eowc_stream_plan: | + StreamMaterialize { columns: [lag, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: "NoCheck" } + └─StreamExchange { dist: HashShard(_row_id) } + └─StreamProject { exprs: [lag, _row_id] } + └─StreamEowcOverWindow { window_functions: [lag(a) OVER(PARTITION BY b ORDER BY tm ASC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)] } + └─StreamSort { sort_column_index: 2 } + └─StreamExchange { dist: HashShard(b) } + └─StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] } + └─StreamRowIdGen { row_id_index: 3 } + └─StreamWatermarkFilter { watermark_descs: [idx: 2, expr: (tm - '00:05:00':Interval)] } + └─StreamSource { source: "t", columns: ["a", "b", "tm", "_row_id"] } + eowc_stream_dist_plan: |+ + Fragment 0 + StreamMaterialize { columns: [lag, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: "NoCheck" } + ├── materialized table: 4294967294 + └── StreamExchange Hash([1]) from 1 + + Fragment 1 + StreamProject { exprs: [lag, _row_id] } + └── StreamEowcOverWindow { window_functions: [lag(a) OVER(PARTITION BY b ORDER BY tm ASC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)] } + └── StreamSort { sort_column_index: 2 } + └── StreamExchange Hash([1]) from 2 + + Fragment 2 + StreamProject { exprs: [a, b, tm, _row_id], output_watermarks: [tm] } + └── StreamRowIdGen { row_id_index: 3 } + └── StreamWatermarkFilter { watermark_descs: [idx: 2, expr: (tm - '00:05:00':Interval)] } + └── StreamSource { source: "t", columns: ["a", "b", "tm", "_row_id"] } { source state table: 3 } + + Table 3 + ├── columns: [ partition_id, offset_info ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 0, 1 ] + ├── distribution key: [] + └── read pk prefix len hint: 1 + + Table 4294967294 + ├── columns: [ lag, _row_id ] + ├── primary key: [ $1 ASC ] + ├── value indices: [ 0, 1 ] + ├── distribution key: [ 1 ] + └── read pk prefix len hint: 1 + stream_error: |- Feature is not yet implemented: OverAgg to stream Tracking issue: https://github.com/risingwavelabs/risingwave/issues/9124 - eowc_stream_error: |- - Feature is not yet implemented: OverAgg to stream - Tracking issue: https://github.com/risingwavelabs/risingwave/issues/9124 diff --git a/src/frontend/planner_test/tests/testdata/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/over_window_function.yaml index 424941cc9635e..7523252b3d2dc 100644 --- a/src/frontend/planner_test/tests/testdata/over_window_function.yaml +++ b/src/frontend/planner_test/tests/testdata/over_window_function.yaml @@ -4,7 +4,7 @@ select lag(x) over() from t; logical_plan: | LogicalProject { exprs: [lag] } - └─LogicalOverAgg { window_functions: [lag() OVER(ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)] } + └─LogicalOverAgg { window_functions: [lag(t.x) OVER(ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)] } └─LogicalScan { table: t, columns: [t.x, t._row_id] } batch_error: |- Feature is not yet implemented: OverAgg to batch @@ -25,7 +25,7 @@ select lead(x, 2) over() from t; logical_plan: | LogicalProject { exprs: [lead] } - └─LogicalOverAgg { window_functions: [lead() OVER(ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING)] } + └─LogicalOverAgg { window_functions: [lead(t.x) OVER(ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING)] } └─LogicalScan { table: t, columns: [t.x, t._row_id] } batch_error: |- Feature is not yet implemented: OverAgg to batch @@ -53,7 +53,7 @@ select sum(x) over() from t; logical_plan: | LogicalProject { exprs: [sum] } - └─LogicalOverAgg { window_functions: [sum() OVER(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)] } + └─LogicalOverAgg { window_functions: [sum(t.x) OVER(ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)] } └─LogicalScan { table: t, columns: [t.x, t._row_id] } batch_error: |- Feature is not yet implemented: OverAgg to batch @@ -66,14 +66,14 @@ select x, y, min(x) over(PARTITION BY y ROWS 10 PRECEDING) from t; logical_plan: | LogicalProject { exprs: [t.x, t.y, min] } - └─LogicalOverAgg { window_functions: [min() OVER(PARTITION BY t.y ROWS BETWEEN 10 PRECEDING AND CURRENT ROW)] } + └─LogicalOverAgg { window_functions: [min(t.x) OVER(PARTITION BY t.y ROWS BETWEEN 10 PRECEDING AND CURRENT ROW)] } └─LogicalScan { table: t, columns: [t.x, t.y, t._row_id] } - sql: | create table t(x int, y int); select x, y, min(x) over(PARTITION BY y ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING) from t; logical_plan: | LogicalProject { exprs: [t.x, t.y, min] } - └─LogicalOverAgg { window_functions: [min() OVER(PARTITION BY t.y ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING)] } + └─LogicalOverAgg { window_functions: [min(t.x) OVER(PARTITION BY t.y ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING)] } └─LogicalScan { table: t, columns: [t.x, t.y, t._row_id] } batch_error: |- Feature is not yet implemented: OverAgg to batch @@ -94,7 +94,7 @@ select x, y, lag(x) over(PARTITION BY y ORDER BY x) from t; logical_plan: | LogicalProject { exprs: [t.x, t.y, lag] } - └─LogicalOverAgg { window_functions: [lag() OVER(PARTITION BY t.y ORDER BY t.x ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)] } + └─LogicalOverAgg { window_functions: [lag(t.x) OVER(PARTITION BY t.y ORDER BY t.x ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)] } └─LogicalScan { table: t, columns: [t.x, t.y, t._row_id] } batch_error: |- Feature is not yet implemented: OverAgg to batch