Commit

finish split change
xxchan committed Feb 19, 2024
1 parent 39b4ff6 commit 91fe348
Showing 2 changed files with 246 additions and 68 deletions.
258 changes: 209 additions & 49 deletions src/stream/src/executor/source/kafka_backfill_executor.rs
@@ -13,23 +13,22 @@
// limitations under the License.

use std::assert_matches::assert_matches;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::fmt::Formatter;

use anyhow::anyhow;
use anyhow::{anyhow, bail};
use either::Either;
use futures::stream::{select_with_strategy, AbortHandle, Abortable, PollNext};
use futures::StreamExt;
use futures_async_stream::try_stream;
use risingwave_common::bail;
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::types::JsonbVal;
use risingwave_connector::source::cdc::split;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::{
BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitMetaData,
BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitMetaData,
};
use risingwave_connector::ConnectorParams;
use risingwave_storage::StateStore;
@@ -262,16 +261,29 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {

let mut backfill_states: BackfillStates = HashMap::new();
let mut unfinished_splits = Vec::new();
for ele in boot_state {
let split_id = ele.id();
let (split, backfill_state) = self
for mut split in boot_state {
let split_id = split.id();
let backfill_state = self
.backfill_state_store
.try_recover_from_state_store(ele)
.try_recover_from_state_store(&split_id)
.await?;

backfill_states.insert(split_id, backfill_state);
if let Some(split) = split {
unfinished_splits.push(split);
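// Three recovery cases:
// - No state in the state store: a fresh split, start backfilling from the split's initial offset.
// - `Backfilling(offset)`: resume backfilling, advancing the split's start offset to the persisted one.
// - `SourceCachingUp(..)` / `Finished`: backfill is done for this split, so it is not given to the backfill reader.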
match backfill_state {
None => {
backfill_states.insert(split_id, BackfillState::Backfilling(None));
unfinished_splits.push(split);
}
Some(backfill_state) => {
match backfill_state {
BackfillState::Backfilling(ref offset) => {
if let Some(offset) = offset {
split.update_in_place(offset.clone())?;
}
unfinished_splits.push(split);
}
BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
}
backfill_states.insert(split_id, backfill_state);
}
}
}
tracing::debug!(?backfill_states, "source backfill started");
@@ -286,29 +298,29 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {

yield Message::Barrier(barrier);

if !backfill_finished(&backfill_states) {
let (source_chunk_reader, abort_handles) = self
.build_stream_source_reader(&source_desc, unfinished_splits.clone())
.instrument_await("source_build_reader")
.await?;
let (source_chunk_reader, abort_handles) = self
.build_stream_source_reader(&source_desc, unfinished_splits.clone())
.instrument_await("source_build_reader")
.await?;

// XXX:
// - What's the best poll strategy?
// - Should we also add a barrier stream for backfill executor?
fn closure() -> impl FnMut(&mut ()) -> PollNext {
|_: &mut ()| futures::stream::PollNext::Left
}
let mut backfill_stream = Some(select_with_strategy(
input.by_ref().map(Either::Left),
source_chunk_reader.map(Either::Right),
closure(),
));
let mut backfill_stage = BackfillStage {
abort_handles,
states: backfill_states,
unfinished_splits,
};
// XXX:
// - What's the best poll strategy?
// - Should we also add a barrier stream for backfill executor?
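// For now we always poll the upstream (left) side first, so barriers and mutations
// are handled before backfilled rows from the source reader.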
fn closure() -> impl FnMut(&mut ()) -> PollNext {
|_: &mut ()| futures::stream::PollNext::Left
}
let mut backfill_stream = Some(select_with_strategy(
input.by_ref().map(Either::Left),
source_chunk_reader.map(Either::Right),
closure(),
));
let mut backfill_stage = BackfillStage {
abort_handles,
states: backfill_states,
unfinished_splits,
};
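// If every assigned split has already finished backfilling (e.g. after recovery from a
// checkpoint taken post-backfill), the backfill loop is skipped entirely.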

if !backfill_finished(&backfill_stage.states) {
'backfill_loop: while let Some(either) = backfill_stream.as_mut().unwrap().next().await
{
match either {
@@ -332,7 +344,6 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
actor_splits = ?actor_splits,
"source change split received"
);

split_changed = self
.apply_split_change(
actor_splits,
@@ -408,10 +419,18 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {

if backfill_finished(&backfill_stage.states) {
// all splits finished backfilling
self.backfill_state_store
.set_states(backfill_stage.states.clone())
.await?;
break 'backfill_loop;
// Before entering the forward stage, we check all splits of the source here, including
// other actors' splits. Otherwise, if we broke early and an unfinished split were later
// migrated to this actor after rescheduling, we would still need to backfill it.
if self
.backfill_state_store
.scan()
.await?
.into_iter()
.all(|state| matches!(state, BackfillState::Finished))
{
break 'backfill_loop;
}
}
}
Message::Chunk(chunk) => {
@@ -472,15 +491,47 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
}
}
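// During the forward stage, `splits` tracks the splits currently assigned to this actor;
// it is only used to detect and apply split changes on barriers.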

let mut splits: HashSet<SplitId> = backfill_stage.states.keys().cloned().collect();

// All splits finished backfilling. Now we only forward the source data.
#[for_await]
for msg in input {
let msg = msg?;
match msg {
Message::Barrier(barrier) => {
// TODO: split change

// We might need to persist its state. Is it possible that we need to backfill?
if let Some(ref mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause | Mutation::Resume => {
// We don't need to do anything. Handled by upstream.
}
Mutation::SourceChangeSplit(actor_splits) => {
tracing::info!(
actor_id = self.actor_ctx.id,
actor_splits = ?actor_splits,
"source change split received"
);
self.apply_split_change_forward_stage(
actor_splits,
&mut splits,
true,
)
.await?;
}
Mutation::Update(UpdateMutation { actor_splits, .. }) => {
self.apply_split_change_forward_stage(
actor_splits,
&mut splits,
true,
)
.await?;
}
_ => {}
}
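// Persist any state changes of this epoch before yielding the barrier.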
self.backfill_state_store
.state_store
.commit(barrier.epoch)
.await?;
}
yield Message::Barrier(barrier);
}
Message::Chunk(chunk) => {
@@ -494,7 +545,7 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
}

/// For newly added splits, we do not need to backfill and can directly forward from upstream.
async fn apply_split_change<'input>(
async fn apply_split_change(
&mut self,
split_assignment: &HashMap<ActorId, Vec<SplitImpl>>,
stage: &mut BackfillStage,
@@ -531,20 +582,34 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
let mut split_changed = false;

// Checks added splits
for (split_id, split) in target_splits {
for (split_id, mut split) in target_splits {
if let Some(s) = stage.states.get(&split_id) {
target_state.insert(split_id, s.clone());
} else {
split_changed = true;

let (split, backfill_state) = self
let backfill_state = self
.backfill_state_store
.try_recover_from_state_store(split)
.try_recover_from_state_store(&split_id)
.await?;

target_state.insert(split_id, backfill_state);
if let Some(split) = split {
stage.unfinished_splits.push(split);
match backfill_state {
None => {
// Newly added split. We don't need to backfill.
target_state.insert(split_id, BackfillState::Finished);
}
Some(backfill_state) => {
// Migrated split. Backfill if unfinished.
match backfill_state {
BackfillState::Backfilling(ref offset) => {
if let Some(offset) = offset {
split.update_in_place(offset.clone())?;
}
stage.unfinished_splits.push(split);
}
BackfillState::SourceCachingUp(_) | BackfillState::Finished => {}
}
target_state.insert(split_id, backfill_state);
}
}
}
}
Expand Down Expand Up @@ -583,6 +648,101 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {

Ok(split_changed)
}

/// For split changes during the forward stage, all newly added splits should already be finished.
/// We just need to update the state store if necessary.
async fn apply_split_change_forward_stage(
&mut self,
split_assignment: &HashMap<ActorId, Vec<SplitImpl>>,
splits: &mut HashSet<SplitId>,
should_trim_state: bool,
) -> StreamExecutorResult<()> {
if let Some(target_splits) = split_assignment.get(&self.actor_ctx.id).cloned() {
self.update_state_if_changed_forward_stage(target_splits, splits, should_trim_state)
.await?;
}

Ok(())
}

async fn update_state_if_changed_forward_stage(
&mut self,
target_splits: Vec<SplitImpl>,
current_splits: &mut HashSet<SplitId>,
should_trim_state: bool,
) -> StreamExecutorResult<()> {
let target_splits: HashSet<SplitId> = target_splits
.into_iter()
.map(|split| (split.id()))
.collect();

let mut split_changed = false;

// Checks added splits
for split_id in &target_splits {
if !current_splits.contains(split_id) {
split_changed = true;

let backfill_state = self
.backfill_state_store
.try_recover_from_state_store(&split_id)
.await?;
match backfill_state {
None => {
// Newly added split. We don't need to backfill!
}
Some(backfill_state) => {
// Migrated split. It should also be finished since we are in the forward stage.
match backfill_state {
BackfillState::Finished => {}
_ => {
return Err(anyhow::anyhow!(
"Unexpected backfill state: {:?}",
backfill_state
)
.into());
}
}
}
}
}
}

// Checks dropped splits
for existing_split_id in current_splits.iter() {
if !target_splits.contains(existing_split_id) {
tracing::info!("split dropping detected: {}", existing_split_id);
split_changed = true;
}
}

if split_changed {
tracing::info!(
actor_id = self.actor_ctx.id,
target_splits = ?target_splits,
"apply split change"
);
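// Note: `extract_if` is lazy; dropped splits are removed from `current_splits` only as
// the returned iterator is consumed below.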

let dropped_splits =
current_splits.extract_if(|split_id| target_splits.get(split_id).is_none());

if should_trim_state {
// trim dropped splits' state
self.backfill_state_store.trim_state(dropped_splits).await?;
}
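// All splits assigned to this actor are marked `Finished`, since backfill is already
// done once we reach the forward stage.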

self.backfill_state_store
.set_states(
target_splits
.into_iter()
.map(|split_id| (split_id, BackfillState::Finished))
.collect(),
)
.await?;
}

Ok(())
}
}

fn compare_kafka_offset(a: Option<&String>, b: &str) -> Ordering {