From 4c27f22757208c32f5bd4adae89fd38efbfea3fd Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 4 Mar 2024 17:33:54 +0800 Subject: [PATCH] fix rebuild from error --- .../source/kafka_backfill_executor.rs | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/src/stream/src/executor/source/kafka_backfill_executor.rs b/src/stream/src/executor/source/kafka_backfill_executor.rs index 24c1cc2bf7573..a32396b6bd6c1 100644 --- a/src/stream/src/executor/source/kafka_backfill_executor.rs +++ b/src/stream/src/executor/source/kafka_backfill_executor.rs @@ -149,6 +149,26 @@ struct BackfillStage { unfinished_splits: Vec, } +impl BackfillStage { + /// Get unfinished splits with latest offsets according to the backfill states. + fn get_latest_unfinished_splits(&mut self) -> StreamExecutorResult<&Vec> { + let mut unfinished_splits = Vec::new(); + for split in &mut self.unfinished_splits { + let state = self.states.get(split.id().as_ref()).unwrap(); + match state { + BackfillState::Backfilling(Some(offset)) => { + split.update_in_place(offset.clone())?; + unfinished_splits.push(split.clone()); + } + BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()), + BackfillState::SourceCachingUp(_) | BackfillState::Finished => {} + } + } + self.unfinished_splits = unfinished_splits; + Ok(&self.unfinished_splits) + } +} + impl KafkaBackfillExecutorInner { #[allow(clippy::too_many_arguments)] pub fn new( @@ -360,7 +380,7 @@ impl KafkaBackfillExecutorInner { let reader = self .build_stream_source_reader( &source_desc, - backfill_stage.unfinished_splits.clone(), + backfill_stage.get_latest_unfinished_splits()?.clone(), ) .await?; @@ -430,33 +450,20 @@ impl KafkaBackfillExecutorInner { if split_changed { // rebuild backfill_stream // Note: we don't put this part in a method, due to some complex lifetime issues. - let mut unfinished_splits = Vec::new(); - for split in &mut backfill_stage.unfinished_splits { - let state = - backfill_stage.states.get(split.id().as_ref()).unwrap(); - match state { - BackfillState::Backfilling(Some(offset)) => { - split.update_in_place(offset.clone())?; - unfinished_splits.push(split.clone()); - } - BackfillState::Backfilling(None) - | BackfillState::SourceCachingUp(_) - | BackfillState::Finished => {} - } - } - backfill_stage.unfinished_splits = unfinished_splits; + let latest_unfinished_splits = + backfill_stage.get_latest_unfinished_splits()?; tracing::info!( "actor {:?} apply source split change to {:?}", self.actor_ctx.id, - backfill_stage.unfinished_splits + latest_unfinished_splits ); // Replace the source reader with a new one of the new state. let reader = self .build_stream_source_reader( &source_desc, - backfill_stage.unfinished_splits.clone(), + latest_unfinished_splits.clone(), ) .await?;