Skip to content

Commit

Permalink
still call flush_current_epoch on barrier after pause
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Feb 26, 2024
1 parent 9969c10 commit 3689bc6
Showing 1 changed file with 3 additions and 6 deletions.
9 changes: 3 additions & 6 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,9 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
yield Message::Chunk(chunk);
}
Message::Barrier(barrier) => {
// No need to flush barrier to log writer when paused because there will be no data between pause barrier and resume barrier.
if !is_paused {
log_writer
.flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
.await?;
}
log_writer
.flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint())
.await?;

if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Expand Down

0 comments on commit 3689bc6

Please sign in to comment.