Skip to content

Commit

Permalink
minor update
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Sep 3, 2024
1 parent 31d65f3 commit 1a663a9
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,15 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}

let mut splits: HashSet<SplitId> = backfill_stage.states.keys().cloned().collect();
// Make sure `Finished` state is persisted.
self.backfill_state_store
.set_states(
splits
.iter()
.map(|s| (s.clone(), BackfillState::Finished))
.collect(),
)
.await?;

// All splits finished backfilling. Now we only forward the source data.
#[for_await]
Expand Down Expand Up @@ -663,14 +672,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
_ => {}
}
}
self.backfill_state_store
.set_states(
splits
.iter()
.map(|s| (s.clone(), BackfillState::Finished))
.collect(),
)
.await?;
self.backfill_state_store
.state_store
.commit(barrier.epoch)
Expand Down

0 comments on commit 1a663a9

Please sign in to comment.