Skip to content

Commit

Permalink
add eowc planner tests
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed May 6, 2023
1 parent bc2746c commit a69c1a3
Showing 1 changed file with 130 additions and 0 deletions.
130 changes: 130 additions & 0 deletions src/frontend/planner_test/tests/testdata/emit_on_window_close.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- sql: |
create table t (v1 int, v2 int, v3 int);
select v1, min(v2), count(distinct v3) as agg from t group by v1;
stream_plan: |
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [t.v1, min(t.v2), count(distinct t.v3)] }
└─StreamHashAgg { group_key: [t.v1], aggs: [min(t.v2), count(distinct t.v3), count] }
└─StreamExchange { dist: HashShard(t.v1) }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
eowc_stream_error: |-
Not supported: The query cannot be executed in Emit-On-Window-Close mode.
HINT: Try define a watermark column in the source, or avoid aggregation without GROUP BY
- sql: |
create source t (v1 int, v2 int, v3 int, watermark for v1 as v1 - 10) with (connector = 'kinesis') ROW FORMAT JSON;
select v1, min(v2), count(distinct v3) as agg from t group by v1;
stream_plan: |
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: "NoCheck", watermark_columns: [v1] }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└─StreamAppendOnlyHashAgg { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - 10:Int32)] }
└─StreamSource { source: "t", columns: ["v1", "v2", "v3", "_row_id"] }
eowc_stream_plan: |
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: "NoCheck", watermark_columns: [v1] }
└─StreamSort { sort_column_index: 0 }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└─StreamAppendOnlyHashAgg { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - 10:Int32)] }
└─StreamSource { source: "t", columns: ["v1", "v2", "v3", "_row_id"] }
eowc_stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: "NoCheck", watermark_columns: [v1] }
├── materialized table: 4294967294
└── StreamSort { sort_column_index: 0 }
└── StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└── StreamAppendOnlyHashAgg { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
├── result table: 1
├── state tables: []
├── distinct tables: [ (distinct key: v3, table id: 2) ]
└── StreamExchange Hash([0]) from 1
Fragment 1
StreamRowIdGen { row_id_index: 3 }
└── StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - 10:Int32)] }
└── StreamSource { source: "t", columns: ["v1", "v2", "v3", "_row_id"] } { source state table: 4 }
Table 1
├── columns: [ v1, min(v2), count(distinct v3), count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
Table 2
├── columns: [ v1, v3, count_for_agg_call_1 ]
├── primary key: [ $0 ASC, $1 ASC ]
├── value indices: [ 2 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 2
Table 4
├── columns: [ partition_id, offset_info ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0, 1 ]
├── distribution key: []
└── read pk prefix len hint: 1
Table 4294967294
├── columns: [ v1, min, agg ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
- sql: |
CREATE TABLE t (a TIMESTAMP, b INT, WATERMARK FOR a AS a - INTERVAL '5 minutes') APPEND ONLY;
SELECT
window_start, max(b)
FROM tumble(t, a, INTERVAL '1 hour')
GROUP BY window_start;
stream_plan: |
StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: "NoCheck", watermark_columns: [window_start] }
└─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] }
└─StreamAppendOnlyHashAgg { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
eowc_stream_plan: |
StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: "NoCheck", watermark_columns: [window_start] }
└─StreamSort { sort_column_index: 0 }
└─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] }
└─StreamAppendOnlyHashAgg { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
eowc_stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: "NoCheck", watermark_columns: [window_start] }
├── materialized table: 4294967294
└── StreamSort { sort_column_index: 0 }
└── StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] }
└── StreamAppendOnlyHashAgg { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] }
├── result table: 1
├── state tables: []
├── distinct tables: []
└── StreamExchange Hash([0]) from 1
Fragment 1
StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] }
└── Chain { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── Upstream
└── BatchPlanNode
Table 1 { columns: [ $expr1, max(t_b), count ], primary key: [ $0 ASC ], value indices: [ 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 4294967294 { columns: [ window_start, max ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
- sql: |
create source t (a int, b int, tm timestamp, watermark for tm as tm - interval '5 minutes') with (connector = 'kinesis') ROW FORMAT JSON;
select lag(a, 2) over (partition by b order by tm) from t;
stream_error: |-
Feature is not yet implemented: OverAgg to stream
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/9124
eowc_stream_error: |-
Feature is not yet implemented: OverAgg to stream
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/9124

0 comments on commit a69c1a3

Please sign in to comment.