diff --git a/e2e_test/streaming/watermark.slt b/e2e_test/streaming/watermark.slt index 66d3a360fcb70..f145a3a46e282 100644 --- a/e2e_test/streaming/watermark.slt +++ b/e2e_test/streaming/watermark.slt @@ -9,7 +9,10 @@ create table t ( ) append only; statement ok -create materialized view mv as select * from t emit on window close; +create materialized view mv1 as select * from t emit on window close; + +statement ok +create materialized view mv2 as select t.ts, unnest(Array[1,2,3]) from t emit on window close; statement ok insert into t values ('2023-05-06 16:51:00', 1), ('2023-05-06 16:51:00', 2), ('2023-05-06 16:51:00', 3); @@ -22,14 +25,31 @@ sleep 5s skipif in-memory query TI -select * from mv; +select * from mv1; +---- +2023-05-06 16:51:00 1 +2023-05-06 16:51:00 2 +2023-05-06 16:51:00 3 + +skipif in-memory +query TI +select * from mv2; ---- 2023-05-06 16:51:00 1 2023-05-06 16:51:00 2 2023-05-06 16:51:00 3 +2023-05-06 16:51:00 1 +2023-05-06 16:51:00 2 +2023-05-06 16:51:00 3 +2023-05-06 16:51:00 1 +2023-05-06 16:51:00 2 +2023-05-06 16:51:00 3 + +statement ok +drop materialized view mv1; statement ok -drop materialized view mv; +drop materialized view mv2; statement ok drop table t; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 7b81d917bca16..33fe96a71c803 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -540,6 +540,12 @@ message ExpandNode { message ProjectSetNode { repeated expr.ProjectSetSelectItem select_list = 1; + // These two fields encode a list of (input column, expression) index pairs: when ProjectSet + // receives a watermark on input column `watermark_input_cols[i]`, it should derive a new + // watermark using the `watermark_expr_indices[i]`-th expression of `select_list`. + repeated uint32 watermark_input_cols = 2; + repeated uint32 watermark_expr_indices = 3; + repeated uint32 nondecreasing_exprs = 4; } // Sorts inputs and outputs ordered data based on watermark.
diff --git a/src/frontend/planner_test/tests/testdata/input/watermark.yaml b/src/frontend/planner_test/tests/testdata/input/watermark.yaml index c56045f8ed44c..1dfe8155b7817 100644 --- a/src/frontend/planner_test/tests/testdata/input/watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/input/watermark.yaml @@ -96,3 +96,9 @@ select window_start from hop(t, ts, interval '1' minute, interval '3' minute); expected_outputs: - stream_plan +- name: unnest + sql: | + create table t (ts timestamp with time zone, v1 int, watermark for ts as ts - INTERVAL '1' SECOND) append only; + explain create materialized view mv as select t.ts, unnest(Array[1,2,3]) from t emit on window close; + expected_outputs: + - explain_output diff --git a/src/frontend/planner_test/tests/testdata/output/watermark.yaml b/src/frontend/planner_test/tests/testdata/output/watermark.yaml index a6f997735a00f..d1916a33192c6 100644 --- a/src/frontend/planner_test/tests/testdata/output/watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/watermark.yaml @@ -210,3 +210,12 @@ └─StreamHopWindow { time_col: t.ts, slide: 00:01:00, size: 00:03:00, output: [window_start, t._row_id], output_watermarks: [window_start] } └─StreamFilter { predicate: IsNotNull(t.ts) } └─StreamTableScan { table: t, columns: [t.ts, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- name: unnest + sql: | + create table t (ts timestamp with time zone, v1 int, watermark for ts as ts - INTERVAL '1' SECOND) append only; + explain create materialized view mv as select t.ts, unnest(Array[1,2,3]) from t emit on window close; + explain_output: | + StreamMaterialize { columns: [projected_row_id(hidden), ts, unnest, t._row_id(hidden)], stream_key: [t._row_id, projected_row_id], pk_columns: [t._row_id, projected_row_id], pk_conflict: NoCheck, watermark_columns: [ts] } + └─StreamEowcSort { sort_column: t.ts } + └─StreamProjectSet { select_list: [$0, Unnest(ARRAY[1, 2, 3]:List(Int32)), $1] } + └─StreamTableScan 
{ table: t, columns: [ts, _row_id] } diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 9600fb0cf611c..e9de735718a07 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -711,14 +711,8 @@ pub fn to_stream_prost_body( table: Some(me.table.to_internal_table_prost()), }) } - Node::ProjectSet(me) => { - let me = &me.core; - let select_list = me - .select_list - .iter() - .map(ExprImpl::to_project_set_select_item_proto) - .collect(); - PbNodeBody::ProjectSet(ProjectSetNode { select_list }) + Node::ProjectSet(_) => { + unreachable!() } Node::Project(me) => PbNodeBody::Project(ProjectNode { select_list: me.core.exprs.iter().map(|x| x.to_expr_proto()).collect(), diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index 986eb97826bf7..619fec1f80d15 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -28,6 +28,12 @@ use crate::utils::ColIndexMappingRewriteExt; pub struct StreamProjectSet { pub base: PlanBase, logical: generic::ProjectSet<PlanRef>, + /// All the watermark derivations, (input_column_idx, expr_idx). And the + /// derivation expression is the project_set's expression itself. + watermark_derivations: Vec<(usize, usize)>, + /// Nondecreasing expression indices. `ProjectSet` can produce watermarks for these + /// expressions.
+ nondecreasing_exprs: Vec<usize>, } impl StreamProjectSet { @@ -37,15 +43,26 @@ impl StreamProjectSet { .i2o_col_mapping() .rewrite_provided_distribution(input.distribution()); + let mut watermark_derivations = vec![]; + let mut nondecreasing_exprs = vec![]; let mut watermark_columns = FixedBitSet::with_capacity(logical.output_len()); for (expr_idx, expr) in logical.select_list.iter().enumerate() { - if let WatermarkDerivation::Watermark(input_idx) = try_derive_watermark(expr) { - if input.watermark_columns().contains(input_idx) { - // The first column of ProjectSet is `projected_row_id`. + match try_derive_watermark(expr) { + WatermarkDerivation::Watermark(input_idx) => { + if input.watermark_columns().contains(input_idx) { + watermark_derivations.push((input_idx, expr_idx)); + watermark_columns.insert(expr_idx + 1); + } + } + WatermarkDerivation::Nondecreasing => { + nondecreasing_exprs.push(expr_idx); watermark_columns.insert(expr_idx + 1); } + WatermarkDerivation::Constant => { + // XXX(rc): we can produce one watermark on each recovery for this case. + } + WatermarkDerivation::None => {} } - // XXX(rc): do we need to handle `WatermarkDerivation::Nondecreasing` here?
} // ProjectSet executor won't change the append-only behavior of the stream, so it depends on @@ -57,7 +74,12 @@ impl StreamProjectSet { input.emit_on_window_close(), watermark_columns, ); - StreamProjectSet { base, logical } + StreamProjectSet { + base, + logical, + watermark_derivations, + nondecreasing_exprs, + } } } impl_distill_by_unit!(StreamProjectSet, logical, "StreamProjectSet"); @@ -77,6 +99,11 @@ impl PlanTreeNodeUnary for StreamProjectSet { impl StreamNode for StreamProjectSet { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { + let (watermark_input_cols, watermark_expr_indices) = self + .watermark_derivations + .iter() + .map(|(i, o)| (*i as u32, *o as u32)) + .unzip(); PbNodeBody::ProjectSet(ProjectSetNode { select_list: self .logical @@ -84,6 +111,9 @@ impl StreamNode for StreamProjectSet { .iter() .map(|select_item| select_item.to_project_set_select_item_proto()) .collect_vec(), + watermark_input_cols, + watermark_expr_indices, + nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(), }) } } diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index df144fb29e57d..f1962d456b2e1 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -17,23 +17,56 @@ use std::fmt::{Debug, Formatter}; use either::Either; use futures::StreamExt; use futures_async_stream::try_stream; -use risingwave_common::array::Op; +use multimap::MultiMap; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::bail; use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::types::{DataType, DatumRef}; +use risingwave_common::row::{Row, RowExt}; +use risingwave_common::types::{DataType, Datum, DatumRef, ToOwnedDatum}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::table_function::ProjectSetSelectItem; use super::error::StreamExecutorError; -use super::{BoxedExecutor, Executor, 
ExecutorInfo, Message, PkIndices, PkIndicesRef}; +use super::{ + ActorContextRef, BoxedExecutor, Executor, ExecutorInfo, Message, PkIndices, PkIndicesRef, + StreamExecutorResult, Watermark, +}; use crate::common::StreamChunkBuilder; +const PROJ_ROW_ID_OFFSET: usize = 1; + +/// `ProjectSetExecutor` projects data with the `expr`. The `expr` takes a chunk of data, +/// and returns a new data chunk. And then, `ProjectSetExecutor` will insert, delete +/// or update element into next operator according to the result of the expression. +pub struct ProjectSetExecutor { + input: BoxedExecutor, + inner: Inner, +} + +struct Inner { + info: ExecutorInfo, + ctx: ActorContextRef, + /// Expressions of the current project_section. + select_list: Vec<ProjectSetSelectItem>, + chunk_size: usize, + /// All the watermark derivations, (input_column_index, expr_idx). And the + /// derivation expression is the project_set's expression itself. + watermark_derivations: MultiMap<usize, usize>, + /// Indices of nondecreasing expressions in the expression list. + nondecreasing_expr_indices: Vec<usize>, +} + impl ProjectSetExecutor { + #[allow(clippy::too_many_arguments)] pub fn new( + ctx: ActorContextRef, input: Box<dyn Executor>, pk_indices: PkIndices, select_list: Vec<ProjectSetSelectItem>, executor_id: u64, chunk_size: usize, + watermark_derivations: MultiMap<usize, usize>, + nondecreasing_expr_indices: Vec<usize>, ) -> Self { let mut fields = vec![Field::with_name(DataType::Int64, "projected_row_id")]; fields.extend( @@ -47,57 +80,50 @@ impl ProjectSetExecutor { pk_indices, identity: format!("ProjectSet {:X}", executor_id), }; - Self { - input, + + let inner = Inner { info, + ctx, select_list, chunk_size, - } - } -} + watermark_derivations, + nondecreasing_expr_indices, + }; -/// `ProjectSetExecutor` projects data with the `expr`. The `expr` takes a chunk of data, -/// and returns a new data chunk. And then, `ProjectSetExecutor` will insert, delete -/// or update element into next operator according to the result of the expression.
-pub struct ProjectSetExecutor { - input: BoxedExecutor, - info: ExecutorInfo, - /// Expressions of the current project_section. - select_list: Vec<ProjectSetSelectItem>, - chunk_size: usize, + Self { input, inner } + } } impl Debug for ProjectSetExecutor { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("ProjectSetExecutor") - .field("exprs", &self.select_list) + .field("exprs", &self.inner.select_list) .finish() } } impl Executor for ProjectSetExecutor { fn execute(self: Box<Self>) -> super::BoxedMessageStream { - self.execute_inner().boxed() + self.inner.execute(self.input).boxed() } fn schema(&self) -> &Schema { - &self.info.schema + &self.inner.info.schema } fn pk_indices(&self) -> PkIndicesRef<'_> { - &self.info.pk_indices + &self.inner.info.pk_indices } fn identity(&self) -> &str { - &self.info.identity + &self.inner.info.identity } } -impl ProjectSetExecutor { +impl Inner { #[try_stream(ok = Message, error = StreamExecutorError)] - async fn execute_inner(self) { + async fn execute(self, input: BoxedExecutor) { assert!(!self.select_list.is_empty()); - // First column will be `projected_row_id`, which represents the index in the // output table let data_types: Vec<_> = std::iter::once(DataType::Int64) @@ -107,13 +133,32 @@ impl ProjectSetExecutor { let mut row = vec![DatumRef::None; data_types.len()]; let mut builder = StreamChunkBuilder::new(self.chunk_size, data_types); + let mut last_nondec_expr_values = vec![None; self.nondecreasing_expr_indices.len()]; #[for_await] - for msg in self.input.execute() { + for msg in input.execute() { match msg?
{ - Message::Watermark(_) => { - // TODO: https://github.com/risingwavelabs/risingwave/issues/6042 + Message::Watermark(watermark) => { + let watermarks = self.handle_watermark(watermark).await?; + for watermark in watermarks { + yield Message::Watermark(watermark) + } + } + m @ Message::Barrier(_) => { + for (&expr_idx, value) in self + .nondecreasing_expr_indices + .iter() + .zip_eq_fast(&mut last_nondec_expr_values) + { + if let Some(value) = std::mem::take(value) { + yield Message::Watermark(Watermark::new( + expr_idx + PROJ_ROW_ID_OFFSET, + self.select_list[expr_idx].return_type(), + value, + )) + } + } + yield m } - m @ Message::Barrier(_) => yield m, Message::Chunk(chunk) => { let mut results = Vec::with_capacity(self.select_list.len()); for select_item in &self.select_list { @@ -157,6 +202,10 @@ impl ProjectSetExecutor { break; } if let Some(chunk) = builder.append_row(op, &*row) { + self.update_last_nondec_expr_values( + &mut last_nondec_expr_values, + &chunk, + ); yield Message::Chunk(chunk); } // move to the next row @@ -168,10 +217,82 @@ impl ProjectSetExecutor { } } } if let Some(chunk) = builder.take() { + self.update_last_nondec_expr_values(&mut last_nondec_expr_values, &chunk); yield Message::Chunk(chunk); } } } } } + + /// Records, for every nondecreasing expression, its value in the first visible row of + /// `chunk`, to be emitted as a watermark at the next barrier. + fn update_last_nondec_expr_values( + &self, + last_nondec_expr_values: &mut [Datum], + chunk: &StreamChunk, + ) { + if !self.nondecreasing_expr_indices.is_empty() { + if let Some((_, first_visible_row)) = chunk.rows().next() { + // The output chunk starts with `projected_row_id`, so the value of expression + // `i` lives in column `i + PROJ_ROW_ID_OFFSET`. + let nondec_col_indices: Vec<_> = self + .nondecreasing_expr_indices + .iter() + .map(|expr_idx| expr_idx + PROJ_ROW_ID_OFFSET) + .collect(); + // it's ok to use the first row here, just one chunk delay + first_visible_row + .project(&nondec_col_indices) + .iter() + .enumerate() + .for_each(|(idx, value)| { + last_nondec_expr_values[idx] = Some( + value + .to_owned_datum() + .expect("non-decreasing expression should never be NULL"), + ); + }); + } + } + } + + /// Derives output watermarks from an input watermark through every expression registered + /// for its column in `watermark_derivations`. + async fn handle_watermark(&self, watermark: Watermark) -> StreamExecutorResult<Vec<Watermark>> { + let expr_indices = match self.watermark_derivations.get_vec(&watermark.col_idx) { + Some(v) => v, +
None => return Ok(vec![]), + }; + let mut ret = vec![]; + for expr_idx in expr_indices { + let expr_idx = *expr_idx; + let derived_watermark = match &self.select_list[expr_idx] { + ProjectSetSelectItem::Expr(expr) => { + watermark + .clone() + .transform_with_expr(expr, expr_idx + PROJ_ROW_ID_OFFSET, |err| { + self.ctx.on_compute_error( + err, + &(self.info.identity.to_string() + "(when computing watermark)"), + ) + }) + .await + } + ProjectSetSelectItem::TableFunction(_) => { + bail!("Watermark should not be produced by a table function"); + } + }; + + if let Some(derived_watermark) = derived_watermark { + ret.push(derived_watermark); + } else { + warn!( + "{} derive a NULL watermark with the expression {}!", + self.info.identity, expr_idx + ); + } + } + Ok(ret) + } } diff --git a/src/stream/src/from_proto/project_set.rs b/src/stream/src/from_proto/project_set.rs index 4a07a3f8bed27..57c422169e54f 100644 --- a/src/stream/src/from_proto/project_set.rs +++ b/src/stream/src/from_proto/project_set.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License.
+use multimap::MultiMap; +use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::table_function::ProjectSetSelectItem; use risingwave_pb::stream_plan::ProjectSetNode; @@ -38,13 +40,32 @@ impl ExecutorBuilder for ProjectSetExecutorBuilder { ProjectSetSelectItem::from_prost(proto, params.env.config().developer.chunk_size) }) .try_collect()?; + let watermark_derivations = MultiMap::from_iter( + node.get_watermark_input_cols() + .iter() + .map(|idx| *idx as usize) + .zip_eq_fast( + node.get_watermark_expr_indices() + .iter() + .map(|idx| *idx as usize), + ), + ); + let nondecreasing_expr_indices = node + .get_nondecreasing_exprs() + .iter() + .map(|idx| *idx as usize) + .collect(); + let chunk_size = params.env.config().developer.chunk_size; Ok(ProjectSetExecutor::new( + params.actor_context, input, params.pk_indices, select_list, params.executor_id, chunk_size, + watermark_derivations, + nondecreasing_expr_indices, ) .boxed()) } diff --git a/src/stream/tests/integration_tests/project_set.rs b/src/stream/tests/integration_tests/project_set.rs index c98f7d65e4c32..bf1354c25b83b 100644 --- a/src/stream/tests/integration_tests/project_set.rs +++ b/src/stream/tests/integration_tests/project_set.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use multimap::MultiMap; use risingwave_expr::table_function::repeat; use risingwave_stream::executor::ProjectSetExecutor; @@ -29,15 +30,24 @@ fn create_executor() -> (MessageSender, BoxedMessageStream) { let (tx, source) = MockSource::channel(schema, PkIndices::new()); let test_expr = build_from_pretty("(add:int8 $0:int8 $1:int8)"); + let test_expr_watermark = build_from_pretty("(add:int8 $0:int8 1:int8)"); let tf1 = repeat(build_from_pretty("1:int4"), 1); let tf2 = repeat(build_from_pretty("2:int4"), 2); let project_set = Box::new(ProjectSetExecutor::new( + ActorContext::create(123), Box::new(source), vec![], - vec![test_expr.into(), tf1.into(), tf2.into()], + vec![ + test_expr.into(), + test_expr_watermark.into(), + tf1.into(), + tf2.into(), + ], 1, CHUNK_SIZE, + MultiMap::from_iter(std::iter::once((0, 1))), + vec![], )); (tx, project_set.execute()) } @@ -52,6 +62,7 @@ async fn test_project_set() { + 2 5 + 3 6", )); + tx.push_int64_watermark(0, 3); tx.push_chunk(StreamChunk::from_pretty( " I I + 7 8 @@ -62,21 +73,24 @@ async fn test_project_set() { &mut project_set, expect_test::expect![[r#" - !chunk |- - +---+---+---+---+---+ - | + | 0 | 5 | 1 | 2 | - | + | 1 | 5 | | 2 | - | + | 0 | 7 | 1 | 2 | - | + | 1 | 7 | | 2 | - | + | 0 | 9 | 1 | 2 | - | + | 1 | 9 | | 2 | - +---+---+---+---+---+ + +---+---+---+---+---+---+ + | + | 0 | 5 | 2 | 1 | 2 | + | + | 1 | 5 | 2 | | 2 | + | + | 0 | 7 | 3 | 1 | 2 | + | + | 1 | 7 | 3 | | 2 | + | + | 0 | 9 | 4 | 1 | 2 | + | + | 1 | 9 | 4 | | 2 | + +---+---+---+---+---+---+ + - !watermark + col_idx: 2 + val: '4' - !chunk |- - +---+---+----+---+---+ - | + | 0 | 15 | 1 | 2 | - | + | 1 | 15 | | 2 | - | - | 0 | 9 | 1 | 2 | - | - | 1 | 9 | | 2 | - +---+---+----+---+---+ + +---+---+----+---+---+---+ + | + | 0 | 15 | 8 | 1 | 2 | + | + | 1 | 15 | 8 | | 2 | + | - | 0 | 9 | 4 | 1 | 2 | + | - | 1 | 9 | 4 | | 2 | + +---+---+----+---+---+---+ "#]], SnapshotOptions::default(), );