diff --git a/src/stream/src/executor/backfill.rs b/src/stream/src/executor/backfill.rs index 7c5da221ea1bc..5d5f5b7a65246 100644 --- a/src/stream/src/executor/backfill.rs +++ b/src/stream/src/executor/backfill.rs @@ -151,20 +151,16 @@ where // Directly finish the progress as the snapshot is empty. self.progress.finish(first_barrier.epoch.curr); // Persist state on barrier - if let Some(current_pos) = ¤t_pos { - let mut current_pos: Vec = current_pos.as_inner().into(); - let mut current_pos = Vec![None; pk_indices.len()]; - current_pos.push(Some(false.into())); - let current_pos = OwnedRow::new(current_pos); - Self::flush_data( - &mut self.state_table, - barrier.epoch, - old_pos.as_ref(), - ¤t_pos, - ) - .await?; - old_pos = Some(current_pos); - } + let mut current_pos = vec![None; pk_in_output_indices.len()]; + current_pos.push(Some(false.into())); + let current_pos = OwnedRow::new(current_pos); + Self::flush_data( + &mut self.state_table, + first_barrier.epoch, + None, + ¤t_pos, + ) + .await?; } // The first barrier message should be propagated.