From 08400dc7a569e677859e636fe46d8a2758eaa16f Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 1 Nov 2024 14:44:41 +0800 Subject: [PATCH] fix(watermark): stop generating watermark messages when stream is paused (#19199) Signed-off-by: Richard Chien --- src/stream/src/executor/project.rs | 51 ++++++++++++++----- src/stream/src/executor/project_set.rs | 49 +++++++++++++----- src/stream/src/executor/watermark_filter.rs | 21 ++++++-- .../tests/integration_tests/project_set.rs | 3 ++ 4 files changed, 93 insertions(+), 31 deletions(-) diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs index 6ea579afea524..bd241c909245d 100644 --- a/src/stream/src/executor/project.rs +++ b/src/stream/src/executor/project.rs @@ -40,6 +40,8 @@ struct Inner { nondecreasing_expr_indices: Vec, /// Last seen values of nondecreasing expressions, buffered to periodically produce watermarks. last_nondec_expr_values: Vec>, + /// Whether the stream is paused. + is_paused: bool, /// the selectivity threshold which should be in `[0,1]`. for the chunk with selectivity less /// than the threshold, the Project executor will construct a new chunk before expr evaluation, @@ -70,6 +72,7 @@ impl ProjectExecutor { watermark_derivations, nondecreasing_expr_indices, last_nondec_expr_values: vec![None; n_nondecreasing_exprs], + is_paused: false, materialize_selectivity_threshold, noop_update_hint, }, @@ -143,8 +146,13 @@ impl Inner { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute(mut self, input: Executor) { + let mut input = input.execute(); + let first_barrier = expect_first_barrier(&mut input).await?; + self.is_paused = first_barrier.is_pause_on_startup(); + yield Message::Barrier(first_barrier); + #[for_await] - for msg in input.execute() { + for msg in input { let msg = msg?; match msg { Message::Watermark(w) => { @@ -174,21 +182,36 @@ impl Inner { } None => continue, }, - barrier @ Message::Barrier(_) => { - for (&expr_idx, value) in self - .nondecreasing_expr_indices - .iter() - .zip_eq_fast(&mut self.last_nondec_expr_values) - { - if let Some(value) = std::mem::take(value) { - yield Message::Watermark(Watermark::new( - expr_idx, - self.exprs[expr_idx].return_type(), - value, - )) + Message::Barrier(barrier) => { + if !self.is_paused { + for (&expr_idx, value) in self + .nondecreasing_expr_indices + .iter() + .zip_eq_fast(&mut self.last_nondec_expr_values) + { + if let Some(value) = std::mem::take(value) { + yield Message::Watermark(Watermark::new( + expr_idx, + self.exprs[expr_idx].return_type(), + value, + )) + } + } + } + + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => { + self.is_paused = true; + } + Mutation::Resume => { + self.is_paused = false; + } + _ => (), } } - yield barrier; + + yield Message::Barrier(barrier); } } } diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index 8f5c0e533bfbd..dff51a39255cf 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -93,6 +93,11 @@ impl Execute for ProjectSetExecutor { impl Inner { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute(self, input: Executor) { + let mut input = input.execute(); + let first_barrier = expect_first_barrier(&mut input).await?; + let mut is_paused = first_barrier.is_pause_on_startup(); + yield Message::Barrier(first_barrier); + assert!(!self.select_list.is_empty()); // First column will be `projected_row_id`, which represents the index in the // output table @@ -104,8 +109,9 @@ impl Inner { let mut builder = StreamChunkBuilder::new(self.chunk_size, data_types); let mut last_nondec_expr_values = vec![None; self.nondecreasing_expr_indices.len()]; + #[for_await] - for msg in input.execute() { + for msg in input { match msg? { Message::Watermark(watermark) => { let watermarks = self.handle_watermark(watermark).await?; @@ -113,21 +119,36 @@ impl Inner { yield Message::Watermark(watermark) } } - m @ Message::Barrier(_) => { - for (&expr_idx, value) in self - .nondecreasing_expr_indices - .iter() - .zip_eq_fast(&mut last_nondec_expr_values) - { - if let Some(value) = std::mem::take(value) { - yield Message::Watermark(Watermark::new( - expr_idx + PROJ_ROW_ID_OFFSET, - self.select_list[expr_idx].return_type(), - value, - )) + Message::Barrier(barrier) => { + if !is_paused { + for (&expr_idx, value) in self + .nondecreasing_expr_indices + .iter() + .zip_eq_fast(&mut last_nondec_expr_values) + { + if let Some(value) = std::mem::take(value) { + yield Message::Watermark(Watermark::new( + expr_idx + PROJ_ROW_ID_OFFSET, + self.select_list[expr_idx].return_type(), + value, + )) + } } } - yield m + + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => { + is_paused = true; + } + Mutation::Resume => { + is_paused = false; + } + _ => (), + } + } + + yield Message::Barrier(barrier); } Message::Chunk(chunk) => { let mut results = Vec::with_capacity(self.select_list.len()); diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 01497c37fdab5..47a5448435b84 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -100,6 +100,7 @@ impl WatermarkFilterExecutor { let first_barrier = expect_first_barrier(&mut input).await?; let prev_epoch = first_barrier.epoch.prev; table.init_epoch(first_barrier.epoch); + let mut is_paused = first_barrier.is_pause_on_startup(); // The first barrier message should be propagated. yield Message::Barrier(first_barrier); @@ -113,7 +114,9 @@ impl WatermarkFilterExecutor { let mut last_checkpoint_watermark = None; - if let Some(watermark) = current_watermark.clone() { + if let Some(watermark) = current_watermark.clone() + && !is_paused + { yield Message::Watermark(Watermark::new( event_time_col_idx, watermark_type.clone(), @@ -240,8 +243,20 @@ impl WatermarkFilterExecutor { } } } - table.commit(barrier.epoch).await?; + + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => { + is_paused = true; + } + Mutation::Resume => { + is_paused = false; + } + _ => (), + } + } + yield Message::Barrier(barrier); if need_update_global_max_watermark { @@ -253,7 +268,7 @@ impl WatermarkFilterExecutor { .await?; } - if is_checkpoint { + if is_checkpoint && !is_paused { if idle_input { barrier_num_during_idle += 1; diff --git a/src/stream/tests/integration_tests/project_set.rs b/src/stream/tests/integration_tests/project_set.rs index 543f710b61b75..ce0bef7a832de 100644 --- a/src/stream/tests/integration_tests/project_set.rs +++ b/src/stream/tests/integration_tests/project_set.rs @@ -13,6 +13,7 @@ // limitations under the License. use multimap::MultiMap; +use risingwave_common::util::epoch::test_epoch; use risingwave_expr::table_function::repeat; use risingwave_stream::executor::ProjectSetExecutor; use risingwave_stream::task::ActorEvalErrorReport; @@ -60,6 +61,7 @@ fn create_executor() -> (MessageSender, BoxedMessageStream) { async fn test_project_set() { let (mut tx, mut project_set) = create_executor(); + tx.push_barrier(test_epoch(1), false); tx.push_chunk(StreamChunk::from_pretty( " I I + 1 4 @@ -76,6 +78,7 @@ async fn test_project_set() { check_until_pending( &mut project_set, expect_test::expect![[r#" + - !barrier 1 - !chunk |- +---+---+---+---+---+---+ | + | 0 | 5 | 2 | 1 | 2 |