diff --git a/e2e_test/streaming/eowc/eowc_group_agg.slt b/e2e_test/streaming/eowc/eowc_group_agg.slt new file mode 100644 index 0000000000000..84d2228919b56 --- /dev/null +++ b/e2e_test/streaming/eowc/eowc_group_agg.slt @@ -0,0 +1,53 @@ +statement ok +set RW_IMPLICIT_FLUSH to true; + +statement ok +set streaming_parallelism = 1; + +statement ok +create table t ( + tm timestamp, + foo int, + watermark for tm as tm - interval '5 minutes' +) append only; + +statement ok +set streaming_parallelism = 0; + +statement ok +create materialized view mv +emit on window close +as +select + window_start, max(foo) +from tumble(t, tm, interval '1 hour') +group by window_start; + +statement ok +insert into t values +('2023-05-06 16:51:00', 1), +('2023-05-06 16:56:00', 8), +('2023-05-06 17:30:00', 3), +('2023-05-06 17:59:00', 4), +('2023-05-06 18:01:00', 6); + +query TI +select * from mv order by window_start; +---- +2023-05-06 16:00:00 8 + +statement ok +insert into t values +('2023-05-06 18:10:00', 7); + +query TI +select * from mv order by window_start; +---- +2023-05-06 16:00:00 8 +2023-05-06 17:00:00 4 + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/e2e_test/streaming/eowc/eowc_group_top_n.slt b/e2e_test/streaming/eowc/eowc_group_top_n.slt new file mode 100644 index 0000000000000..707007c58a3b5 --- /dev/null +++ b/e2e_test/streaming/eowc/eowc_group_top_n.slt @@ -0,0 +1,56 @@ +statement ok +set RW_IMPLICIT_FLUSH to true; + +statement ok +set streaming_parallelism = 1; + +statement ok +create table t ( + tm timestamp, + foo int, + watermark for tm as tm - interval '5 minutes' +) append only; + +statement ok +set streaming_parallelism = 0; + +statement ok +create materialized view mv +emit on window close +as +select window_start, foo +from ( + select + *, row_number() over (partition by window_start order by foo) as rownum + from tumble(t, tm, interval '1 hour') +) +where rownum <= 1; + +statement ok +insert into t values +('2023-05-06 16:51:00', 1), +('2023-05-06 16:56:00', 8), +('2023-05-06 17:30:00', 3), +('2023-05-06 17:59:00', 4), +('2023-05-06 18:01:00', 6); + +query TI +select * from mv order by window_start; +---- +2023-05-06 16:00:00 1 + +statement ok +insert into t values +('2023-05-06 18:10:00', 7); + +query TI +select * from mv order by window_start; +---- +2023-05-06 16:00:00 1 +2023-05-06 17:00:00 3 + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs index 9666e3c28a929..cd702c8e51c74 100644 --- a/src/common/src/util/stream_graph_visitor.rs +++ b/src/common/src/util/stream_graph_visitor.rs @@ -170,6 +170,11 @@ where always!(node.state_table, "EowcOverWindow"); } + // Sort + NodeBody::Sort(node) => { + always!(node.state_table, "Sort"); + } + // Note: add internal tables for new nodes here. _ => {} } diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index a8718a102f070..71926007dce18 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -35,7 +35,7 @@ use risingwave_frontend::{ build_graph, explain_stream_graph, Binder, Explain, FrontendOpts, OptimizerContext, OptimizerContextRef, PlanRef, Planner, WithOptions, }; -use risingwave_sqlparser::ast::{ExplainOptions, ObjectName, Statement}; +use risingwave_sqlparser::ast::{EmitMode, ExplainOptions, ObjectName, Statement}; use risingwave_sqlparser::parser::Parser; use serde::{Deserialize, Serialize}; @@ -93,6 +93,12 @@ pub struct TestCase { /// Create MV fragments plan pub stream_dist_plan: Option, + /// Create MV plan with EOWC semantics `.gen_create_mv_plan(.., EmitMode::OnWindowClose)` + pub eowc_stream_plan: Option, + + /// Create MV fragments plan with EOWC semantics + pub eowc_stream_dist_plan: Option, + // TODO: uncomment for Proto JSON of generated stream plan // was: "stream_plan_proto": Option // pub plan_graph_proto: Option, @@ -114,6 +120,9 @@ pub struct TestCase { /// Error of `.gen_stream_plan()` pub stream_error: Option, + /// Error of `.gen_stream_plan()` with `emit_on_window_close = true` + pub eowc_stream_error: Option, + /// Support using file content or file location to create source. pub create_source: Option, @@ -165,6 +174,12 @@ pub struct TestCaseResult { /// Create MV fragments plan pub stream_dist_plan: Option, + /// Create MV plan with EOWC semantics `.gen_create_mv_plan(.., EmitMode::OnWindowClose)` + pub eowc_stream_plan: Option, + + /// Create MV fragments plan with EOWC semantics + pub eowc_stream_dist_plan: Option, + /// Error of binder pub binder_error: Option, @@ -183,6 +198,9 @@ pub struct TestCaseResult { /// Error of `.gen_stream_plan()` pub stream_error: Option, + /// Error of `.gen_stream_plan()` with `emit_on_window_close = true` + pub eowc_stream_error: Option, + /// Error of `.gen_sink_plan()` pub sink_error: Option, @@ -220,6 +238,9 @@ impl TestCaseResult { batch_plan: self.batch_plan, batch_local_plan: self.batch_local_plan, stream_plan: self.stream_plan, + stream_dist_plan: self.stream_dist_plan, + eowc_stream_plan: self.eowc_stream_plan, + eowc_stream_dist_plan: self.eowc_stream_dist_plan, sink_plan: self.sink_plan, batch_plan_proto: self.batch_plan_proto, planner_error: self.planner_error, @@ -227,11 +248,11 @@ impl TestCaseResult { batch_error: self.batch_error, batch_local_error: self.batch_local_error, stream_error: self.stream_error, + eowc_stream_error: self.eowc_stream_error, binder_error: self.binder_error, create_source: original_test_case.create_source.clone(), create_table_with_connector: original_test_case.create_table_with_connector.clone(), with_config_map: original_test_case.with_config_map.clone(), - stream_dist_plan: self.stream_dist_plan, }; Ok(case) } @@ -437,9 +458,11 @@ impl TestCase { name, query, columns, + emit_mode, .. } => { - create_mv::handle_create_mv(handler_args, name, *query, columns).await?; + create_mv::handle_create_mv(handler_args, name, *query, columns, emit_mode) + .await?; } Statement::CreateView { materialized: false, @@ -624,11 +647,40 @@ impl TestCase { } } - 'stream: { - if self.stream_plan.is_some() - || self.stream_error.is_some() - || self.stream_dist_plan.is_some() - { + { + // stream + for ( + emit_mode, + plan_str, + ret_plan_str, + dist_plan_str, + ret_dist_plan_str, + error_str, + ret_error_str, + ) in [ + ( + EmitMode::Immediately, + self.stream_plan.as_ref(), + &mut ret.stream_plan, + self.stream_dist_plan.as_ref(), + &mut ret.stream_dist_plan, + self.stream_error.as_ref(), + &mut ret.stream_error, + ), + ( + EmitMode::OnWindowClose, + self.eowc_stream_plan.as_ref(), + &mut ret.eowc_stream_plan, + self.eowc_stream_dist_plan.as_ref(), + &mut ret.eowc_stream_dist_plan, + self.eowc_stream_error.as_ref(), + &mut ret.eowc_stream_error, + ), + ] { + if plan_str.is_none() && dist_plan_str.is_none() && error_str.is_none() { + continue; + } + let q = if let Statement::Query(q) = stmt { q.as_ref().clone() } else { @@ -637,27 +689,28 @@ impl TestCase { let stream_plan = match create_mv::gen_create_mv_plan( &session, - context, + context.clone(), q, ObjectName(vec!["test".into()]), vec![], + Some(emit_mode), ) { Ok((stream_plan, _)) => stream_plan, Err(err) => { - ret.stream_error = Some(err.to_string()); - break 'stream; + *ret_error_str = Some(err.to_string()); + continue; } }; // Only generate stream_plan if it is specified in test case - if self.stream_plan.is_some() { - ret.stream_plan = Some(explain_plan(&stream_plan)); + if plan_str.is_some() { + *ret_plan_str = Some(explain_plan(&stream_plan)); } // Only generate stream_dist_plan if it is specified in test case - if self.stream_dist_plan.is_some() { + if dist_plan_str.is_some() { let graph = build_graph(stream_plan); - ret.stream_dist_plan = Some(explain_stream_graph(&graph, false)); + *ret_dist_plan_str = Some(explain_stream_graph(&graph, false)); } } } @@ -708,6 +761,12 @@ fn check_result(expected: &TestCase, actual: &TestCaseResult) -> Result<()> { &expected.batch_local_error, &actual.batch_local_error, )?; + check_err("stream", &expected.stream_error, &actual.stream_error)?; + check_err( + "eowc_stream", + &expected.eowc_stream_error, + &actual.eowc_stream_error, + )?; check_option_plan_eq("logical_plan", &expected.logical_plan, &actual.logical_plan)?; check_option_plan_eq( "optimized_logical_plan_for_batch", @@ -731,6 +790,16 @@ fn check_result(expected: &TestCase, actual: &TestCaseResult) -> Result<()> { &expected.stream_dist_plan, &actual.stream_dist_plan, )?; + check_option_plan_eq( + "eowc_stream_plan", + &expected.eowc_stream_plan, + &actual.eowc_stream_plan, + )?; + check_option_plan_eq( + "eowc_stream_dist_plan", + &expected.eowc_stream_dist_plan, + &actual.eowc_stream_dist_plan, + )?; check_option_plan_eq( "batch_plan_proto", &expected.batch_plan_proto, diff --git a/src/frontend/planner_test/tests/testdata/emit_on_window_close.yaml b/src/frontend/planner_test/tests/testdata/emit_on_window_close.yaml new file mode 100644 index 0000000000000..d2b0b802427a5 --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/emit_on_window_close.yaml @@ -0,0 +1,130 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- sql: | + create table t (v1 int, v2 int, v3 int); + select v1, min(v2), count(distinct v3) as agg from t group by v1; + stream_plan: | + StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: "NoCheck" } + └─StreamProject { exprs: [t.v1, min(t.v2), count(distinct t.v3)] } + └─StreamHashAgg { group_key: [t.v1], aggs: [min(t.v2), count(distinct t.v3), count] } + └─StreamExchange { dist: HashShard(t.v1) } + └─StreamTableScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + eowc_stream_error: |- + Not supported: The query cannot be executed in Emit-On-Window-Close mode. + HINT: Try define a watermark column in the source, or avoid aggregation without GROUP BY +- sql: | + create source t (v1 int, v2 int, v3 int, watermark for v1 as v1 - 10) with (connector = 'kinesis') ROW FORMAT JSON; + select v1, min(v2), count(distinct v3) as agg from t group by v1; + stream_plan: | + StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: "NoCheck", watermark_columns: [v1] } + └─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] } + └─StreamAppendOnlyHashAgg { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] } + └─StreamExchange { dist: HashShard(v1) } + └─StreamRowIdGen { row_id_index: 3 } + └─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - 10:Int32)] } + └─StreamSource { source: "t", columns: ["v1", "v2", "v3", "_row_id"] } + eowc_stream_plan: | + StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: "NoCheck", watermark_columns: [v1] } + └─StreamSort { sort_column_index: 0 } + └─StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] } + └─StreamAppendOnlyHashAgg { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] } + └─StreamExchange { dist: HashShard(v1) } + └─StreamRowIdGen { row_id_index: 3 } + └─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - 10:Int32)] } + └─StreamSource { source: "t", columns: ["v1", "v2", "v3", "_row_id"] } + eowc_stream_dist_plan: |+ + Fragment 0 + StreamMaterialize { columns: [v1, min, agg], stream_key: [v1], pk_columns: [v1], pk_conflict: "NoCheck", watermark_columns: [v1] } + ├── materialized table: 4294967294 + └── StreamSort { sort_column_index: 0 } + └── StreamProject { exprs: [v1, min(v2), count(distinct v3)], output_watermarks: [v1] } + └── StreamAppendOnlyHashAgg { group_key: [v1], aggs: [min(v2), count(distinct v3), count], output_watermarks: [v1] } + ├── result table: 1 + ├── state tables: [] + ├── distinct tables: [ (distinct key: v3, table id: 2) ] + └── StreamExchange Hash([0]) from 1 + + Fragment 1 + StreamRowIdGen { row_id_index: 3 } + └── StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - 10:Int32)] } + └── StreamSource { source: "t", columns: ["v1", "v2", "v3", "_row_id"] } { source state table: 4 } + + Table 1 + ├── columns: [ v1, min(v2), count(distinct v3), count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 + + Table 2 + ├── columns: [ v1, v3, count_for_agg_call_1 ] + ├── primary key: [ $0 ASC, $1 ASC ] + ├── value indices: [ 2 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 2 + + Table 4 + ├── columns: [ partition_id, offset_info ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 0, 1 ] + ├── distribution key: [] + └── read pk prefix len hint: 1 + + Table 4294967294 + ├── columns: [ v1, min, agg ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 + +- sql: | + CREATE TABLE t (a TIMESTAMP, b INT, WATERMARK FOR a AS a - INTERVAL '5 minutes') APPEND ONLY; + SELECT + window_start, max(b) + FROM tumble(t, a, INTERVAL '1 hour') + GROUP BY window_start; + stream_plan: | + StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: "NoCheck", watermark_columns: [window_start] } + └─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] } + └─StreamAppendOnlyHashAgg { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] } + └─StreamExchange { dist: HashShard($expr1) } + └─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] } + └─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + eowc_stream_plan: | + StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: "NoCheck", watermark_columns: [window_start] } + └─StreamSort { sort_column_index: 0 } + └─StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] } + └─StreamAppendOnlyHashAgg { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] } + └─StreamExchange { dist: HashShard($expr1) } + └─StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] } + └─StreamTableScan { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + eowc_stream_dist_plan: |+ + Fragment 0 + StreamMaterialize { columns: [window_start, max], stream_key: [window_start], pk_columns: [window_start], pk_conflict: "NoCheck", watermark_columns: [window_start] } + ├── materialized table: 4294967294 + └── StreamSort { sort_column_index: 0 } + └── StreamProject { exprs: [$expr1, max(t.b)], output_watermarks: [$expr1] } + └── StreamAppendOnlyHashAgg { group_key: [$expr1], aggs: [max(t.b), count], output_watermarks: [$expr1] } + ├── result table: 1 + ├── state tables: [] + ├── distinct tables: [] + └── StreamExchange Hash([0]) from 1 + + Fragment 1 + StreamProject { exprs: [TumbleStart(t.a, '01:00:00':Interval) as $expr1, t.b, t._row_id], output_watermarks: [$expr1] } + └── Chain { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + ├── Upstream + └── BatchPlanNode + + Table 1 { columns: [ $expr1, max(t_b), count ], primary key: [ $0 ASC ], value indices: [ 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + + Table 4294967294 { columns: [ window_start, max ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + +- sql: | + create source t (a int, b int, tm timestamp, watermark for tm as tm - interval '5 minutes') with (connector = 'kinesis') ROW FORMAT JSON; + select lag(a, 2) over (partition by b order by tm) from t; + stream_error: |- + Feature is not yet implemented: OverAgg to stream + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/9124 + eowc_stream_error: |- + Feature is not yet implemented: OverAgg to stream + Tracking issue: https://github.com/risingwavelabs/risingwave/issues/9124 diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index a418e727a7670..397428d28d5aa 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -18,7 +18,7 @@ use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::catalog::PbTable; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::user::grant_privilege::Action; -use risingwave_sqlparser::ast::{Ident, ObjectName, Query}; +use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query}; use super::privilege::resolve_relation_privileges; use super::RwPgResponse; @@ -79,6 +79,7 @@ pub fn gen_create_mv_plan( query: Query, name: ObjectName, columns: Vec, + emit_mode: Option, ) -> Result<(PlanRef, PbTable)> { let db_name = session.database(); let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?; @@ -98,11 +99,17 @@ pub fn gen_create_mv_plan( let col_names = get_column_names(&bound, session, columns)?; + let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose); + if emit_on_window_close { + context.warn("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution."); + } + let mut plan_root = Planner::new(context).plan_query(bound)?; if let Some(col_names) = col_names { plan_root.set_out_names(col_names)?; } - let materialize = plan_root.gen_materialize_plan(table_name, definition)?; + let materialize = + plan_root.gen_materialize_plan(table_name, definition, emit_on_window_close)?; let mut table = materialize.table().to_prost(schema_id, database_id); if session.config().get_create_compaction_group_for_mv() { table.properties.insert( @@ -137,6 +144,7 @@ pub async fn handle_create_mv( name: ObjectName, query: Query, columns: Vec, + emit_mode: Option, ) -> Result { let session = handler_args.session.clone(); @@ -146,7 +154,8 @@ pub async fn handle_create_mv( let (table, graph, mut notices) = { let context = OptimizerContext::from_handler_args(handler_args); - let (plan, table) = gen_create_mv_plan(&session, context.into(), query, name, columns)?; + let (plan, table) = + gen_create_mv_plan(&session, context.into(), query, name, columns, emit_mode)?; let context = plan.plan_base().ctx.clone(); let mut graph = build_graph(plan); graph.parallelism = session diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 53926fc53bab0..71b34e60e6bba 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -109,9 +109,17 @@ async fn do_handle_explain( query, name, columns, + emit_mode, .. - } => gen_create_mv_plan(&session, context.clone(), *query, name, columns) - .map(|x| x.0), + } => gen_create_mv_plan( + &session, + context.clone(), + *query, + name, + columns, + emit_mode, + ) + .map(|x| x.0), Statement::CreateSink { stmt } => { gen_sink_plan(&session, context.clone(), stmt).map(|x| x.0) diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 3d3c980dfa005..c76789f69d6bb 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -348,15 +348,8 @@ pub async fn handle( ) .into()); } - if emit_mode == Some(EmitMode::OnWindowClose) { - return Err(ErrorCode::NotImplemented( - "CREATE MATERIALIZED VIEW EMIT ON WINDOW CLOSE".to_string(), - None.into(), - ) - .into()); - } if materialized { - create_mv::handle_create_mv(handler_args, name, *query, columns).await + create_mv::handle_create_mv(handler_args, name, *query, columns, emit_mode).await } else { create_view::handle_create_view(handler_args, name, columns, *query).await } diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index c3cdd0024567b..cc47153bb147e 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -44,8 +44,9 @@ use risingwave_pb::catalog::WatermarkDesc; use self::heuristic_optimizer::ApplyOrder; use self::plan_node::{ - generic, BatchProject, Convention, LogicalProject, LogicalSource, StreamDml, StreamMaterialize, - StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter, + generic, stream_enforce_eowc_requirement, BatchProject, Convention, LogicalProject, + LogicalSource, StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, + StreamWatermarkFilter, ToStreamContext, }; use self::plan_visitor::has_batch_exchange; #[cfg(debug_assertions)] @@ -54,6 +55,7 @@ use self::property::RequiredDist; use self::rule::*; use crate::catalog::table_catalog::{TableType, TableVersion}; use crate::expr::InputRef; +use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::{ BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, }; @@ -267,11 +269,11 @@ impl PlanRoot { } /// Generate optimized stream plan - fn gen_optimized_stream_plan(&mut self) -> Result { + fn gen_optimized_stream_plan(&mut self, emit_on_window_close: bool) -> Result { let ctx = self.plan.ctx(); let _explain_trace = ctx.is_explain_trace(); - let mut plan = self.gen_stream_plan()?; + let mut plan = self.gen_stream_plan(emit_on_window_close)?; plan = plan.optimize_by_rules(&OptimizationStage::new( "Merge StreamProject", @@ -319,7 +321,7 @@ impl PlanRoot { } /// Generate create index or create materialize view plan. - fn gen_stream_plan(&mut self) -> Result { + fn gen_stream_plan(&mut self, emit_on_window_close: bool) -> Result { let ctx = self.plan.ctx(); let explain_trace = ctx.is_explain_trace(); @@ -368,7 +370,11 @@ impl PlanRoot { .rewrite_required_order(&self.required_order) .unwrap(); self.out_fields = out_col_change.rewrite_bitset(&self.out_fields); - plan.to_stream_with_dist_required(&self.required_dist, &mut Default::default()) + let plan = plan.to_stream_with_dist_required( + &self.required_dist, + &mut ToStreamContext::new(emit_on_window_close), + )?; + stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close) } _ => unreachable!(), }?; @@ -394,7 +400,7 @@ impl PlanRoot { watermark_descs: Vec, version: Option, ) -> Result { - let mut stream_plan = self.gen_optimized_stream_plan()?; + let mut stream_plan = self.gen_optimized_stream_plan(false)?; // Add DML node. stream_plan = StreamDml::new( @@ -472,8 +478,9 @@ impl PlanRoot { &mut self, mv_name: String, definition: String, + emit_on_window_close: bool, ) -> Result { - let stream_plan = self.gen_optimized_stream_plan()?; + let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close)?; StreamMaterialize::create( stream_plan, @@ -493,7 +500,7 @@ impl PlanRoot { index_name: String, definition: String, ) -> Result { - let stream_plan = self.gen_optimized_stream_plan()?; + let stream_plan = self.gen_optimized_stream_plan(false)?; StreamMaterialize::create( stream_plan, @@ -514,7 +521,7 @@ impl PlanRoot { definition: String, properties: WithOptions, ) -> Result { - let mut stream_plan = self.gen_optimized_stream_plan()?; + let mut stream_plan = self.gen_optimized_stream_plan(false)?; // Add a project node if there is hidden column(s). let input_fields = stream_plan.schema().fields(); diff --git a/src/frontend/src/optimizer/plan_node/convert.rs b/src/frontend/src/optimizer/plan_node/convert.rs index fca2bf20f2c78..f6399bc7cbba8 100644 --- a/src/frontend/src/optimizer/plan_node/convert.rs +++ b/src/frontend/src/optimizer/plan_node/convert.rs @@ -58,6 +58,38 @@ pub trait ToStream { } } +pub fn stream_enforce_eowc_requirement( + ctx: OptimizerContextRef, + plan: PlanRef, + emit_on_window_close: bool, +) -> Result { + if emit_on_window_close && !plan.emit_on_window_close() { + let watermark_cols = plan.watermark_columns(); + let n_watermark_cols = watermark_cols.count_ones(..); + if n_watermark_cols == 0 { + Err(ErrorCode::NotSupported( + "The query cannot be executed in Emit-On-Window-Close mode.".to_string(), + "Try define a watermark column in the source, or avoid aggregation without GROUP BY" + .to_string(), + ) + .into()) + } else { + if n_watermark_cols > 1 { + ctx.warn("There are multiple watermark columns in the query, currently only the first one will be used."); + } + let watermark_col_idx = watermark_cols.ones().next().unwrap(); + Ok(StreamSort::new(plan, watermark_col_idx).into()) + } + } else if !emit_on_window_close && plan.emit_on_window_close() { + Err(ErrorCode::InternalError( + "Some bad thing happened, the generated plan is not correct.".to_string(), + ) + .into()) + } else { + Ok(plan) + } +} + #[derive(Debug, Clone, Default)] pub struct RewriteStreamContext { share_rewrite_map: HashMap, @@ -84,12 +116,20 @@ impl RewriteStreamContext { } } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct ToStreamContext { share_to_stream_map: HashMap, + emit_on_window_close: bool, } impl ToStreamContext { + pub fn new(emit_on_window_close: bool) -> Self { + Self { + share_to_stream_map: HashMap::new(), + emit_on_window_close, + } + } + pub fn add_to_stream_result(&mut self, plan_node_id: PlanNodeId, plan_ref: PlanRef) { self.share_to_stream_map .try_insert(plan_node_id, plan_ref) @@ -99,6 +139,10 @@ impl ToStreamContext { pub fn get_to_stream_result(&self, plan_node_id: PlanNodeId) -> Option<&PlanRef> { self.share_to_stream_map.get(&plan_node_id) } + + pub fn emit_on_window_close(&self) -> bool { + self.emit_on_window_close + } } /// `ToBatch` allows to convert a logical plan node to batch physical node diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 2761a25e98d35..1084199d5e577 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -384,6 +384,10 @@ impl StreamPlanRef for PlanRef { fn append_only(&self) -> bool { self.plan_base().append_only } + + fn emit_on_window_close(&self) -> bool { + self.plan_base().emit_on_window_close + } } impl BatchPlanRef for PlanRef { diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs index cff8287dfda1c..00973918836e8 100644 --- a/src/frontend/src/optimizer/plan_node/plan_base.rs +++ b/src/frontend/src/optimizer/plan_node/plan_base.rs @@ -24,7 +24,7 @@ use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order}; /// the common fields of all nodes, please make a field named `base` in -/// every planNode and correctly valued it when construct the planNode. +/// every planNode and correctly value it when construct the planNode. #[derive(Clone, Debug, Educe)] #[educe(PartialEq, Eq, Hash)] pub struct PlanBase { @@ -46,6 +46,8 @@ pub struct PlanBase { /// The append-only property of the PlanNode's output is a stream-only property. Append-only /// means the stream contains only insert operation. pub append_only: bool, + /// Whether the output is emitted on window close. + pub emit_on_window_close: bool, pub functional_dependency: FunctionalDependencySet, /// The watermark column indices of the PlanNode's output. There could be watermark output from /// this stream operator. @@ -78,6 +80,10 @@ impl stream::StreamPlanRef for PlanBase { fn append_only(&self) -> bool { self.append_only } + + fn emit_on_window_close(&self) -> bool { + self.emit_on_window_close + } } impl batch::BatchPlanRef for PlanBase { fn order(&self) -> &Order { @@ -102,6 +108,7 @@ impl PlanBase { order: Order::any(), // Logical plan node won't touch `append_only` field append_only: true, + emit_on_window_close: false, functional_dependency, watermark_columns, } @@ -120,6 +127,7 @@ impl PlanBase { logical: &impl GenericPlanNode, dist: Distribution, append_only: bool, + emit_on_window_close: bool, watermark_columns: FixedBitSet, ) -> Self { Self::new_stream( @@ -129,10 +137,12 @@ impl PlanBase { logical.functional_dependency(), dist, append_only, + emit_on_window_close, watermark_columns, ) } + #[expect(clippy::too_many_arguments)] pub fn new_stream( ctx: OptimizerContextRef, schema: Schema, @@ -140,6 +150,7 @@ impl PlanBase { functional_dependency: FunctionalDependencySet, dist: Distribution, append_only: bool, + emit_on_window_close: bool, watermark_columns: FixedBitSet, ) -> Self { let id = ctx.next_plan_node_id(); @@ -152,6 +163,7 @@ impl PlanBase { order: Order::any(), logical_pk, append_only, + emit_on_window_close, functional_dependency, watermark_columns, } @@ -183,6 +195,7 @@ impl PlanBase { logical_pk: vec![], // Batch plan node won't touch `append_only` field append_only: true, + emit_on_window_close: false, // TODO(rc): batch EOWC support? functional_dependency, watermark_columns, } @@ -196,6 +209,7 @@ impl PlanBase { plan_node.functional_dependency().clone(), plan_node.distribution().clone(), plan_node.append_only(), + plan_node.emit_on_window_close(), plan_node.watermark_columns().clone(), ) } @@ -232,6 +246,9 @@ macro_rules! impl_base_delegate { pub fn append_only(&self) -> bool { self.plan_base().append_only } + pub fn emit_on_window_close(&self) -> bool { + self.plan_base().emit_on_window_close + } pub fn functional_dependency(&self) -> &FunctionalDependencySet { &self.plan_base().functional_dependency } diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 8564aaf477398..25afdf4c94ce7 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -69,6 +69,7 @@ macro_rules! impl_node { pub trait StreamPlanNode: GenericPlanNode { fn distribution(&self) -> Distribution; fn append_only(&self) -> bool; + fn emit_on_window_close(&self) -> bool; fn to_stream_base(&self) -> PlanBase { let ctx = self.ctx(); PlanBase { @@ -78,6 +79,7 @@ pub trait StreamPlanNode: GenericPlanNode { logical_pk: self.logical_pk().unwrap_or_default(), dist: self.distribution(), append_only: self.append_only(), + emit_on_window_close: self.emit_on_window_close(), } } } @@ -85,6 +87,7 @@ pub trait StreamPlanNode: GenericPlanNode { pub trait StreamPlanRef: GenericPlanRef { fn distribution(&self) -> &Distribution; fn append_only(&self) -> bool; + fn emit_on_window_close(&self) -> bool; } impl generic::GenericPlanRef for PlanRef { @@ -131,6 +134,10 @@ impl StreamPlanRef for PlanBase { fn append_only(&self) -> bool { self.append_only } + + fn emit_on_window_close(&self) -> bool { + self.emit_on_window_close + } } impl StreamPlanRef for PlanRef { @@ -141,6 +148,10 @@ impl StreamPlanRef for PlanRef { fn append_only(&self) -> bool { self.0.append_only } + + fn emit_on_window_close(&self) -> bool { + self.0.emit_on_window_close + } } /// Implements [`generic::Join`] with delta join. It requires its two @@ -397,6 +408,7 @@ pub struct PlanBase { #[educe(Hash(ignore))] pub dist: Distribution, pub append_only: bool, + pub emit_on_window_close: bool, } impl_node!( diff --git a/src/frontend/src/optimizer/plan_node/stream_dedup.rs b/src/frontend/src/optimizer/plan_node/stream_dedup.rs index 8459568438e37..d9abd7b9af870 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dedup.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dedup.rs @@ -43,6 +43,7 @@ impl StreamDedup { &logical, input.distribution().clone(), true, + input.emit_on_window_close(), input.watermark_columns().clone(), ); StreamDedup { base, logical } diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index 3c2e09acbe6df..97038bf05dd5d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -67,8 +67,13 @@ impl StreamDeltaJoin { logical.i2o_col_mapping().rewrite_bitset(&watermark_columns) }; // TODO: derive from input - let base = - PlanBase::new_stream_with_logical(&logical, dist, append_only, watermark_columns); + let base = PlanBase::new_stream_with_logical( + &logical, + dist, + append_only, + false, // TODO(rc): derive EOWC property from input + watermark_columns, + ); Self { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_derive.rs b/src/frontend/src/optimizer/plan_node/stream_derive.rs index fdc092cfa951b..2e9cb2892a9cf 100644 --- a/src/frontend/src/optimizer/plan_node/stream_derive.rs +++ b/src/frontend/src/optimizer/plan_node/stream_derive.rs @@ -45,6 +45,10 @@ impl StreamPlanNode for DynamicFilter { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for Exchange { @@ -73,6 +77,10 @@ impl StreamPlanNode for Exchange { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for DeltaJoin { @@ -101,6 +109,10 @@ impl StreamPlanNode for DeltaJoin { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for Expand { @@ -129,6 +141,10 @@ impl StreamPlanNode for Expand { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for Filter { @@ -157,6 +173,10 @@ impl StreamPlanNode for Filter { fn append_only(&self) -> bool { self.core.input.append_only() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for GlobalSimpleAgg { @@ -185,6 +205,10 @@ impl StreamPlanNode for GlobalSimpleAgg { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for GroupTopN { @@ -213,6 +237,10 @@ impl StreamPlanNode for GroupTopN { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for HashAgg { @@ -241,6 +269,10 @@ impl StreamPlanNode for HashAgg { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for HashJoin { @@ -269,6 +301,10 @@ impl StreamPlanNode for HashJoin { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for HopWindow { @@ -297,6 +333,10 @@ impl StreamPlanNode for HopWindow { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for IndexScan { @@ -325,6 +365,10 @@ impl StreamPlanNode for IndexScan { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for LocalSimpleAgg { @@ -353,6 +397,10 @@ impl StreamPlanNode for LocalSimpleAgg { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for Materialize { @@ -381,6 +429,10 @@ impl StreamPlanNode for Materialize { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for ProjectSet { @@ -409,6 +461,10 @@ impl StreamPlanNode for ProjectSet { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for Project { @@ -439,6 +495,10 @@ impl StreamPlanNode for Project { fn append_only(&self) -> bool { self.core.input.append_only() } + + fn emit_on_window_close(&self) -> bool { + self.core.input.emit_on_window_close() + } } impl GenericPlanNode for Sink { @@ -467,6 +527,10 @@ impl StreamPlanNode for Sink { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for Source { @@ -495,6 +559,10 @@ impl StreamPlanNode for Source { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for TableScan { @@ -523,6 +591,10 @@ impl StreamPlanNode for TableScan { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } impl GenericPlanNode for TopN { @@ -551,4 +623,8 @@ impl StreamPlanNode for TopN { fn append_only(&self) -> bool { todo!() } + + fn emit_on_window_close(&self) -> bool { + todo!() + } } diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs index 1d0a62056aae6..2416b5bb0b8c1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dml.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs @@ -37,6 +37,7 @@ impl StreamDml { input.functional_dependency().clone(), input.distribution().clone(), append_only, + false, // TODO(rc): decide EOWC property FixedBitSet::with_capacity(input.schema().len()), // no watermark if dml is allowed ); diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs index 894a9190b9a70..9d82af57c39da 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs @@ -62,6 +62,7 @@ impl StreamDynamicFilter { left.distribution().clone(), false, /* we can have a new abstraction for append only and monotonically increasing * in the future */ + false, // TODO(rc): decide EOWC property watermark_columns, ); let core = generic::DynamicFilter { diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index b4d0b142c0af1..35d8db3f275a9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -22,6 +22,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::generic::{self, PlanWindowFunction}; use super::utils::TableCatalogBuilder; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::TableCatalog; @@ -37,6 +38,7 @@ impl StreamEowcOverWindow { let input = &logical.input; assert!(input.append_only()); + assert!(input.emit_on_window_close()); // Should order by a single watermark column. let order_key = &logical.window_functions[0].order_by; @@ -54,6 +56,7 @@ impl StreamEowcOverWindow { &logical, input.distribution().clone(), true, + true, watermark_columns, ); StreamEowcOverWindow { base, logical } diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 702e0c7246c8d..bc618ae3d6f55 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -17,6 +17,7 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode}; +use super::stream::StreamPlanRef; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::{Distribution, DistributionDisplay}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -40,6 +41,7 @@ impl StreamExchange { input.functional_dependency().clone(), dist, input.append_only(), + input.emit_on_window_close(), input.watermark_columns().clone(), ); StreamExchange { @@ -60,6 +62,7 @@ impl StreamExchange { input.functional_dependency().clone(), input.distribution().clone(), input.append_only(), + input.emit_on_window_close(), input.watermark_columns().clone(), ); StreamExchange { diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs index 3f72cbac599e8..dbcec9fa93499 100644 --- a/src/frontend/src/optimizer/plan_node/stream_expand.rs +++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs @@ -19,6 +19,7 @@ use risingwave_pb::stream_plan::expand_node::Subset; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::ExpandNode; +use super::stream::StreamPlanRef; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -54,6 +55,7 @@ impl StreamExpand { &logical, dist, input.append_only(), + input.emit_on_window_close(), watermark_columns, ); StreamExpand { base, logical } diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs index 92eb858c5c4fc..5581ebfe8d278 100644 --- a/src/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs @@ -17,6 +17,7 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::FilterNode; +use super::stream::StreamPlanRef; use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::PlanBase; @@ -39,6 +40,7 @@ impl StreamFilter { &logical, dist, input.append_only(), + input.emit_on_window_close(), input.watermark_columns().clone(), ); StreamFilter { base, logical } diff --git a/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs index 99a7e331417ab..1dff6e56097ad 100644 --- a/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_global_simple_agg.rs @@ -49,7 +49,8 @@ impl StreamGlobalSimpleAgg { let watermark_columns = FixedBitSet::with_capacity(logical.output_len()); // Simple agg executor might change the append-only behavior of the stream. - let base = PlanBase::new_stream_with_logical(&logical, dist, false, watermark_columns); + let base = + PlanBase::new_stream_with_logical(&logical, dist, false, false, watermark_columns); StreamGlobalSimpleAgg { base, logical, diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index 464a5d3281557..9bc5d34f657d2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -68,6 +68,7 @@ impl StreamGroupTopN { input.functional_dependency().clone(), input.distribution().clone(), false, + false, // TODO(rc): group top-n EOWC support? watermark_columns, ); StreamGroupTopN { diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs index 6a3277c8e132d..f2f85ea06f788 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -65,7 +65,13 @@ impl StreamHashAgg { } // Hash agg executor might change the append-only behavior of the stream. - let base = PlanBase::new_stream_with_logical(&logical, dist, false, watermark_columns); + let base = PlanBase::new_stream_with_logical( + &logical, + dist, + false, + false, // TODO(rc): support generating EOWC hash agg plan + watermark_columns, + ); StreamHashAgg { base, logical, diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs index 9dca1da8d7bf4..25f3878ce8e64 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -189,8 +189,13 @@ impl StreamHashJoin { }; // TODO: derive from input - let base = - PlanBase::new_stream_with_logical(&logical, dist, append_only, watermark_columns); + let base = PlanBase::new_stream_with_logical( + &logical, + dist, + append_only, + false, // TODO(rc): derive EOWC property from input + watermark_columns, + ); Self { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index bbece57b6c1e9..bf72f949175d1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -20,6 +20,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::HopWindowNode; +use super::stream::StreamPlanRef; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -61,7 +62,8 @@ impl StreamHopWindow { let base = PlanBase::new_stream_with_logical( &logical, dist, - logical.input.append_only(), + input.append_only(), + input.emit_on_window_close(), watermark_columns, ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs index cdfb0dc1faa73..faf39747c4295 100644 --- a/src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_local_simple_agg.rs @@ -21,6 +21,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::generic::{self, PlanAggCall}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::ExprRewriter; +use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::property::RequiredDist; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -54,6 +55,7 @@ impl StreamLocalSimpleAgg { &logical, input_dist.clone(), input.append_only(), + input.emit_on_window_close(), watermark_columns, ); StreamLocalSimpleAgg { base, logical } diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index 113c825c7fbf1..15489aba14f85 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -51,6 +51,7 @@ impl StreamNow { FunctionalDependencySet::default(), Distribution::Single, false, + false, // TODO(rc): derive EOWC property from input watermark_columns, ); Self { base } diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index c6e5fb45a4ea8..52eee91d6d7cf 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -20,6 +20,7 @@ use risingwave_common::catalog::FieldDisplay; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::ProjectNode; +use super::stream::StreamPlanRef; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{try_derive_watermark, Expr, ExprImpl, ExprRewriter}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -78,6 +79,7 @@ impl StreamProject { &logical, distribution, input.append_only(), + input.emit_on_window_close(), watermark_columns, ); StreamProject { diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index 8c239032d3ad6..0e5c19937e082 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -19,6 +19,7 @@ use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::ProjectSetNode; +use super::stream::StreamPlanRef; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{try_derive_watermark, ExprRewriter}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -53,6 +54,7 @@ impl StreamProjectSet { &logical, distribution, input.append_only(), + input.emit_on_window_close(), watermark_columns, ); StreamProjectSet { base, logical } diff --git a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs index c703428a94d66..9f7d5cf400152 100644 --- a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs +++ b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs @@ -44,6 +44,7 @@ impl StreamRowIdGen { input.functional_dependency().clone(), distribution, input.append_only(), + input.emit_on_window_close(), input.watermark_columns().clone(), ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index e89bfc6010a66..7220ec7d9fa72 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -18,6 +18,7 @@ use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode, PbStreamNode}; +use super::stream::StreamPlanRef; use super::{ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode}; use crate::optimizer::property::Distribution; @@ -43,8 +44,9 @@ impl StreamShare { pk_indices, logical.functional_dependency().clone(), dist, - logical.input().append_only(), - logical.input().watermark_columns().clone(), + input.append_only(), + input.emit_on_window_close(), + input.watermark_columns().clone(), ); StreamShare { base, logical } } diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs index 151b6727471cb..b3e761d71cf9f 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sort.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs @@ -57,6 +57,7 @@ impl StreamSort { fd_set, dist, true, + true, watermark_columns, ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index cdbfe8ffbe1cc..19768d83c21a9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -46,6 +46,7 @@ impl StreamSource { &logical, Distribution::SomeShard, logical.catalog.as_ref().map_or(true, |s| s.append_only), + false, watermark_columns, ); Self { base, logical } diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 710096c958bdf..163673906584b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -72,6 +72,7 @@ impl StreamTableScan { logical.functional_dependency().clone(), distribution, logical.table_desc().append_only, + false, logical.watermark_columns(), ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index 361b0025f628b..ddef0437809c6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -67,7 +67,13 @@ impl StreamTemporalJoin { .rewrite_bitset(logical.left.watermark_columns()), ); - let base = PlanBase::new_stream_with_logical(&logical, dist, true, watermark_columns); + let base = PlanBase::new_stream_with_logical( + &logical, + dist, + true, + false, // TODO(rc): derive EOWC property from input + watermark_columns, + ); Self { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index 3778344a32c5e..2d420b46ee10a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -40,7 +40,8 @@ impl StreamTopN { }; let watermark_columns = FixedBitSet::with_capacity(input.schema().len()); - let base = PlanBase::new_stream_with_logical(&logical, dist, false, watermark_columns); + let base = + PlanBase::new_stream_with_logical(&logical, dist, false, false, watermark_columns); StreamTopN { base, logical } } diff --git a/src/frontend/src/optimizer/plan_node/stream_union.rs b/src/frontend/src/optimizer/plan_node/stream_union.rs index e103dc5988bab..2be4190e5ae82 100644 --- a/src/frontend/src/optimizer/plan_node/stream_union.rs +++ b/src/frontend/src/optimizer/plan_node/stream_union.rs @@ -60,6 +60,7 @@ impl StreamUnion { logical.functional_dependency().clone(), dist, logical.inputs().iter().all(|x| x.append_only()), + logical.inputs().iter().all(|x| x.emit_on_window_close()), watermark_columns, ); StreamUnion { base, logical } diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs index e6587fac61a56..9b6ceebf3c278 100644 --- a/src/frontend/src/optimizer/plan_node/stream_values.rs +++ b/src/frontend/src/optimizer/plan_node/stream_values.rs @@ -46,6 +46,7 @@ impl StreamValues { logical.functional_dependency().clone(), Distribution::Single, false, + false, watermark_columns, ); Self { base, logical } diff --git a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs index 73cce1d689aa3..64105f680ecbc 100644 --- a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs @@ -50,6 +50,7 @@ impl StreamWatermarkFilter { input.functional_dependency().clone(), input.distribution().clone(), input.append_only(), + false, // TODO(rc): decide EOWC property watermark_columns, ); Self::with_base(base, input, watermark_descs)