Skip to content

Commit

Permalink
Revert "fix executor bug"
Browse files Browse the repository at this point in the history
This reverts commit 15d8287.
  • Loading branch information
xxchan committed Sep 2, 2024
1 parent 15d8287 commit 8f0f545
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 21 deletions.
25 changes: 7 additions & 18 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}

let mut splits: HashSet<SplitId> = backfill_stage.states.keys().cloned().collect();
tracing::info!(
actor_id = self.actor_ctx.id,
"source backfill finished. Enter forward stage"
);

// All splits finished backfilling. Now we only forward the source data.
#[for_await]
for msg in input {
Expand Down Expand Up @@ -633,19 +630,13 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
/// Otherwise if we break early, but after rescheduling, an unfinished split is migrated to
/// this actor, we still need to backfill it.
async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
let persisted_states = self.backfill_state_store.scan().await?;
let actor_id = self.actor_ctx.id;
tracing::debug!(
actor_id,
"checking whether source backfill is finished, persisted_states: {:?}, states: {:?}",
persisted_states,
states
);
Ok(states
.values()
.all(|state| matches!(state, BackfillState::Finished))
&& !persisted_states.is_empty()
&& persisted_states
&& self
.backfill_state_store
.scan()
.await?
.into_iter()
.all(|state| matches!(state, BackfillState::Finished)))
}
Expand Down Expand Up @@ -789,10 +780,8 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
BackfillState::Finished => {}
_ => {
return Err(anyhow::anyhow!(
"Unexpected backfill state in update_state_if_changed_forward_stage: {:?}, target_splits: {:?}, current_splits: {:?}",
backfill_state,
target_splits,
current_splits
"Unexpected backfill state: {:?}",
backfill_state
)
.into());
}
Expand Down
4 changes: 1 addition & 3 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,6 @@ impl<S: StateStore> SourceExecutor<S> {
// For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data.
// It's highly probable that the work of scanning historical data cannot be shared,
// so don't waste work on it.
// Note that this is only a performance optimization.
// For correctness, the shared SourceExecutor can start from anywhere.
// For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
if ele.is_cdc_split() {
// shared CDC source already starts from latest.
Expand All @@ -489,7 +487,7 @@ impl<S: StateStore> SourceExecutor<S> {
SplitImpl::Kafka(split) => {
split.seek_to_latest_offset();
}
_ => unreachable!("got a non-shareable connector: {:?}", ele),
_ => unreachable!("only kafka source can be shared, got {:?}", ele),
}
}
}
Expand Down

0 comments on commit 8f0f545

Please sign in to comment.