From 3689bc6a8aa46a8984924f9b7fb39a77624597e2 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Mon, 26 Feb 2024 21:05:06 +0800 Subject: [PATCH] still call flush_current_epoch on barrier after pause --- src/stream/src/executor/sink.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index fdc2bab87b397..3b90d656cae2b 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -216,12 +216,9 @@ impl SinkExecutor { yield Message::Chunk(chunk); } Message::Barrier(barrier) => { - // No need to flush barrier to log writer when paused because there will be no data between pause barrier and resume barrier. - if !is_paused { - log_writer - .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) - .await?; - } + log_writer + .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) + .await?; if let Some(mutation) = barrier.mutation.as_deref() { match mutation {