feat(emit-on-window-close): enable queries with EMIT ON WINDOW CLOSE keyword #9622

Merged: 11 commits, May 6, 2023
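
For context, `EMIT ON WINDOW CLOSE` (EOWC) changes a materialized view's emit semantics: instead of emitting updates immediately as input arrives, a window's result is emitted only once the watermark has passed it, so every emitted row is final and the output is append-only. Below is a minimal Rust sketch of the emit-mode distinction this PR threads from the parser to the planner; the variant names follow their usage in the diff, but the exact definition in `risingwave_sqlparser::ast` is assumed rather than quoted.

```rust
// Sketch of the parser-level emit mode (assumed shape; see
// risingwave_sqlparser::ast::EmitMode for the real definition).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmitMode {
    /// Default streaming semantics: emit changes as soon as they happen.
    Immediately,
    /// EOWC semantics: emit a window's result only after the watermark
    /// passes the window, so the output is append-only and final.
    OnWindowClose,
}
```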
5 changes: 5 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
@@ -170,6 +170,11 @@ where
always!(node.state_table, "EowcOverWindow");
}

// Sort
NodeBody::Sort(node) => {
always!(node.state_table, "Sort");
}

// Note: add internal tables for new nodes here.
_ => {}
}
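
The new `Sort` arm follows the same pattern as the surrounding arms: it registers the node's internal state table with the stream-graph visitor. A self-contained sketch of that pattern is below, assuming `always!` (defined elsewhere in this file) asserts the optional table is present and then applies the visitor callback to it; the types here are stand-ins, not the real catalog types.

```rust
// Self-contained sketch of the `always!` pattern used above: assert an
// optional internal state table exists, then visit it.
#[derive(Debug)]
struct TableCatalog {
    name: String,
}

fn visit_state_table(
    state_table: &mut Option<TableCatalog>,
    node_name: &str,
    f: impl FnOnce(&mut TableCatalog),
) {
    let table = state_table
        .as_mut()
        .unwrap_or_else(|| panic!("internal table of {node_name} must exist"));
    f(table);
}

fn main() {
    let mut sort_state = Some(TableCatalog { name: "Sort".into() });
    // Mirrors `always!(node.state_table, "Sort")`: the Sort node's buffer
    // table is mandatory, so its absence would be a planner bug.
    visit_state_table(&mut sort_state, "Sort", |t| println!("visiting {}", t.name));
}
```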
99 changes: 84 additions & 15 deletions src/frontend/planner_test/src/lib.rs
@@ -35,7 +35,7 @@ use risingwave_frontend::{
build_graph, explain_stream_graph, Binder, Explain, FrontendOpts, OptimizerContext,
OptimizerContextRef, PlanRef, Planner, WithOptions,
};
use risingwave_sqlparser::ast::{ExplainOptions, ObjectName, Statement};
use risingwave_sqlparser::ast::{EmitMode, ExplainOptions, ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;
use serde::{Deserialize, Serialize};

@@ -93,6 +93,12 @@ pub struct TestCase
/// Create MV fragments plan
pub stream_dist_plan: Option<String>,

/// Create MV plan with EOWC semantics `.gen_create_mv_plan(.., EmitMode::OnWindowClose)`
pub eowc_stream_plan: Option<String>,

/// Create MV fragments plan with EOWC semantics
pub eowc_stream_dist_plan: Option<String>,

// TODO: uncomment for Proto JSON of generated stream plan
// was: "stream_plan_proto": Option<String>
// pub plan_graph_proto: Option<String>,
@@ -114,6 +120,9 @@ pub struct TestCase {
/// Error of `.gen_stream_plan()`
pub stream_error: Option<String>,

/// Error of `.gen_stream_plan()` with `emit_on_window_close = true`
pub eowc_stream_error: Option<String>,

/// Support using file content or file location to create source.
pub create_source: Option<CreateConnector>,

Expand Down Expand Up @@ -165,6 +174,12 @@ pub struct TestCaseResult {
/// Create MV fragments plan
pub stream_dist_plan: Option<String>,

/// Create MV plan with EOWC semantics `.gen_create_mv_plan(.., EmitMode::OnWindowClose)`
pub eowc_stream_plan: Option<String>,

/// Create MV fragments plan with EOWC semantics
pub eowc_stream_dist_plan: Option<String>,

/// Error of binder
pub binder_error: Option<String>,

Expand All @@ -183,6 +198,9 @@ pub struct TestCaseResult {
/// Error of `.gen_stream_plan()`
pub stream_error: Option<String>,

/// Error of `.gen_stream_plan()` with `emit_on_window_close = true`
pub eowc_stream_error: Option<String>,

/// Error of `.gen_sink_plan()`
pub sink_error: Option<String>,

Expand Down Expand Up @@ -220,18 +238,21 @@ impl TestCaseResult {
batch_plan: self.batch_plan,
batch_local_plan: self.batch_local_plan,
stream_plan: self.stream_plan,
stream_dist_plan: self.stream_dist_plan,
eowc_stream_plan: self.eowc_stream_plan,
eowc_stream_dist_plan: self.eowc_stream_dist_plan,
sink_plan: self.sink_plan,
batch_plan_proto: self.batch_plan_proto,
planner_error: self.planner_error,
optimizer_error: self.optimizer_error,
batch_error: self.batch_error,
batch_local_error: self.batch_local_error,
stream_error: self.stream_error,
eowc_stream_error: self.eowc_stream_error,
binder_error: self.binder_error,
create_source: original_test_case.create_source.clone(),
create_table_with_connector: original_test_case.create_table_with_connector.clone(),
with_config_map: original_test_case.with_config_map.clone(),
stream_dist_plan: self.stream_dist_plan,
};
Ok(case)
}
@@ -437,9 +458,11 @@ impl TestCase {
name,
query,
columns,
emit_mode,
..
} => {
create_mv::handle_create_mv(handler_args, name, *query, columns).await?;
create_mv::handle_create_mv(handler_args, name, *query, columns, emit_mode)
.await?;
}
Statement::CreateView {
materialized: false,
@@ -624,11 +647,40 @@ impl TestCase {
}
}

'stream: {
if self.stream_plan.is_some()
|| self.stream_error.is_some()
|| self.stream_dist_plan.is_some()
{
{
// stream
for (
emit_mode,
plan_str,
ret_plan_str,
dist_plan_str,
ret_dist_plan_str,
error_str,
ret_error_str,
) in [
(
EmitMode::Immediately,
self.stream_plan.as_ref(),
&mut ret.stream_plan,
self.stream_dist_plan.as_ref(),
&mut ret.stream_dist_plan,
self.stream_error.as_ref(),
&mut ret.stream_error,
),
(
EmitMode::OnWindowClose,
self.eowc_stream_plan.as_ref(),
&mut ret.eowc_stream_plan,
self.eowc_stream_dist_plan.as_ref(),
&mut ret.eowc_stream_dist_plan,
self.eowc_stream_error.as_ref(),
&mut ret.eowc_stream_error,
),
] {
if plan_str.is_none() && dist_plan_str.is_none() && error_str.is_none() {
continue;
}

let q = if let Statement::Query(q) = stmt {
q.as_ref().clone()
} else {
@@ -637,27 +689,28 @@

let stream_plan = match create_mv::gen_create_mv_plan(
&session,
context,
context.clone(),
q,
ObjectName(vec!["test".into()]),
vec![],
Some(emit_mode),
) {
Ok((stream_plan, _)) => stream_plan,
Err(err) => {
ret.stream_error = Some(err.to_string());
break 'stream;
*ret_error_str = Some(err.to_string());
continue;
}
};

// Only generate stream_plan if it is specified in test case
if self.stream_plan.is_some() {
ret.stream_plan = Some(explain_plan(&stream_plan));
if plan_str.is_some() {
*ret_plan_str = Some(explain_plan(&stream_plan));
}

// Only generate stream_dist_plan if it is specified in test case
if self.stream_dist_plan.is_some() {
if dist_plan_str.is_some() {
let graph = build_graph(stream_plan);
ret.stream_dist_plan = Some(explain_stream_graph(&graph, false));
*ret_dist_plan_str = Some(explain_stream_graph(&graph, false));
}
}
}
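
Note the shape of the refactor above: the old `'stream:` labeled block planned one mode only, and it is replaced by a loop over `(EmitMode, expected inputs, output slots)` tuples so the default and EOWC modes share one planning path while writing into mode-specific fields. A reduced, self-contained sketch of that pattern follows; `plan_one` is a stand-in for the real `gen_create_mv_plan` call, not an actual function in the codebase.

```rust
#[derive(Clone, Copy)]
enum EmitMode {
    Immediately,
    OnWindowClose,
}

// Stand-in for gen_create_mv_plan: returns an explain string per mode.
fn plan_one(mode: EmitMode) -> Result<String, String> {
    Ok(match mode {
        EmitMode::Immediately => "StreamMaterialize ...".to_string(),
        EmitMode::OnWindowClose => "StreamMaterialize -> StreamSort ...".to_string(),
    })
}

fn main() {
    // Expectations a YAML test case might declare (here: stream only).
    let stream_expected: Option<String> = Some("...".to_string());
    let eowc_expected: Option<String> = None;
    let (mut stream_out, mut eowc_out): (Option<String>, Option<String>) = (None, None);

    for (mode, expected, out) in [
        (EmitMode::Immediately, &stream_expected, &mut stream_out),
        (EmitMode::OnWindowClose, &eowc_expected, &mut eowc_out),
    ] {
        if expected.is_none() {
            continue; // skip modes the test case does not declare
        }
        match plan_one(mode) {
            Ok(plan) => *out = Some(plan),
            Err(err) => *out = Some(format!("error: {err}")), // real code uses a separate error slot
        }
    }
    assert!(stream_out.is_some() && eowc_out.is_none());
}
```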
@@ -708,6 +761,12 @@ fn check_result(expected: &TestCase, actual: &TestCaseResult) -> Result<()> {
&expected.batch_local_error,
&actual.batch_local_error,
)?;
check_err("stream", &expected.stream_error, &actual.stream_error)?;
check_err(
"eowc_stream",
&expected.eowc_stream_error,
&actual.eowc_stream_error,
)?;
check_option_plan_eq("logical_plan", &expected.logical_plan, &actual.logical_plan)?;
check_option_plan_eq(
"optimized_logical_plan_for_batch",
@@ -731,6 +790,16 @@ fn check_result(expected: &TestCase, actual: &TestCaseResult) -> Result<()> {
&expected.stream_dist_plan,
&actual.stream_dist_plan,
)?;
check_option_plan_eq(
"eowc_stream_plan",
&expected.eowc_stream_plan,
&actual.eowc_stream_plan,
)?;
check_option_plan_eq(
"eowc_stream_dist_plan",
&expected.eowc_stream_dist_plan,
&actual.eowc_stream_dist_plan,
)?;
check_option_plan_eq(
"batch_plan_proto",
&expected.batch_plan_proto,
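The new `check_err` and `check_option_plan_eq` calls mirror the existing ones, so EOWC expectations are verified exactly like their default-mode counterparts. A minimal sketch of the comparison rule these helpers are assumed to follow (the real error formatting differs): skip fields the test case leaves undeclared, and require a match for declared ones.

```rust
// Sketch of the check_option_plan_eq contract assumed above: a missing
// expectation is skipped; a declared one must match the actual output.
fn check_option_plan_eq(
    name: &str,
    expected: &Option<String>,
    actual: &Option<String>,
) -> Result<(), String> {
    match (expected, actual) {
        (None, _) => Ok(()), // test case declares nothing for this field
        (Some(e), Some(a)) if e.trim() == a.trim() => Ok(()),
        (Some(e), a) => Err(format!(
            "{name} mismatch:\nexpected:\n{e}\nactual:\n{a:?}"
        )),
    }
}

fn main() {
    let expected = Some("StreamSort { sort_column_index: 0 }".to_string());
    let actual = Some("StreamSort { sort_column_index: 0 }".to_string());
    assert!(check_option_plan_eq("eowc_stream_plan", &expected, &actual).is_ok());
}
```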
130 changes: 130 additions & 0 deletions src/frontend/planner_test/tests/testdata/emit_on_window_close.yaml
@@ -0,0 +1,130 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- sql: |
create table t (v1 int, v2 int, v3 int);
select v1, min(v2), count(distinct v3) as agg from t group by v1;
stream_plan: |
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [t.v1, min(t.v2), count(distinct t.v3)] }
└─StreamHashAgg { group_key: [t.v1], aggs: [min(t.v2), count(distinct t.v3), count] }
└─StreamExchange { dist: HashShard(t.v1) }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
eowc_stream_error: |-
Not supported: The query cannot be executed in Emit-On-Window-Close mode.
HINT: Try define a watermark column in the source, or avoid aggregation without GROUP BY
- sql: |
create source t (v1 int, v2 int, v3 int, watermark for v1 as v1 - 10) with (connector = 'kinesis') ROW FORMAT JSON;
select v1, min(v2), count(distinct v3) as agg from t group by v1;
stream_plan: |
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: "NoCheck", watermark_columns: [v1] }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└─StreamAppendOnlyHashAgg { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - 10:Int32)] }
└─StreamSource { source: "t", columns: ["v1", "v2", "v3", "_row_id"] }
eowc_stream_plan: |
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: "NoCheck", watermark_columns: [v1] }
└─StreamSort { sort_column_index: 0 }
└─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└─StreamAppendOnlyHashAgg { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - 10:Int32)] }
└─StreamSource { source: "t", columns: ["v1", "v2", "v3", "_row_id"] }
eowc_stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: "NoCheck", watermark_columns: [v1] }
├── materialized table: 4294967294
└── StreamSort { sort_column_index: 0 }
└── StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] }
└── StreamAppendOnlyHashAgg { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] }
├── result table: 1
├── state tables: []
├── distinct tables: [ (distinct key: v3, table id: 2) ]
└── StreamExchange Hash([0]) from 1

Fragment 1
StreamRowIdGen { row_id_index: 3 }
└── StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - 10:Int32)] }
└── StreamSource { source: "t", columns: ["v1", "v2", "v3", "_row_id"] } { source state table: 4 }

Table 1
├── columns: [ v1, min(v2), count(distinct v3), count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 2
├── columns: [ v1, v3, count_for_agg_call_1 ]
├── primary key: [ $0 ASC, $1 ASC ]
├── value indices: [ 2 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 2

Table 4
├── columns: [ partition_id, offset_info ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0, 1 ]
├── distribution key: []
└── read pk prefix len hint: 1

Table 4294967294
├── columns: [ v1, min, agg ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

- sql: |
CREATE TABLE t (a TIMESTAMP, b INT, WATERMARK FOR a AS a - INTERVAL '5 minutes') APPEND ONLY;
SELECT
window_start, max(b)
FROM tumble(t, a, INTERVAL '1 hour')
GROUP BY window_start;
stream_plan: |
StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: "NoCheck", watermark_columns: [window_start] }
└─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] }
└─StreamAppendOnlyHashAgg { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
eowc_stream_plan: |
StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: "NoCheck", watermark_columns: [window_start] }
└─StreamSort { sort_column_index: 0 }
└─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] }
└─StreamAppendOnlyHashAgg { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] }
└─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
eowc_stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: "NoCheck", watermark_columns: [window_start] }
├── materialized table: 4294967294
└── StreamSort { sort_column_index: 0 }
└── StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] }
└── StreamAppendOnlyHashAgg { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] }
├── result table: 1
├── state tables: []
├── distinct tables: []
└── StreamExchange Hash([0]) from 1

Fragment 1
StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] }
└── Chain { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── Upstream
└── BatchPlanNode

Table 1 { columns: [ $expr1, max(t_b), count ], primary key: [ $0 ASC ], value indices: [ 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }

Table 4294967294 { columns: [ window_start, max ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }

- sql: |
create source t (a int, b int, tm timestamp, watermark for tm as tm - interval '5 minutes') with (connector = 'kinesis') ROW FORMAT JSON;
select lag(a, 2) over (partition by b order by tm) from t;
stream_error: |-
Feature is not yet implemented: OverAgg to stream
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/9124
eowc_stream_error: |-
Feature is not yet implemented: OverAgg to stream
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/9124
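
The expected plans above show the core planner change: the only difference between `stream_plan` and `eowc_stream_plan` is a `StreamSort { sort_column_index: 0 }` inserted directly under `StreamMaterialize`, keyed on the watermark column, which is also why the first case (no watermark) fails with the hint shown. Conceptually, that Sort buffers rows and releases them in order once the watermark passes their key. A self-contained sketch of that buffering rule, with `i64` standing in for the engine's actual datum type:

```rust
use std::collections::BTreeMap;

/// Conceptual sketch of the buffering behind the StreamSort node in the
/// EOWC plans above. Keys are values of the watermark column; i64 and
/// Vec<i64> stand in for the real datum and row types.
#[derive(Default)]
struct SortBuffer {
    staged: BTreeMap<i64, Vec<Vec<i64>>>,
}

impl SortBuffer {
    fn insert(&mut self, key: i64, row: Vec<i64>) {
        self.staged.entry(key).or_default().push(row);
    }

    /// A watermark `w` promises no future row has key < w, so every staged
    /// row below `w` is final and can be emitted in key order.
    fn on_watermark(&mut self, w: i64) -> Vec<Vec<i64>> {
        let still_open = self.staged.split_off(&w); // keys >= w stay buffered
        let closed = std::mem::replace(&mut self.staged, still_open);
        closed.into_values().flatten().collect()
    }
}

fn main() {
    let mut buf = SortBuffer::default();
    buf.insert(10, vec![10, 1]);
    buf.insert(5, vec![5, 2]);
    // Watermark 8: the row keyed 5 is closed and emitted; key 10 stays.
    assert_eq!(buf.on_watermark(8), vec![vec![5, 2]]);
}
```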
13 changes: 10 additions & 3 deletions src/frontend/src/handler/create_mv.rs
@@ -18,7 +18,7 @@ use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::user::grant_privilege::Action;
use risingwave_sqlparser::ast::{Ident, ObjectName, Query};
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};

use super::privilege::resolve_relation_privileges;
use super::RwPgResponse;
@@ -79,6 +79,7 @@ pub fn gen_create_mv_plan(
query: Query,
name: ObjectName,
columns: Vec<Ident>,
emit_mode: Option<EmitMode>,
) -> Result<(PlanRef, PbTable)> {
let db_name = session.database();
let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
@@ -102,7 +103,11 @@
if let Some(col_names) = col_names {
plan_root.set_out_names(col_names)?;
}
let materialize = plan_root.gen_materialize_plan(table_name, definition)?;
let materialize = plan_root.gen_materialize_plan(
table_name,
definition,
emit_mode == Some(EmitMode::OnWindowClose),
)?;
let mut table = materialize.table().to_prost(schema_id, database_id);
if session.config().get_create_compaction_group_for_mv() {
table.properties.insert(
@@ -137,6 +142,7 @@ pub async fn handle_create_mv(
name: ObjectName,
query: Query,
columns: Vec<Ident>,
emit_mode: Option<EmitMode>,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

@@ -146,7 +152,8 @@

let (table, graph, mut notices) = {
let context = OptimizerContext::from_handler_args(handler_args);
let (plan, table) = gen_create_mv_plan(&session, context.into(), query, name, columns)?;
let (plan, table) =
gen_create_mv_plan(&session, context.into(), query, name, columns, emit_mode)?;
let context = plan.plan_base().ctx.clone();
let mut graph = build_graph(plan);
graph.parallelism = session
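The threading in `create_mv.rs` above is the whole integration point: the parser-supplied `Option<EmitMode>` collapses into the `emit_on_window_close` boolean that `gen_materialize_plan` now takes. A short self-contained sketch of that collapse (enum shape assumed, as before):

```rust
#[derive(Debug, PartialEq)]
enum EmitMode {
    Immediately,
    OnWindowClose,
}

fn main() {
    // Mirrors `emit_mode == Some(EmitMode::OnWindowClose)` in the diff:
    // only an explicit EMIT ON WINDOW CLOSE clause enables EOWC planning;
    // an absent clause and EMIT IMMEDIATELY both keep default semantics.
    for emit_mode in [None, Some(EmitMode::Immediately), Some(EmitMode::OnWindowClose)] {
        let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
        println!("{emit_mode:?} -> emit_on_window_close = {emit_on_window_close}");
    }
}
```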