Skip to content

Commit

Permalink
feat: disallow online scaling for shared source
Browse the repository at this point in the history
  • Loading branch information
xxchan committed Sep 2, 2024
1 parent f65f733 commit 86486a6
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 35 deletions.
2 changes: 2 additions & 0 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ pub struct ScaleController {

pub env: MetaSrvEnv,

/// We will acquire lock during DDL to prevent scaling operations on jobs that are in the creating state.
/// e.g., a MV cannot be rescheduled during foreground backfill.
pub reschedule_lock: RwLock<()>,
}

Expand Down
23 changes: 12 additions & 11 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
yield Message::Barrier(barrier);
}
Message::Chunk(chunk) => {
// FIXME: consider SourceCatchingUp here?
yield Message::Chunk(chunk);
}
Message::Watermark(watermark) => {
Expand All @@ -716,19 +717,15 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {

/// All splits finished backfilling.
///
/// We check all splits for the source, including other actors' splits here, before going to the forward stage.
/// Otherwise if we break early, but after rescheduling, an unfinished split is migrated to
/// this actor, we still need to backfill it.
/// Note: we don't need to consider split migration (online scaling) here, so we can just check the splits assigned to this actor.
/// - For foreground DDL, scaling is not allowed during backfilling.
/// - For background DDL, scaling is skipped when backfilling is not finished, and can be triggered by recreating actors during recovery.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18300> for more details.
async fn backfill_finished(&self, states: &BackfillStates) -> StreamExecutorResult<bool> {
Ok(states
.values()
.all(|state| matches!(state, BackfillState::Finished))
&& self
.backfill_state_store
.scan()
.await?
.into_iter()
.all(|state| matches!(state, BackfillState::Finished)))
.all(|state| matches!(state, BackfillState::Finished)))
}

/// For newly added splits, we do not need to backfill and can directly forward from upstream.
Expand Down Expand Up @@ -787,7 +784,10 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
Some(backfill_state) => {
// Migrated split. Backfill if unfinished.
// TODO: disallow online scaling during backfilling.
debug_assert!(
false,
"split migration during backfill stage should not happen"
);
target_state.insert(split_id, backfill_state);
}
}
Expand Down Expand Up @@ -878,6 +878,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}
Some(backfill_state) => {
// Migrated split. It should also be finished since we are in forwarding stage.
// FIXME: it's still possible to have SourceCatchingUp here if we want to consider it as finished..?
match backfill_state {
BackfillState::Finished => {}
_ => {
Expand Down
24 changes: 0 additions & 24 deletions src/stream/src/executor/source/source_backfill_state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,6 @@ impl<S: StateStore> BackfillStateTableHandler<S> {
.map_err(StreamExecutorError::from)
}

/// XXX: we might get stale data for other actors' writes, but it's fine?
/// Reads every backfill-state row from the state table and decodes it.
///
/// NOTE(review): reads may observe stale data for rows written by other
/// actors; the original author considered this acceptable.
pub async fn scan(&self) -> StreamExecutorResult<Vec<BackfillState>> {
    // Unbounded at both ends: iterate the entire table rather than one key range.
    let full_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);

    let iter = self
        .state_store
        .iter_with_prefix(None::<OwnedRow>, full_range, Default::default())
        .await?;
    pin_mut!(iter);

    let mut states = Vec::new();
    while let Some(item) = iter.next().await {
        let row = item?.into_owned_row();
        // Column 1 holds the JSONB-encoded backfill state; any other datum
        // type means the persisted schema no longer matches expectations.
        let Some(ScalarRefImpl::Jsonb(jsonb_ref)) = row.datum_at(1) else {
            unreachable!()
        };
        states.push(BackfillState::restore_from_json(jsonb_ref.to_owned_scalar())?);
    }
    Ok(states)
}

async fn set(&mut self, key: SplitId, state: BackfillState) -> StreamExecutorResult<()> {
let row = [
Some(Self::string_to_scalar(key.as_ref())),
Expand Down

0 comments on commit 86486a6

Please sign in to comment.