From 25928809ab658ac4d57066a48450ac089943c090 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 19 Feb 2024 18:31:18 +0800 Subject: [PATCH] refactor: refactor source executor (part 2) (#15104) --- .../src/executor/source/source_executor.rs | 172 +++++++++--------- .../executor/source/state_table_handler.rs | 11 +- 2 files changed, 84 insertions(+), 99 deletions(-) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 8ad653c5f839..e2567bb14149 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -24,7 +24,7 @@ use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::{ - BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitMetaData, + BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitMetaData, }; use risingwave_connector::ConnectorParams; use risingwave_storage::StateStore; @@ -138,13 +138,21 @@ impl SourceExecutor { ] } - /// Returns `target_states` if split changed. Otherwise `None`. + /// - `should_trim_state`: whether to trim state for dropped splits. + /// + /// For scaling, the connector splits can be migrated to other actors, but + /// won't be added or removed. Actors should not trim states for splits that + /// are moved to other actors. + /// + /// For source split change, split will not be migrated and we can trim states + /// for deleted splits. async fn apply_split_change( &mut self, source_desc: &SourceDesc, stream: &mut StreamReaderWithPause, split_assignment: &HashMap>, - ) -> StreamExecutorResult>> { + should_trim_state: bool, + ) -> StreamExecutorResult<()> { self.metrics .source_split_change_count .with_label_values( @@ -156,82 +164,96 @@ impl SourceExecutor { ) .inc(); if let Some(target_splits) = split_assignment.get(&self.actor_ctx.id).cloned() { - if let Some(target_state) = self.update_state_if_changed(Some(target_splits)).await? { - tracing::info!( - actor_id = self.actor_ctx.id, - state = ?target_state, - "apply split change" - ); - - self.replace_stream_reader_with_target_state( - source_desc, - stream, - target_state.clone(), - ) - .await?; - - return Ok(Some(target_state)); + if self + .update_state_if_changed(target_splits, should_trim_state) + .await? + { + self.rebuild_stream_reader(source_desc, stream).await?; } } - Ok(None) + Ok(()) } - /// Returns `target_states` if split changed. Otherwise `None`. - /// - /// Note: `update_state_if_changed` will modify `updated_splits_in_epoch` + /// Returns `true` if split changed. Otherwise `false`. async fn update_state_if_changed( &mut self, - state: ConnectorState, - ) -> StreamExecutorResult { + target_splits: Vec, + should_trim_state: bool, + ) -> StreamExecutorResult { let core = self.stream_source_core.as_mut().unwrap(); - let target_splits: HashMap<_, _> = state - .unwrap() + let target_splits: HashMap<_, _> = target_splits .into_iter() .map(|split| (split.id(), split)) .collect(); - let mut target_state: Vec = Vec::with_capacity(target_splits.len()); + let mut target_state: HashMap = + HashMap::with_capacity(target_splits.len()); let mut split_changed = false; // Checks added splits - for (split_id, split) in &target_splits { - if let Some(s) = core.updated_splits_in_epoch.get(split_id) { - // existing split, no change, clone from cache - target_state.push(s.clone()) + for (split_id, split) in target_splits { + if let Some(s) = core.latest_split_info.get(&split_id) { + // For existing splits, we should use the latest offset from the cache. + // `target_splits` is from meta and contains the initial offset. + target_state.insert(split_id, s.clone()); } else { split_changed = true; // write new assigned split to state cache. snapshot is base on cache. let initial_state = if let Some(recover_state) = core .split_state_store - .try_recover_from_state_store(split) + .try_recover_from_state_store(&split) .await? { recover_state } else { - split.clone() + split }; core.updated_splits_in_epoch - .entry(split.id()) + .entry(split_id.clone()) .or_insert_with(|| initial_state.clone()); - target_state.push(initial_state); + target_state.insert(split_id, initial_state); } } // Checks dropped splits for existing_split_id in core.latest_split_info.keys() { - if !target_splits.contains_key(existing_split_id) { + if !target_state.contains_key(existing_split_id) { tracing::info!("split dropping detected: {}", existing_split_id); split_changed = true; } } - Ok(split_changed.then_some(target_state)) + if split_changed { + tracing::info!( + actor_id = self.actor_ctx.id, + state = ?target_state, + "apply split change" + ); + + core.updated_splits_in_epoch + .retain(|split_id, _| target_state.get(split_id).is_some()); + + let dropped_splits = core + .latest_split_info + .extract_if(|split_id, _| target_state.get(split_id).is_none()) + .map(|(_, split)| split) + .collect_vec(); + + if should_trim_state && !dropped_splits.is_empty() { + // trim dropped splits' state + core.split_state_store.trim_state(&dropped_splits).await?; + } + + core.latest_split_info = target_state; + } + + Ok(split_changed) } /// Rebuild stream if there is a err in stream @@ -256,17 +278,17 @@ impl SourceExecutor { core.source_id.to_string(), ]); - let target_state = core.latest_split_info.values().cloned().collect(); - self.replace_stream_reader_with_target_state(source_desc, stream, target_state) - .await + self.rebuild_stream_reader(source_desc, stream).await } - async fn replace_stream_reader_with_target_state( + async fn rebuild_stream_reader( &mut self, source_desc: &SourceDesc, stream: &mut StreamReaderWithPause, - target_state: Vec, ) -> StreamExecutorResult<()> { + let core = self.stream_source_core.as_mut().unwrap(); + let target_state: Vec = core.latest_split_info.values().cloned().collect(); + tracing::info!( "actor {:?} apply source split change to {:?}", self.actor_ctx.id, @@ -284,56 +306,21 @@ impl SourceExecutor { Ok(()) } - /// - `target_state`: the new split info from barrier. `None` if no split update. - /// - `should_trim_state`: whether to trim state for dropped splits. - /// - /// For scaling, the connector splits can be migrated to other actors, but - /// won't be added or removed. Actors should not trim states for splits that - /// are moved to other actors. - /// - /// For source split change, split will not be migrated and we can trim states - /// for deleted splits. async fn persist_state_and_clear_cache( &mut self, epoch: EpochPair, - // target_state is Some means split change (or migration) happened. - target_state: Option>, - should_trim_state: bool, ) -> StreamExecutorResult<()> { let core = self.stream_source_core.as_mut().unwrap(); - let mut cache = core + let cache = core .updated_splits_in_epoch .values() .map(|split_impl| split_impl.to_owned()) .collect_vec(); - if let Some(target_splits) = target_state { - let target_split_ids: HashSet<_> = - target_splits.iter().map(|split| split.id()).collect(); - - cache.retain(|split| target_split_ids.contains(&split.id())); - - let dropped_splits = core - .latest_split_info - .extract_if(|split_id, _| !target_split_ids.contains(split_id)) - .map(|(_, split)| split) - .collect_vec(); - - if should_trim_state && !dropped_splits.is_empty() { - // trim dropped splits' state - core.split_state_store.trim_state(&dropped_splits).await?; - } - - core.latest_split_info = target_splits - .into_iter() - .map(|split| (split.id(), split)) - .collect(); - } - if !cache.is_empty() { tracing::debug!(actor_id = self.actor_ctx.id, state = ?cache, "take snapshot"); - core.split_state_store.set_states(cache).await? + core.split_state_store.set_states(cache).await?; } // commit anyway, even if no message saved @@ -471,9 +458,6 @@ impl SourceExecutor { let epoch = barrier.epoch; - let mut target_state = None; - let mut should_trim_state = false; - if let Some(mutation) = barrier.mutation.as_deref() { match mutation { Mutation::Pause => stream.pause_stream(), @@ -485,23 +469,29 @@ impl SourceExecutor { "source change split received" ); - target_state = self - .apply_split_change(&source_desc, &mut stream, actor_splits) - .await?; - should_trim_state = true; + self.apply_split_change( + &source_desc, + &mut stream, + actor_splits, + true, + ) + .await?; } Mutation::Update(UpdateMutation { actor_splits, .. }) => { - target_state = self - .apply_split_change(&source_desc, &mut stream, actor_splits) - .await?; + self.apply_split_change( + &source_desc, + &mut stream, + actor_splits, + false, + ) + .await?; } _ => {} } } - self.persist_state_and_clear_cache(epoch, target_state, should_trim_state) - .await?; + self.persist_state_and_clear_cache(epoch).await?; self.metrics .source_row_per_barrier diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index 7bfb2bd3ec48..bcb93655c578 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -178,14 +178,9 @@ impl SourceStateTableHandler { where SS: SplitMetaData, { - if states.is_empty() { - // TODO should be a clear Error Code - bail!("states require not null"); - } else { - for split_impl in states { - self.set(split_impl.id(), split_impl.encode_to_json()) - .await?; - } + for split_impl in states { + self.set(split_impl.id(), split_impl.encode_to_json()) + .await?; } Ok(()) }