Skip to content

Commit

Permalink
fix rebuild from error
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Mar 4, 2024
1 parent f2d3a9a commit 4c27f22
Showing 1 changed file with 25 additions and 18 deletions.
43 changes: 25 additions & 18 deletions src/stream/src/executor/source/kafka_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,26 @@ struct BackfillStage {
unfinished_splits: Vec<SplitImpl>,
}

impl BackfillStage {
/// Get unfinished splits with latest offsets according to the backfill states.
fn get_latest_unfinished_splits(&mut self) -> StreamExecutorResult<&Vec<SplitImpl>> {
let mut unfinished_splits = Vec::new();
for split in &mut self.unfinished_splits {
let state = self.states.get(split.id().as_ref()).unwrap();
match state {
BackfillState::Backfilling(Some(offset)) => {
split.update_in_place(offset.clone())?;
unfinished_splits.push(split.clone());
}
BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
}
}
self.unfinished_splits = unfinished_splits;
Ok(&self.unfinished_splits)
}
}

impl<S: StateStore> KafkaBackfillExecutorInner<S> {
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down Expand Up @@ -360,7 +380,7 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
let reader = self
.build_stream_source_reader(
&source_desc,
backfill_stage.unfinished_splits.clone(),
backfill_stage.get_latest_unfinished_splits()?.clone(),
)
.await?;

Expand Down Expand Up @@ -430,33 +450,20 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
if split_changed {
// rebuild backfill_stream
// Note: we don't put this part in a method, due to some complex lifetime issues.
let mut unfinished_splits = Vec::new();
for split in &mut backfill_stage.unfinished_splits {
let state =
backfill_stage.states.get(split.id().as_ref()).unwrap();
match state {
BackfillState::Backfilling(Some(offset)) => {
split.update_in_place(offset.clone())?;
unfinished_splits.push(split.clone());
}
BackfillState::Backfilling(None)
| BackfillState::SourceCachingUp(_)
| BackfillState::Finished => {}
}
}
backfill_stage.unfinished_splits = unfinished_splits;

let latest_unfinished_splits =
backfill_stage.get_latest_unfinished_splits()?;
tracing::info!(
"actor {:?} apply source split change to {:?}",
self.actor_ctx.id,
backfill_stage.unfinished_splits
latest_unfinished_splits
);

// Replace the source reader with a new one of the new state.
let reader = self
.build_stream_source_reader(
&source_desc,
backfill_stage.unfinished_splits.clone(),
latest_unfinished_splits.clone(),
)
.await?;

Expand Down

0 comments on commit 4c27f22

Please sign in to comment.