diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index a07e4d236f4e1..0dc2fa9dac58a 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -453,6 +453,7 @@ where // If not finished then we need to update state, otherwise no need. if let Message::Barrier(barrier) = &msg { if is_completely_finished { + // If already finished, no need to persist any state. } else { // If snapshot was empty, we do not need to backfill, // but we still need to persist the finished state. @@ -492,20 +493,16 @@ where } } + tracing::trace!( + "Arrangement Backfill has already finished and forward messages directly to the downstream" + ); + // After progress finished + state persisted, // we can forward messages directly to the downstream, // as backfill is finished. #[for_await] for msg in upstream { if let Some(msg) = mapping_message(msg?, &self.output_indices) { - tracing::trace!( - actor = self.actor_id, - message = ?msg, - "backfill_finished_after_barrier" - ); - if let Message::Barrier(barrier) = &msg { - self.state_table.commit_no_data_expected(barrier.epoch); - } yield msg; } }