diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 39a458b28ff47..bfd7058b97a57 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -573,7 +573,10 @@ impl SourceBackfillExecutorInner { } let mut splits: HashSet = backfill_stage.states.keys().cloned().collect(); - + tracing::info!( + actor_id = self.actor_ctx.id, + "source backfill finished. Enter forward stage" + ); // All splits finished backfilling. Now we only forward the source data. #[for_await] for msg in input { @@ -630,13 +633,19 @@ impl SourceBackfillExecutorInner { /// Otherwise if we break early, but after rescheduling, an unfinished split is migrated to /// this actor, we still need to backfill it. async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult { + let persisted_states = self.backfill_state_store.scan().await?; + let actor_id = self.actor_ctx.id; + tracing::debug!( + actor_id, + "checking whether source backfill is finished, persisted_states: {:?}, states: {:?}", + persisted_states, + states + ); Ok(states .values() .all(|state| matches!(state, BackfillState::Finished)) - && self - .backfill_state_store - .scan() - .await? + && !persisted_states.is_empty() + && persisted_states .into_iter() .all(|state| matches!(state, BackfillState::Finished))) } @@ -780,8 +789,10 @@ impl SourceBackfillExecutorInner { BackfillState::Finished => {} _ => { return Err(anyhow::anyhow!( - "Unexpected backfill state: {:?}", - backfill_state + "Unexpected backfill state in update_state_if_changed_forward_stage: {:?}, target_splits: {:?}, current_splits: {:?}", + backfill_state, + target_splits, + current_splits ) .into()); } diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 1531dca93b909..2077f0d4f2309 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -478,6 +478,8 @@ impl SourceExecutor { // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data. // It's highly probable that the work of scanning historical data cannot be shared, // so don't waste work on it. + // Note that this is only a performance optimization. + // For correctness, the shared SourceExecutor can start from anywhere. // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297 if ele.is_cdc_split() { // shared CDC source already starts from latest. @@ -487,7 +489,7 @@ impl SourceExecutor { SplitImpl::Kafka(split) => { split.seek_to_latest_offset(); } - _ => unreachable!("only kafka source can be shared, got {:?}", ele), + _ => unreachable!("got a non-shareable connector: {:?}", ele), } } }