Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify SourceBackfill's memory state #18296

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 45 additions & 67 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::executor::prelude::*;
use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
use crate::executor::{AddMutation, UpdateMutation};

#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
/// `None` means not started yet. It's the initial state.
Backfilling(Option<String>),
Expand Down Expand Up @@ -138,30 +138,32 @@ pub struct SourceBackfillExecutorInner<S: StateStore> {
}

/// Local variables used in the backfill stage.
#[derive(Debug)]
struct BackfillStage {
// stream: Option<EitherStream<'a>>,
states: BackfillStates,
/// Note: the offsets are not updated. Should use `state`'s offset to update before using it.
unfinished_splits: Vec<SplitImpl>,
/// A copy of all splits (incl unfinished and finished ones) assigned to the actor.
///
/// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`).
splits: Vec<SplitImpl>,
}

impl BackfillStage {
/// Get unfinished splits with latest offsets according to the backfill states.
fn get_latest_unfinished_splits(&mut self) -> StreamExecutorResult<&Vec<SplitImpl>> {
fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
let mut unfinished_splits = Vec::new();
for split in &mut self.unfinished_splits {
for split in &self.splits {
let state = self.states.get(split.id().as_ref()).unwrap();
match state {
BackfillState::Backfilling(Some(offset)) => {
split.update_in_place(offset.clone())?;
unfinished_splits.push(split.clone());
let mut updated_split = split.clone();
updated_split.update_in_place(offset.clone())?;
unfinished_splits.push(updated_split);
}
BackfillState::Backfilling(None) => unfinished_splits.push(split.clone()),
BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
}
}
self.unfinished_splits = unfinished_splits;
Ok(&self.unfinished_splits)
Ok(unfinished_splits)
}
}

Expand Down Expand Up @@ -262,39 +264,30 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
self.backfill_state_store.init_epoch(barrier.epoch);

let mut backfill_states: BackfillStates = HashMap::new();
let mut unfinished_splits = Vec::new();
for mut split in owned_splits {

for split in &owned_splits {
let split_id = split.id();
let backfill_state = self
.backfill_state_store
.try_recover_from_state_store(&split_id)
.await?;
match backfill_state {
None => {
backfill_states.insert(split_id, BackfillState::Backfilling(None));
unfinished_splits.push(split);
}
Some(backfill_state) => {
match backfill_state {
BackfillState::Backfilling(ref offset) => {
if let Some(offset) = offset {
split.update_in_place(offset.clone())?;
}
unfinished_splits.push(split);
}
BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
}
backfill_states.insert(split_id, backfill_state);
}
}
.await?
.unwrap_or(BackfillState::Backfilling(None));
backfill_states.insert(split_id, backfill_state);
}
tracing::debug!(?backfill_states, "source backfill started");
let mut backfill_stage = BackfillStage {
states: backfill_states,
splits: owned_splits,
};
tracing::debug!(?backfill_stage, "source backfill started");

// Return the ownership of `stream_source_core` to the source executor.
self.stream_source_core = core;

let source_chunk_reader = self
.build_stream_source_reader(&source_desc, unfinished_splits.clone())
.build_stream_source_reader(
&source_desc,
backfill_stage.get_latest_unfinished_splits()?,
)
.instrument_await("source_build_reader")
.await?;

Expand Down Expand Up @@ -336,11 +329,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {

yield Message::Barrier(barrier);

let mut backfill_stage = BackfillStage {
states: backfill_states,
unfinished_splits,
};

if !self.backfill_finished(&backfill_stage.states).await? {
let source_backfill_row_count = self
.metrics
Expand Down Expand Up @@ -382,7 +370,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
let reader = self
.build_stream_source_reader(
&source_desc,
backfill_stage.get_latest_unfinished_splits()?.clone(),
backfill_stage.get_latest_unfinished_splits()?,
)
.await?;

Expand Down Expand Up @@ -464,7 +452,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
let reader = self
.build_stream_source_reader(
&source_desc,
latest_unfinished_splits.clone(),
latest_unfinished_splits,
)
.await?;

Expand Down Expand Up @@ -681,19 +669,18 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
stage: &mut BackfillStage,
should_trim_state: bool,
) -> StreamExecutorResult<bool> {
let target_splits: HashMap<_, _> = target_splits
.into_iter()
.map(|split| (split.id(), split))
.collect();

let mut target_state: HashMap<SplitId, BackfillState> =
HashMap::with_capacity(target_splits.len());
let mut target_state: BackfillStates = HashMap::with_capacity(target_splits.len());

let mut split_changed = false;

// Checks added splits
for (split_id, mut split) in target_splits {
if let Some(s) = stage.states.get(&split_id) {
// Take out old states (immutable, only used to build target_state and check for added/dropped splits).
// Will be set to target_state in the end.
let old_states = std::mem::take(&mut stage.states);
// Iterate over the target (assigned) splits
// - check if any new splits are added
// - build target_state
for split in target_splits {
let split_id = split.id();
if let Some(s) = old_states.get(&split_id) {
target_state.insert(split_id, s.clone());
} else {
split_changed = true;
Expand All @@ -705,38 +692,27 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
match backfill_state {
None => {
// Newly added split. We don't need to backfill.
// Note that this branch is different from the initial barrier (BackfillState::Backfilling(None) there).
target_state.insert(split_id, BackfillState::Finished);
}
Some(backfill_state) => {
// Migrated split. Backfill if unfinished.
match backfill_state {
BackfillState::Backfilling(ref offset) => {
if let Some(offset) = offset {
split.update_in_place(offset.clone())?;
}
stage.unfinished_splits.push(split);
}
BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
}
// TODO: disallow online scaling during backfilling.
target_state.insert(split_id, backfill_state);
}
}
}
}

// Checks dropped splits
for existing_split_id in stage.states.keys() {
for existing_split_id in old_states.keys() {
if !target_state.contains_key(existing_split_id) {
tracing::info!("split dropping detected: {}", existing_split_id);
split_changed = true;
}
}

if split_changed {
stage
.unfinished_splits
.retain(|split| target_state.contains_key(split.id().as_ref()));

let dropped_splits = stage
.states
.extract_if(|split_id, _| !target_state.contains_key(split_id))
Expand All @@ -746,9 +722,11 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
// trim dropped splits' state
self.backfill_state_store.trim_state(dropped_splits).await?;
}
tracing::info!(old_state=?stage.states, new_state=?target_state, "finish split change");
stage.states = target_state;
tracing::info!(old_state=?old_states, new_state=?target_state, "finish split change");
} else {
debug_assert_eq!(old_states, target_state);
}
stage.states = target_state;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems I forgot to add `stage.splits = target_splits` here. 🤡


Ok(split_changed)
}
Expand Down
Loading