diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index d10fa83710d8..8e9b5be69043 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -441,6 +441,8 @@ pub struct ScaleController { pub env: MetaSrvEnv, + /// We will acquire the lock during DDL to prevent scaling operations on jobs that are in the creating state. + /// e.g., an MV cannot be rescheduled during foreground backfill. pub reschedule_lock: RwLock<()>, } diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index c5f01b645ac5..bc224781aad6 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -705,6 +705,7 @@ impl SourceBackfillExecutorInner { yield Message::Barrier(barrier); } Message::Chunk(chunk) => { + // FIXME: consider SourceCatchingUp here? yield Message::Chunk(chunk); } Message::Watermark(watermark) => { @@ -716,19 +717,15 @@ impl SourceBackfillExecutorInner { /// All splits finished backfilling. /// - /// We check all splits for the source, including other actors' splits here, before going to the forward stage. - /// Otherwise if we break early, but after rescheduling, an unfinished split is migrated to - /// this actor, we still need to backfill it. + /// Note: we don't need to consider split migration (online scaling) here, so we can just check the splits assigned to this actor. + /// - For foreground DDL, scaling is not allowed during backfilling. + /// - For background DDL, scaling is skipped when backfilling is not finished, and can be triggered by recreating actors during recovery. + /// + /// See the PR introducing this change for more details (NOTE: the original angle-bracket link was lost during text extraction — restore it). async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> { Ok(states .values() - .all(|state| matches!(state, BackfillState::Finished)) - && self - .backfill_state_store - .scan() - .await? 
- .into_iter() - .all(|state| matches!(state, BackfillState::Finished))) + .all(|state| matches!(state, BackfillState::Finished))) } /// For newly added splits, we do not need to backfill and can directly forward from upstream. @@ -787,7 +784,10 @@ impl SourceBackfillExecutorInner { } Some(backfill_state) => { // Migrated split. Backfill if unfinished. - // TODO: disallow online scaling during backfilling. + debug_assert!( + false, + "split migration during backfill stage should not happen" + ); target_state.insert(split_id, backfill_state); } } @@ -878,6 +878,7 @@ impl SourceBackfillExecutorInner { } Some(backfill_state) => { // Migrated split. It should also be finished since we are in forwarding stage. + // FIXME: it's still possible to have SourceCatchingUp here if we want to consider it as finished..? match backfill_state { BackfillState::Finished => {} _ => { diff --git a/src/stream/src/executor/source/source_backfill_state_table.rs b/src/stream/src/executor/source/source_backfill_state_table.rs index be9abe8490e6..c4ddfbed36fe 100644 --- a/src/stream/src/executor/source/source_backfill_state_table.rs +++ b/src/stream/src/executor/source/source_backfill_state_table.rs @@ -55,30 +55,6 @@ impl BackfillStateTableHandler { .map_err(StreamExecutorError::from) } - /// XXX: we might get stale data for other actors' writes, but it's fine? - pub async fn scan(&self) -> StreamExecutorResult<Vec<BackfillState>> { - let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded); - - let state_table_iter = self - .state_store - .iter_with_prefix(None::<OwnedRow>, sub_range, Default::default()) - .await?; - pin_mut!(state_table_iter); - - let mut ret = vec![]; - while let Some(item) = state_table_iter.next().await { - let row = item?.into_owned_row(); - let state = match row.datum_at(1) { - Some(ScalarRefImpl::Jsonb(jsonb_ref)) => { - BackfillState::restore_from_json(jsonb_ref.to_owned_scalar())? 
- } - _ => unreachable!(), - }; - ret.push(state); - } - Ok(ret) - } - async fn set(&mut self, key: SplitId, state: BackfillState) -> StreamExecutorResult<()> { let row = [ Some(Self::string_to_scalar(key.as_ref())),