Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed May 13, 2023
1 parent 0f2e13f commit 99b7702
Showing 1 changed file with 10 additions and 14 deletions.
24 changes: 10 additions & 14 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,20 +150,16 @@ where
// Directly finish the progress as the snapshot is empty.
self.progress.finish(first_barrier.epoch.curr);
// Persist state on barrier
if let Some(current_pos) = &current_pos {
let mut current_pos: Vec<Datum> = current_pos.as_inner().into();
let mut current_pos = Vec![None; pk_indices.len()];
current_pos.push(Some(false.into()));
let current_pos = OwnedRow::new(current_pos);
Self::flush_data(
&mut self.state_table,
barrier.epoch,
old_pos.as_ref(),
&current_pos,
)
.await?;
old_pos = Some(current_pos);
}
let mut current_pos = vec![None; pk_in_output_indices.len()];
current_pos.push(Some(false.into()));
let current_pos = OwnedRow::new(current_pos);
Self::flush_data(
&mut self.state_table,
first_barrier.epoch,
None,
&current_pos,
)
.await?;
}

// The first barrier message should be propagated.
Expand Down

0 comments on commit 99b7702

Please sign in to comment.