From 0e844fe43bc1602fe05b77b1962a259f8bdba7d5 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Wed, 30 Oct 2024 09:51:04 +0800 Subject: [PATCH] fix(source): pause source correctly (#19148) --- .../src/executor/source/fs_source_executor.rs | 17 ++++-- .../source/source_backfill_executor.rs | 53 ++++++++++++++----- .../src/executor/source/source_executor.rs | 18 +++++-- 3 files changed, 67 insertions(+), 21 deletions(-) diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 70f0ce5f4f24..0f3115c46c62 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -364,8 +364,10 @@ impl FsSourceExecutor { let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); let mut stream = StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); + let mut command_paused = false; if start_with_paused { stream.pause_stream(); + command_paused = true; } yield Message::Barrier(barrier); @@ -394,7 +396,10 @@ impl FsSourceExecutor { Message::Barrier(barrier) => { last_barrier_time = Instant::now(); if self_paused { - stream.resume_stream(); + // command_paused has a higher priority. + if !command_paused { + stream.resume_stream(); + } self_paused = false; } let epoch = barrier.epoch; @@ -405,8 +410,14 @@ impl FsSourceExecutor { self.apply_split_change(&source_desc, &mut stream, actor_splits) .await? } - Mutation::Pause => stream.pause_stream(), - Mutation::Resume => stream.resume_stream(), + Mutation::Pause => { + command_paused = true; + stream.pause_stream() + } + Mutation::Resume => { + command_paused = false; + stream.resume_stream() + } Mutation::Update(UpdateMutation { actor_splits, .. }) => { self.apply_split_change( &source_desc, diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 34f9eb12d692..55806150bb87 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -406,6 +406,7 @@ impl SourceBackfillExecutorInner { type PausedReader = Option; let mut paused_reader: PausedReader = None; + let mut command_paused = false; macro_rules! pause_reader { () => { @@ -422,6 +423,7 @@ impl SourceBackfillExecutorInner { // If the first barrier requires us to pause on startup, pause the stream. if barrier.is_pause_on_startup() { + command_paused = true; pause_reader!(); } @@ -503,11 +505,16 @@ impl SourceBackfillExecutorInner { last_barrier_time = Instant::now(); if self_paused { - backfill_stream = select_with_strategy( - input.by_ref().map(Either::Left), - paused_reader.take().expect("no paused reader to resume"), - select_strategy, - ); + // command_paused has a higher priority. + if !command_paused { + backfill_stream = select_with_strategy( + input.by_ref().map(Either::Left), + paused_reader + .take() + .expect("no paused reader to resume"), + select_strategy, + ); + } self_paused = false; } @@ -515,16 +522,28 @@ impl SourceBackfillExecutorInner { if let Some(ref mutation) = barrier.mutation.as_deref() { match mutation { Mutation::Pause => { - pause_reader!(); + // pause_reader should not be invoked consecutively more than once. + if !command_paused { + pause_reader!(); + command_paused = true; + } else { + tracing::warn!(command_paused, "unexpected pause"); + } } Mutation::Resume => { - backfill_stream = select_with_strategy( - input.by_ref().map(Either::Left), - paused_reader - .take() - .expect("no paused reader to resume"), - select_strategy, - ); + // pause_reader.take should not be invoked consecutively more than once. + if command_paused { + backfill_stream = select_with_strategy( + input.by_ref().map(Either::Left), + paused_reader + .take() + .expect("no paused reader to resume"), + select_strategy, + ); + command_paused = false; + } else { + tracing::warn!(command_paused, "unexpected resume"); + } } Mutation::SourceChangeSplit(actor_splits) => { tracing::info!( @@ -641,6 +660,13 @@ impl SourceBackfillExecutorInner { let chunk = msg?; if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms { + assert!(!command_paused, "command_paused should be false"); + // pause_reader should not be invoked consecutively more than once. + if !self_paused { + pause_reader!(); + } else { + tracing::warn!(self_paused, "unexpected self pause"); + } // Exceeds the max wait barrier time, the source will be paused. // Currently we can guarantee the // source is not paused since it received stream @@ -651,7 +677,6 @@ impl SourceBackfillExecutorInner { self.info.identity, last_barrier_time.elapsed() ); - pause_reader!(); // Only update `max_wait_barrier_time_ms` to capture // `barrier_interval_ms` diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 6d5cf710d3bb..6e583caf739a 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -503,6 +503,7 @@ impl SourceExecutor { let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); let mut stream = StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); + let mut command_paused = false; // - For shared source, pause until there's a MV. // - If the first barrier requires us to pause on startup, pause the stream. @@ -513,6 +514,7 @@ impl SourceExecutor { "source paused on startup" ); stream.pause_stream(); + command_paused = true; } yield Message::Barrier(barrier); @@ -548,8 +550,11 @@ impl SourceExecutor { last_barrier_time = Instant::now(); if self_paused { - stream.resume_stream(); self_paused = false; + // command_paused has a higher priority. + if !command_paused { + stream.resume_stream(); + } } let epoch = barrier.epoch; @@ -564,9 +569,14 @@ impl SourceExecutor { if let Some(mutation) = barrier.mutation.as_deref() { match mutation { - // XXX: Is it possible that the stream is self_paused, and we have pause mutation now? In this case, it will panic. - Mutation::Pause => stream.pause_stream(), - Mutation::Resume => stream.resume_stream(), + Mutation::Pause => { + command_paused = true; + stream.pause_stream() + } + Mutation::Resume => { + command_paused = false; + stream.resume_stream() + } Mutation::SourceChangeSplit(actor_splits) => { tracing::info!( actor_id = self.actor_ctx.id,