From 2bb3ae49cd892302ca65e9cc0a29355f50abb39c Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 19 Feb 2024 01:05:19 +0800 Subject: [PATCH] WIP: split change --- .../source/kafka_backfill_executor.rs | 351 +++++++++--------- .../source/kafka_backfill_state_table.rs | 66 ++-- src/stream/src/from_proto/source_backfill.rs | 4 +- 3 files changed, 222 insertions(+), 199 deletions(-) diff --git a/src/stream/src/executor/source/kafka_backfill_executor.rs b/src/stream/src/executor/source/kafka_backfill_executor.rs index 0bab243cdd728..325c0f40e3a20 100644 --- a/src/stream/src/executor/source/kafka_backfill_executor.rs +++ b/src/stream/src/executor/source/kafka_backfill_executor.rs @@ -13,9 +13,9 @@ // limitations under the License. use std::assert_matches::assert_matches; +use std::cell::RefCell; use std::cmp::Ordering; use std::fmt::Formatter; -use std::pin::pin; use anyhow::anyhow; use either::Either; @@ -28,7 +28,7 @@ use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::types::JsonbVal; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::{ - BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitMetaData, + BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitMetaData, }; use risingwave_connector::ConnectorParams; use risingwave_storage::StateStore; @@ -39,7 +39,6 @@ use super::kafka_backfill_state_table::BackfillStateTableHandler; use crate::executor::monitor::StreamingMetrics; use crate::executor::*; -pub type SplitId = String; #[derive(Clone, Debug, Deserialize, Serialize)] pub enum BackfillState { /// `None` means not started yet. It's the initial state. @@ -70,7 +69,7 @@ impl BackfillState { &mut self, split: &str, offset: &str, - abort_handles: &HashMap, + abort_handles: &HashMap, ) -> bool { let mut vis = false; match self { @@ -87,7 +86,7 @@ impl BackfillState { Ordering::Greater => { // backfilling for this split produced more data. *self = BackfillState::SourceCachingUp(offset.to_string()); - abort_handles.get(&split.to_string()).unwrap().abort(); + abort_handles.get(split).unwrap().abort(); } } } @@ -129,7 +128,7 @@ pub struct KafkaBackfillExecutorInner { info: ExecutorInfo, /// Streaming source for external - // FIXME: some fields e.g. its state table is not used. We might need to refactor + // FIXME: some fields e.g. its state table is not used. We might need to refactor. Even latest_split_info is not used. stream_source_core: StreamSourceCore, backfill_state_store: BackfillStateTableHandler, @@ -148,22 +147,37 @@ pub struct KafkaBackfillExecutorInner { connector_params: ConnectorParams, } +/// Local variables used in the backfill stage. +struct BackfillStage { + // stream: Option>, + abort_handles: HashMap, + states: BackfillStates, + /// Note: the offsets are not updated. Should use `state`'s offset to update before using it. 
+ unfinished_splits: Vec, +} + mod stream { use either::Either; - use futures::stream::select_with_strategy; + use futures::stream::{select_with_strategy, BoxStream, PollNext, SelectWithStrategy}; use futures::{Stream, StreamExt}; use risingwave_common::array::StreamChunk; use risingwave_connector::source::BoxChunkSourceStream; use super::{BoxedMessageStream, MessageStreamItem}; - pub type EitherStream<'a> = - impl Stream>> + 'a; + type EitherItem = Either>; - pub fn build_combined_stream( - upstream: &mut BoxedMessageStream, + pub type EitherStream<'a> = SelectWithStrategy< + impl Stream, + impl Stream, + impl FnMut(&mut ()) -> PollNext, + (), + >; + + pub fn build_combined_stream<'a>( + upstream: &'a mut BoxedMessageStream, backfill: BoxChunkSourceStream, - ) -> EitherStream<'_> { + ) -> EitherStream<'a> { select_with_strategy( upstream.by_ref().map(Either::Left), backfill.map(Either::Right), @@ -203,8 +217,9 @@ impl KafkaBackfillExecutorInner { async fn build_stream_source_reader( &self, source_desc: &SourceDesc, - state: ConnectorState, + splits: Vec, ) -> StreamExecutorResult<(BoxChunkSourceStream, HashMap)> { + assert!(!splits.is_empty(), "splits should not be empty"); let column_ids = source_desc .columns .iter() @@ -223,26 +238,21 @@ impl KafkaBackfillExecutorInner { ); let source_ctx = Arc::new(source_ctx); - match state { - Some(splits) => { - let mut abort_handles = HashMap::new(); - let mut streams = vec![]; - for split in splits { - let split_id = split.id().to_string(); - let reader = source_desc - .source - .to_stream(Some(vec![split]), column_ids.clone(), source_ctx.clone()) - .await - .map_err(StreamExecutorError::connector_error)?; - let (abort_handle, abort_registration) = AbortHandle::new_pair(); - let stream = Abortable::new(reader, abort_registration); - abort_handles.insert(split_id, abort_handle); - streams.push(stream); - } - return Ok((futures::stream::select_all(streams).boxed(), abort_handles)); - } - None => return Ok((futures::stream::pending().boxed(), HashMap::new())), + let mut abort_handles = HashMap::new(); + let mut streams = vec![]; + for split in splits { + let split_id = split.id(); + let reader = source_desc + .source + .to_stream(Some(vec![split]), column_ids.clone(), source_ctx.clone()) + .await + .map_err(StreamExecutorError::connector_error)?; + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + let stream = Abortable::new(reader, abort_registration); + abort_handles.insert(split_id, abort_handle); + streams.push(stream); } + Ok((futures::stream::select_all(streams).boxed(), abort_handles)) } #[try_stream(ok = Message, error = StreamExecutorError)] @@ -281,36 +291,24 @@ impl KafkaBackfillExecutorInner { self.backfill_state_store.init_epoch(barrier.epoch); let mut backfill_states: BackfillStates = HashMap::new(); - - let mut unfinished_splits = vec![]; + let mut unfinished_splits = Vec::new(); for ele in boot_state { - let split_id = ele.id().to_string(); + let split_id = ele.id(); let (split, backfill_state) = self .backfill_state_store .try_recover_from_state_store(ele) .await?; backfill_states.insert(split_id, backfill_state); - if split.is_some() { - unfinished_splits.push(split.unwrap()); + if let Some(split) = split { + unfinished_splits.push(split); } } tracing::debug!(?backfill_states, "source backfill started"); - // init in-memory split states with persisted state if any - core.init_split_state(unfinished_splits.clone()); - // Return the ownership of `stream_source_core` to the source executor. 
self.stream_source_core = core; - let recover_state: ConnectorState = - (!unfinished_splits.is_empty()).then_some(unfinished_splits); - let (source_chunk_reader, abort_handles) = self - .build_stream_source_reader(&source_desc, recover_state) - .instrument_await("source_build_reader") - .await?; - // let source_chunk_reader = pin!(source_chunk_reader); - // If the first barrier requires us to pause on startup, pause the stream. if barrier.is_pause_on_startup() { // TODO: support pause on startup @@ -318,14 +316,26 @@ impl KafkaBackfillExecutorInner { yield Message::Barrier(barrier); - // XXX: - // - What's the best poll strategy? - // - Should we also add a barrier stream for backfill executor? - let mut backfill_stream = build_combined_stream(&mut input, source_chunk_reader); - + let mut input = RefCell::new(input); if !backfill_finished(&backfill_states) { - #[for_await] - 'backfill_loop: for either in &mut backfill_stream { + let (source_chunk_reader, abort_handles) = self + .build_stream_source_reader(&source_desc, unfinished_splits.clone()) + .instrument_await("source_build_reader") + .await?; + + // XXX: + // - What's the best poll strategy? + // - Should we also add a barrier stream for backfill executor? + let mut backfill_stream = + Some(build_combined_stream(input.get_mut(), source_chunk_reader)); + let mut backfill_stage = BackfillStage { + abort_handles, + states: backfill_states, + unfinished_splits, + }; + + 'backfill_loop: while let Some(either) = backfill_stream.as_mut().unwrap().next().await + { match either { // Upstream Either::Left(msg) => { @@ -334,9 +344,6 @@ impl KafkaBackfillExecutorInner { }; match msg { Message::Barrier(barrier) => { - let mut target_state = None; - let mut should_trim_state = false; - if let Some(ref mutation) = barrier.mutation.as_deref() { match mutation { Mutation::Pause => { // TODO: @@ -350,23 +357,29 @@ impl KafkaBackfillExecutorInner { "source change split received" ); - self - .apply_split_change( - &source_desc, - &mut backfill_stream, - actor_splits, - ) - .await?; - should_trim_state = true; + _ = backfill_stream.take(); + self.apply_split_change( + &source_desc, + actor_splits, + &mut backfill_stage, + &mut backfill_stream, + input.get_mut(), + true, + ) + .await?; } Mutation::Update(UpdateMutation { actor_splits, .. 
}) => { - self + _ = backfill_stream.take(); + let _ = self .apply_split_change( &source_desc, - &mut backfill_stream, actor_splits, + &mut backfill_stage, + &mut backfill_stream, + input.get_mut(), + false, ) .await?; } @@ -375,7 +388,7 @@ impl KafkaBackfillExecutorInner { } self.backfill_state_store - .set_states(backfill_states.clone()) + .set_states(backfill_stage.states.clone()) .await?; self.backfill_state_store .state_store @@ -384,10 +397,10 @@ impl KafkaBackfillExecutorInner { yield Message::Barrier(barrier); - if backfill_finished(&backfill_states) { + if backfill_finished(&backfill_stage.states) { // all splits finished backfilling self.backfill_state_store - .set_states(backfill_states.clone()) + .set_states(backfill_stage.states.clone()) .await?; break 'backfill_loop; } @@ -401,11 +414,12 @@ impl KafkaBackfillExecutorInner { tracing::debug!(row = %row.display()); let split = row.datum_at(split_idx).unwrap().into_utf8(); let offset = row.datum_at(offset_idx).unwrap().into_utf8(); - let backfill_state = backfill_states.get_mut(split).unwrap(); + let backfill_state = + backfill_stage.states.get_mut(split).unwrap(); let vis = backfill_state.handle_upstream_row( split, offset, - &abort_handles, + &backfill_stage.abort_handles, ); new_vis.set(i, vis); } @@ -429,32 +443,19 @@ impl KafkaBackfillExecutorInner { let split_offset_mapping = get_split_offset_mapping_from_chunk(&chunk, split_idx, offset_idx) .unwrap(); - let _state: HashMap<_, _> = split_offset_mapping - .iter() - .flat_map(|(split_id, offset)| { - let origin_split_impl = self - .stream_source_core - .stream_source_splits - .get_mut(split_id); - - // update backfill progress - let prev_state = backfill_states.insert( - split_id.to_string(), - BackfillState::Backfilling(Some(offset.to_string())), - ); - // abort_handles should prevents other cases happening - assert_matches!( - prev_state, - Some(BackfillState::Backfilling(_)), - "Unexpected backfilling state, split_id: {split_id}" - ); - - origin_split_impl.map(|split_impl| { - split_impl.update_in_place(offset.clone())?; - Ok::<_, anyhow::Error>((split_id.clone(), split_impl.clone())) - }) - }) - .try_collect()?; + split_offset_mapping.iter().for_each(|(split_id, offset)| { + // update backfill progress + let prev_state = backfill_stage.states.insert( + split_id.clone(), + BackfillState::Backfilling(Some(offset.to_string())), + ); + // abort_handles should prevents other cases happening + assert_matches!( + prev_state, + Some(BackfillState::Backfilling(_)), + "Unexpected backfilling state, split_id: {split_id}" + ); + }); yield Message::Chunk(chunk); } @@ -462,13 +463,14 @@ impl KafkaBackfillExecutorInner { } } + let input = input.into_inner(); // All splits finished backfilling. Now we only forward the source data. #[for_await] for msg in input { let msg = msg?; match msg { Message::Barrier(barrier) => { - // + // TODO: split change // We might need to persist its state. Is is possible that we need to backfill? yield Message::Barrier(barrier); @@ -484,117 +486,134 @@ impl KafkaBackfillExecutorInner { } /// For newly added splits, we do not need to backfill and can directly forward from upstream. 
- async fn apply_split_change<'upstream>( + async fn apply_split_change<'input>( &mut self, source_desc: &SourceDesc, split_assignment: &HashMap>, - upstream: &'upstream mut BoxedMessageStream, - stream: &mut EitherStream<'upstream>, - abort_handles: &mut HashMap, - ) -> StreamExecutorResult>> { + stage: &mut BackfillStage, + stream: &mut Option>, + input: &'input mut BoxedMessageStream, + should_trim_state: bool, + ) -> StreamExecutorResult<()> { if let Some(target_splits) = split_assignment.get(&self.actor_ctx.id).cloned() { - if let Some(target_state) = self.update_state_if_changed(Some(target_splits)).await? { - tracing::info!( - actor_id = self.actor_ctx.id, - state = ?target_state, - "apply split change" - ); - - self.replace_stream_reader_with_target_state( - source_desc, - target_state.clone(), - upstream, - stream, - abort_handles, - ) - .await?; - - return Ok(Some(target_state)); + if self + .update_state_if_changed(target_splits, stage, should_trim_state) + .await? + { + self.rebuild_stream_reader(source_desc, stage, stream, input) + .await?; } } - Ok(None) + Ok(()) } - /// Note: `update_state_if_changed` will modify `state_cache` + /// Returns `true` if split changed. Otherwise `false`. async fn update_state_if_changed( &mut self, - target_splits: ConnectorState, - ) -> StreamExecutorResult { - let core = &mut self.stream_source_core; - + target_splits: Vec, + stage: &mut BackfillStage, + should_trim_state: bool, + ) -> StreamExecutorResult { let target_splits: HashMap<_, _> = target_splits - .unwrap() .into_iter() .map(|split| (split.id(), split)) .collect(); - let mut target_state: Vec = Vec::with_capacity(target_splits.len()); + let mut target_state: HashMap = + HashMap::with_capacity(target_splits.len()); let mut split_changed = false; - // Note: SourceExecutor uses core.state_cache, but it's a little hard to understand. - - // Checks added splits. - for (split_id, split) in &target_splits { - if let Some(s) = core.state_cache.get(split_id) { - // existing split, no change, clone from cache - target_state.push(s.clone()) + // Checks added splits + for (split_id, split) in target_splits { + if let Some(s) = stage.states.get(&split_id) { + target_state.insert(split_id, s.clone()); } else { split_changed = true; - // write new assigned split to state cache. snapshot is base on cache. - let initial_state = if let Some(recover_state) = core - .split_state_store + let (split, backfill_state) = self + .backfill_state_store .try_recover_from_state_store(split) - .await? - { - recover_state - } else { - split.clone() - }; - - core.state_cache - .entry(split.id()) - .or_insert_with(|| initial_state.clone()); - - target_state.push(initial_state); + .await?; + + target_state.insert(split_id, backfill_state); + if let Some(split) = split { + stage.unfinished_splits.push(split); + } } } - // Checks dropped splits. 
- // state cache may be stale - for existing_split_id in core.stream_source_splits.keys() { - if !target_splits.contains_key(existing_split_id) { + // Checks dropped splits + for existing_split_id in stage.states.keys() { + if !target_state.contains_key(existing_split_id) { tracing::info!("split dropping detected: {}", existing_split_id); split_changed = true; } } - Ok(split_changed.then_some(target_state)) + if split_changed { + tracing::info!( + actor_id = self.actor_ctx.id, + state = ?target_state, + "apply split change" + ); + + stage + .unfinished_splits + .retain(|split| target_state.get(split.id().as_ref()).is_some()); + + let dropped_splits = stage + .states + .extract_if(|split_id, _| target_state.get(split_id).is_none()) + .map(|(split_id, _)| split_id); + + if should_trim_state { + // trim dropped splits' state + self.backfill_state_store.trim_state(dropped_splits).await?; + } + + stage.states = target_state; + } + + Ok(split_changed) } - async fn replace_stream_reader_with_target_state<'upstream>( + async fn rebuild_stream_reader<'input>( &mut self, source_desc: &SourceDesc, - target_state: Vec, - upstream: &'upstream mut BoxedMessageStream, - stream: &mut EitherStream<'upstream>, - abort_handles: &mut HashMap, + stage: &mut BackfillStage, + stream: &mut Option>, + input: &'input mut BoxedMessageStream, ) -> StreamExecutorResult<()> { + let mut unfinished_splits = Vec::new(); + for split in &mut stage.unfinished_splits { + let state = stage.states.get(split.id().as_ref()).unwrap(); + match state { + BackfillState::Backfilling(Some(offset)) => { + split.update_in_place(offset.clone())?; + unfinished_splits.push(split.clone()); + } + BackfillState::Backfilling(None) + | BackfillState::SourceCachingUp(_) + | BackfillState::Finished => {} + } + } + stage.unfinished_splits = unfinished_splits; + tracing::info!( "actor {:?} apply source split change to {:?}", self.actor_ctx.id, - target_state + stage.unfinished_splits ); // Replace the source reader with a new one of the new state. 
let (reader, new_abort_handles) = self - .build_stream_source_reader(source_desc, Some(target_state.clone())) + .build_stream_source_reader(source_desc, stage.unfinished_splits.clone()) .await?; - *abort_handles = new_abort_handles; + stage.abort_handles = new_abort_handles; - *stream = build_combined_stream(upstream, reader); + *stream = Some(build_combined_stream(input, reader)); Ok(()) } diff --git a/src/stream/src/executor/source/kafka_backfill_state_table.rs b/src/stream/src/executor/source/kafka_backfill_state_table.rs index 5730d437c61b6..3e2f184bfc0eb 100644 --- a/src/stream/src/executor/source/kafka_backfill_state_table.rs +++ b/src/stream/src/executor/source/kafka_backfill_state_table.rs @@ -16,11 +16,11 @@ use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{JsonbVal, ScalarImpl, ScalarRef, ScalarRefImpl}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::{bail, row}; -use risingwave_connector::source::{SplitImpl, SplitMetaData}; +use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData}; use risingwave_pb::catalog::PbTable; use risingwave_storage::StateStore; -use super::kafka_backfill_executor::{BackfillState, BackfillStates, SplitId}; +use super::kafka_backfill_executor::{BackfillState, BackfillStates}; use crate::common::table::state_table::StateTable; use crate::executor::error::StreamExecutorError; use crate::executor::StreamExecutorResult; @@ -46,14 +46,14 @@ impl BackfillStateTableHandler { pub(crate) async fn get(&self, key: &SplitId) -> StreamExecutorResult> { self.state_store - .get_row(row::once(Some(Self::string_to_scalar(key)))) + .get_row(row::once(Some(Self::string_to_scalar(key.as_ref())))) .await .map_err(StreamExecutorError::from) } pub async fn set(&mut self, key: SplitId, value: JsonbVal) -> StreamExecutorResult<()> { let row = [ - Some(Self::string_to_scalar(&key)), + Some(Self::string_to_scalar(key.as_ref())), Some(ScalarImpl::Jsonb(value)), ]; match self.get(&key).await? { @@ -76,41 +76,47 @@ impl BackfillStateTableHandler { } pub async fn set_states(&mut self, states: BackfillStates) -> StreamExecutorResult<()> { - if states.is_empty() { - bail!("states require not null"); - } else { - for (split_id, state) in states { - self.set(split_id, state.encode_to_json()).await?; - } + for (split_id, state) in states { + self.set(split_id, state.encode_to_json()).await?; } Ok(()) } + pub async fn trim_state( + &mut self, + to_trim: impl IntoIterator, + ) -> StreamExecutorResult<()> { + for split_id in to_trim { + tracing::info!("trimming source state for split {}", split_id); + self.delete(&split_id).await?; + } + + Ok(()) + } + /// `None` means no need to read from the split anymore (backfill finished) pub async fn try_recover_from_state_store( &mut self, mut stream_source_split: SplitImpl, ) -> StreamExecutorResult<(Option, BackfillState)> { - Ok( - match self.get(&stream_source_split.id().to_string()).await? 
{ - None => (Some(stream_source_split), BackfillState::Backfilling(None)), - Some(row) => match row.datum_at(1) { - Some(ScalarRefImpl::Jsonb(jsonb_ref)) => { - let state = BackfillState::restore_from_json(jsonb_ref.to_owned_scalar())?; - let new_split = match &state { - BackfillState::Backfilling(None) => Some(stream_source_split), - BackfillState::Backfilling(Some(offset)) => { - stream_source_split.update_in_place(offset.clone())?; - Some(stream_source_split) - } - BackfillState::SourceCachingUp(_) => None, - BackfillState::Finished => None, - }; - (new_split, state) - } - _ => unreachable!(), - }, + Ok(match self.get(&stream_source_split.id()).await? { + None => (Some(stream_source_split), BackfillState::Backfilling(None)), + Some(row) => match row.datum_at(1) { + Some(ScalarRefImpl::Jsonb(jsonb_ref)) => { + let state = BackfillState::restore_from_json(jsonb_ref.to_owned_scalar())?; + let new_split = match &state { + BackfillState::Backfilling(None) => Some(stream_source_split), + BackfillState::Backfilling(Some(offset)) => { + stream_source_split.update_in_place(offset.clone())?; + Some(stream_source_split) + } + BackfillState::SourceCachingUp(_) => None, + BackfillState::Finished => None, + }; + (new_split, state) + } + _ => unreachable!(), }, - ) + }) } } diff --git a/src/stream/src/from_proto/source_backfill.rs b/src/stream/src/from_proto/source_backfill.rs index 56745c255d665..d81599913553d 100644 --- a/src/stream/src/from_proto/source_backfill.rs +++ b/src/stream/src/from_proto/source_backfill.rs @@ -23,9 +23,7 @@ use risingwave_pb::plan_common::{ use risingwave_pb::stream_plan::SourceBackfillNode; use super::*; -use crate::executor::kafka_backfill_executor::{ - KafkaBackfillExecutor, KafkaBackfillExecutorWrapper, -}; +use crate::executor::kafka_backfill_executor::{KafkaBackfillExecutor, KafkaBackfillExecutorInner}; use crate::executor::source::StreamSourceCore; use crate::executor::state_table_handler::SourceStateTableHandler; use crate::executor::BackfillStateTableHandler;
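
The core stream plumbing introduced above (per-split `Abortable` readers merged with `select_all`, then combined with the upstream message stream via `select_with_strategy`) can be exercised in isolation. Below is a minimal, self-contained sketch using only the `futures` and `either` crates; the names (`upstream`, `split_streams`) and the left-biased poll strategy are assumptions for illustration, not code taken from this patch.

// Sketch only: shows how per-split Abortable readers compose with
// select_all and select_with_strategy, mirroring build_stream_source_reader
// and build_combined_stream above. All names here are illustrative.
use either::Either;
use futures::executor::block_on;
use futures::future::{AbortHandle, Abortable};
use futures::stream::{self, select_all, select_with_strategy, PollNext, StreamExt};

fn main() {
    block_on(async {
        // One reader per "split", each wrapped in `Abortable` so it can be
        // cancelled independently, mirroring `build_stream_source_reader`.
        let mut abort_handles = Vec::new();
        let mut split_streams = Vec::new();
        for split in 0..2 {
            let (handle, registration) = AbortHandle::new_pair();
            let reader = stream::iter((0..3).map(move |offset| (split, offset)));
            abort_handles.push(handle);
            split_streams.push(Abortable::new(reader, registration));
        }
        // Merge all per-split readers into one backfill stream.
        let backfill = select_all(split_streams).map(Either::Right);

        // The upstream side; in the executor this carries barriers and chunks.
        let upstream = stream::iter(["barrier", "chunk"]).map(Either::Left);

        // Assumed left bias: poll the upstream side first so barriers are not
        // starved by backfill data.
        let mut combined =
            select_with_strategy(upstream, backfill, |_: &mut ()| PollNext::Left);

        // Aborting a split's reader ends its contribution to the merged stream,
        // which is how the executor stops a split whose upstream has caught up.
        abort_handles[0].abort();

        while let Some(item) = combined.next().await {
            match item {
                Either::Left(msg) => println!("upstream: {msg}"),
                Either::Right((split, offset)) => {
                    println!("backfill split {split} offset {offset}")
                }
            }
        }
    });
}

If the poll strategy does favor the upstream side, barriers keep flowing even while backfill readers are busy, which is what lets the executor persist `BackfillStates` on every barrier; aborting a split's reader is how backfilling for that split stops once `handle_upstream_row` moves it into `SourceCachingUp`.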