Skip to content

Commit

Permalink
fix(source): pause source correctly (#19148)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Oct 30, 2024
1 parent b6051d8 commit 0e844fe
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 21 deletions.
17 changes: 14 additions & 3 deletions src/stream/src/executor/source/fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,10 @@ impl<S: StateStore> FsSourceExecutor<S> {
let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
let mut stream =
StreamReaderWithPause::<true, StreamChunk>::new(barrier_stream, source_chunk_reader);
let mut command_paused = false;
if start_with_paused {
stream.pause_stream();
command_paused = true;
}

yield Message::Barrier(barrier);
Expand Down Expand Up @@ -394,7 +396,10 @@ impl<S: StateStore> FsSourceExecutor<S> {
Message::Barrier(barrier) => {
last_barrier_time = Instant::now();
if self_paused {
stream.resume_stream();
// command_paused has a higher priority.
if !command_paused {
stream.resume_stream();
}
self_paused = false;
}
let epoch = barrier.epoch;
Expand All @@ -405,8 +410,14 @@ impl<S: StateStore> FsSourceExecutor<S> {
self.apply_split_change(&source_desc, &mut stream, actor_splits)
.await?
}
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
Mutation::Pause => {
command_paused = true;
stream.pause_stream()
}
Mutation::Resume => {
command_paused = false;
stream.resume_stream()
}
Mutation::Update(UpdateMutation { actor_splits, .. }) => {
self.apply_split_change(
&source_desc,
Expand Down
53 changes: 39 additions & 14 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {

type PausedReader = Option<impl Stream>;
let mut paused_reader: PausedReader = None;
let mut command_paused = false;

macro_rules! pause_reader {
() => {
Expand All @@ -422,6 +423,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {

// If the first barrier requires us to pause on startup, pause the stream.
if barrier.is_pause_on_startup() {
command_paused = true;
pause_reader!();
}

Expand Down Expand Up @@ -503,28 +505,45 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
last_barrier_time = Instant::now();

if self_paused {
backfill_stream = select_with_strategy(
input.by_ref().map(Either::Left),
paused_reader.take().expect("no paused reader to resume"),
select_strategy,
);
// command_paused has a higher priority.
if !command_paused {
backfill_stream = select_with_strategy(
input.by_ref().map(Either::Left),
paused_reader
.take()
.expect("no paused reader to resume"),
select_strategy,
);
}
self_paused = false;
}

let mut split_changed = false;
if let Some(ref mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => {
pause_reader!();
// pause_reader should not be invoked consecutively more than once.
if !command_paused {
pause_reader!();
command_paused = true;
} else {
tracing::warn!(command_paused, "unexpected pause");
}
}
Mutation::Resume => {
backfill_stream = select_with_strategy(
input.by_ref().map(Either::Left),
paused_reader
.take()
.expect("no paused reader to resume"),
select_strategy,
);
// pause_reader.take should not be invoked consecutively more than once.
if command_paused {
backfill_stream = select_with_strategy(
input.by_ref().map(Either::Left),
paused_reader
.take()
.expect("no paused reader to resume"),
select_strategy,
);
command_paused = false;
} else {
tracing::warn!(command_paused, "unexpected resume");
}
}
Mutation::SourceChangeSplit(actor_splits) => {
tracing::info!(
Expand Down Expand Up @@ -641,6 +660,13 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
let chunk = msg?;

if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
assert!(!command_paused, "command_paused should be false");
// pause_reader should not be invoked consecutively more than once.
if !self_paused {
pause_reader!();
} else {
tracing::warn!(self_paused, "unexpected self pause");
}
// Exceeds the max wait barrier time, the source will be paused.
// Currently we can guarantee the
// source is not paused since it received stream
Expand All @@ -651,7 +677,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
self.info.identity,
last_barrier_time.elapsed()
);
pause_reader!();

// Only update `max_wait_barrier_time_ms` to capture
// `barrier_interval_ms`
Expand Down
18 changes: 14 additions & 4 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ impl<S: StateStore> SourceExecutor<S> {
let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
let mut stream =
StreamReaderWithPause::<true, StreamChunk>::new(barrier_stream, source_chunk_reader);
let mut command_paused = false;

// - For shared source, pause until there's a MV.
// - If the first barrier requires us to pause on startup, pause the stream.
Expand All @@ -513,6 +514,7 @@ impl<S: StateStore> SourceExecutor<S> {
"source paused on startup"
);
stream.pause_stream();
command_paused = true;
}

yield Message::Barrier(barrier);
Expand Down Expand Up @@ -548,8 +550,11 @@ impl<S: StateStore> SourceExecutor<S> {
last_barrier_time = Instant::now();

if self_paused {
stream.resume_stream();
self_paused = false;
// command_paused has a higher priority.
if !command_paused {
stream.resume_stream();
}
}

let epoch = barrier.epoch;
Expand All @@ -564,9 +569,14 @@ impl<S: StateStore> SourceExecutor<S> {

if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
// XXX: Is it possible that the stream is self_paused, and we have pause mutation now? In this case, it will panic.
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
Mutation::Pause => {
command_paused = true;
stream.pause_stream()
}
Mutation::Resume => {
command_paused = false;
stream.resume_stream()
}
Mutation::SourceChangeSplit(actor_splits) => {
tracing::info!(
actor_id = self.actor_ctx.id,
Expand Down

0 comments on commit 0e844fe

Please sign in to comment.