diff --git a/e2e_test/streaming/temporal_join/temporal_join_watermark.slt b/e2e_test/streaming/temporal_join/temporal_join_watermark.slt new file mode 100644 index 000000000000..d1557ab2fa75 --- /dev/null +++ b/e2e_test/streaming/temporal_join/temporal_join_watermark.slt @@ -0,0 +1,71 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int, v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '10' SECOND) APPEND ONLY; + +# FIXME. If we don't insert at first, it would cause a panic when create eowc_mv. +statement ok +insert into stream values (1, 1, 1, '2023-09-14 06:00:00'); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create materialized view temporal_join_mv as select id1, a1, id2, v1 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2; + +statement ok +create materialized view eowc_mv as select window_start, count(id1) from tumble(temporal_join_mv, v1, interval '5 s') group by window_start emit on window close; + +query IIII rowsort +select * from temporal_join_mv; +---- +1 1 NULL 2023-09-14 06:00:00+00:00 + +query IIII rowsort +select * from eowc_mv; +---- + +statement ok +insert into stream values (2, 2, 2, '2023-09-14 06:00:25'); + +sleep 5s + +query IIII rowsort +select * from temporal_join_mv; +---- +1 1 NULL 2023-09-14 06:00:00+00:00 +2 2 NULL 2023-09-14 06:00:25+00:00 + + +query IIII rowsort +select * from eowc_mv; +---- +2023-09-14 06:00:00+00:00 1 + +statement ok +insert into stream values (3, 3, 3, '2023-09-14 06:00:45'); + +sleep 5s + +query IIII rowsort +select * from temporal_join_mv; +---- +1 1 NULL 2023-09-14 06:00:00+00:00 +2 2 NULL 2023-09-14 06:00:25+00:00 +3 3 NULL 2023-09-14 06:00:45+00:00 + +query IIII rowsort +select * from eowc_mv; +---- +2023-09-14 06:00:00+00:00 1 +2023-09-14 06:00:25+00:00 1 + +statement ok +drop table stream cascade; + +statement ok +drop table version cascade; + + + diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 1372ad808eab..462f3f2fe1af 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -44,7 +44,9 @@ use crate::cache::{cache_may_stale, new_with_hasher_in, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::common::JoinStreamChunkBuilder; use crate::executor::monitor::StreamingMetrics; -use crate::executor::{ActorContextRef, BoxedExecutor, JoinType, JoinTypePrimitive, PkIndices}; +use crate::executor::{ + ActorContextRef, BoxedExecutor, JoinType, JoinTypePrimitive, PkIndices, Watermark, +}; use crate::task::AtomicU64Ref; pub struct TemporalJoinExecutor { @@ -235,15 +237,16 @@ impl TemporalSide { enum InternalMessage { Chunk(StreamChunk), Barrier(Vec, Barrier), + WaterMark(Watermark), } #[try_stream(ok = StreamChunk, error = StreamExecutorError)] -pub async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) { +async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) { #[for_await] for item in stream { match item? { Message::Watermark(_) => { - // TODO: https://github.com/risingwavelabs/risingwave/issues/6042 + // ignore } Message::Chunk(c) => yield c, Message::Barrier(b) if b.epoch != expected_barrier.epoch => { @@ -254,6 +257,23 @@ pub async fn chunks_until_barrier(stream: impl MessageStream, expected_barrier: } } +#[try_stream(ok = InternalMessage, error = StreamExecutorError)] +async fn internal_messages_until_barrier(stream: impl MessageStream, expected_barrier: Barrier) { + #[for_await] + for item in stream { + match item? { + Message::Watermark(w) => { + yield InternalMessage::WaterMark(w); + } + Message::Chunk(c) => yield InternalMessage::Chunk(c), + Message::Barrier(b) if b.epoch != expected_barrier.epoch => { + return Err(StreamExecutorError::align_barrier(expected_barrier, b)); + } + Message::Barrier(_) => return Ok(()), + } + } +} + // Align the left and right inputs according to their barriers, // such that in the produced stream, an aligned interval starts with // any number of `InternalMessage::Chunk(left_chunk)` and followed by @@ -285,18 +305,20 @@ async fn align_input(left: Box, right: Box) { } Some(Either::Right(Ok(Message::Barrier(b)))) => { #[for_await] - for chunk in chunks_until_barrier(left.by_ref(), b.clone()) { - yield InternalMessage::Chunk(chunk?); + for internal_message in + internal_messages_until_barrier(left.by_ref(), b.clone()) + { + yield internal_message?; } yield InternalMessage::Barrier(right_chunks, b); break 'inner; } Some(Either::Left(Err(e)) | Either::Right(Err(e))) => return Err(e), - Some( - Either::Left(Ok(Message::Watermark(_))) - | Either::Right(Ok(Message::Watermark(_))), - ) => { - // TODO: https://github.com/risingwavelabs/risingwave/issues/6042 + Some(Either::Left(Ok(Message::Watermark(w)))) => { + yield InternalMessage::WaterMark(w); + } + Some(Either::Right(Ok(Message::Watermark(_)))) => { + // ignore right side watermark } None => return Ok(()), } @@ -381,6 +403,8 @@ impl TemporalJoinExecutor self.right.schema().len(), ); + let left_to_output: HashMap = HashMap::from_iter(left_map.iter().cloned()); + let right_stream_key_indices = self.right.pk_indices().to_vec(); let null_matched = K::Bitmap::from_bool_vec(self.null_safe); @@ -398,6 +422,10 @@ impl TemporalJoinExecutor .with_label_values(&[&table_id_str, &actor_id_str]) .set(self.right_table.cache.len() as i64); match msg? { + InternalMessage::WaterMark(watermark) => { + let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap(); + yield Message::Watermark(watermark.with_idx(output_watermark_col_idx)); + } InternalMessage::Chunk(chunk) => { // Compact chunk, otherwise the following keys and chunk rows might fail to zip. let chunk = chunk.compact();