diff --git a/src/stream/src/executor/source/executor_core.rs b/src/stream/src/executor/source/executor_core.rs index 6b3713cc64af1..8857654be3cf2 100644 --- a/src/stream/src/executor/source/executor_core.rs +++ b/src/stream/src/executor/source/executor_core.rs @@ -35,17 +35,16 @@ pub struct StreamSourceCore { /// Split info for stream source. A source executor might read data from several splits of /// external connector. - pub(crate) latest_split_info: HashMap, + pub(crate) stream_source_splits: HashMap, /// Stores information of the splits. pub(crate) split_state_store: SourceStateTableHandler, - /// Contains the latests offsets for the splits that are updated *in the current epoch*. - /// It is cleared after each barrier. + /// In-memory cache for the splits. /// /// Source messages will only write the cache. /// It is read on split change and rebuild stream reader on error. - pub(crate) updated_splits_in_epoch: HashMap, + pub(crate) state_cache: HashMap, } impl StreamSourceCore @@ -64,14 +63,14 @@ where source_name, column_ids, source_desc_builder: Some(source_desc_builder), - latest_split_info: HashMap::new(), + stream_source_splits: HashMap::new(), split_state_store, - updated_splits_in_epoch: HashMap::new(), + state_cache: HashMap::new(), } } pub fn init_split_state(&mut self, splits: Vec) { - self.latest_split_info = splits + self.stream_source_splits = splits .into_iter() .map(|split| (split.id(), split)) .collect(); diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index a2478cdb6bb0d..cbda448712e72 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -149,7 +149,7 @@ impl FsSourceExecutor { let mut target_state: Vec = Vec::new(); let mut no_change_flag = true; for sc in rhs { - if let Some(s) = core.updated_splits_in_epoch.get(&sc.id()) { + if let Some(s) = core.state_cache.get(&sc.id()) { let fs = s .as_fs() .unwrap_or_else(|| panic!("split {:?} is not fs", s)); @@ -173,7 +173,7 @@ impl FsSourceExecutor { sc }; - core.updated_splits_in_epoch + core.state_cache .entry(state.id()) .or_insert_with(|| state.clone()); target_state.push(state); @@ -201,7 +201,7 @@ impl FsSourceExecutor { .map_err(StreamExecutorError::connector_error); stream.replace_data_stream(reader); - self.stream_source_core.latest_split_info = target_state + self.stream_source_core.stream_source_splits = target_state .into_iter() .map(|split| (split.id(), split)) .collect(); @@ -215,7 +215,7 @@ impl FsSourceExecutor { ) -> StreamExecutorResult<()> { let core = &mut self.stream_source_core; let incompleted = core - .updated_splits_in_epoch + .state_cache .values() .filter(|split| { let fs = split @@ -227,7 +227,7 @@ impl FsSourceExecutor { .collect_vec(); let completed = core - .updated_splits_in_epoch + .state_cache .values() .filter(|split| { let fs = split @@ -250,7 +250,7 @@ impl FsSourceExecutor { // commit anyway, even if no message saved core.split_state_store.state_store.commit(epoch).await?; - core.updated_splits_in_epoch.clear(); + core.state_cache.clear(); Ok(()) } @@ -439,18 +439,17 @@ impl FsSourceExecutor { let state: Vec<(SplitId, SplitImpl)> = mapping .iter() .flat_map(|(id, offset)| { - self.stream_source_core.latest_split_info.get_mut(id).map( - |origin_split| { - origin_split.update_in_place(offset.clone())?; - Ok::<_, anyhow::Error>((id.clone(), origin_split.clone())) - }, - ) + let origin_split = + self.stream_source_core.stream_source_splits.get_mut(id); + + origin_split.map(|split| { + split.update_in_place(offset.clone())?; + Ok::<_, anyhow::Error>((id.clone(), split.clone())) + }) }) .try_collect()?; - self.stream_source_core - .updated_splits_in_epoch - .extend(state); + self.stream_source_core.state_cache.extend(state); } self.metrics diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 8ad653c5f8397..07903a2c7b34e 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -138,7 +138,6 @@ impl SourceExecutor { ] } - /// Returns `target_states` if split changed. Otherwise `None`. async fn apply_split_change( &mut self, source_desc: &SourceDesc, @@ -177,9 +176,7 @@ impl SourceExecutor { Ok(None) } - /// Returns `target_states` if split changed. Otherwise `None`. - /// - /// Note: `update_state_if_changed` will modify `updated_splits_in_epoch` + /// Note: `update_state_if_changed` will modify `state_cache` async fn update_state_if_changed( &mut self, state: ConnectorState, @@ -196,9 +193,8 @@ impl SourceExecutor { let mut split_changed = false; - // Checks added splits for (split_id, split) in &target_splits { - if let Some(s) = core.updated_splits_in_epoch.get(split_id) { + if let Some(s) = core.state_cache.get(split_id) { // existing split, no change, clone from cache target_state.push(s.clone()) } else { @@ -215,7 +211,7 @@ impl SourceExecutor { split.clone() }; - core.updated_splits_in_epoch + core.state_cache .entry(split.id()) .or_insert_with(|| initial_state.clone()); @@ -223,8 +219,8 @@ impl SourceExecutor { } } - // Checks dropped splits - for existing_split_id in core.latest_split_info.keys() { + // state cache may be stale + for existing_split_id in core.stream_source_splits.keys() { if !target_splits.contains_key(existing_split_id) { tracing::info!("split dropping detected: {}", existing_split_id); split_changed = true; @@ -239,6 +235,7 @@ impl SourceExecutor { &mut self, source_desc: &SourceDesc, stream: &mut StreamReaderWithPause, + split_info: &mut [SplitImpl], e: StreamExecutorError, ) -> StreamExecutorResult<()> { let core = self.stream_source_core.as_mut().unwrap(); @@ -255,8 +252,26 @@ impl SourceExecutor { self.actor_ctx.id.to_string(), core.source_id.to_string(), ]); + // fetch the newest offset, either it's in cache (before barrier) + // or in state table (just after barrier) + let target_state = if core.state_cache.is_empty() { + for ele in &mut *split_info { + if let Some(recover_state) = core + .split_state_store + .try_recover_from_state_store(ele) + .await? + { + *ele = recover_state; + } + } + split_info.to_owned() + } else { + core.state_cache + .values() + .map(|split_impl| split_impl.to_owned()) + .collect_vec() + }; - let target_state = core.latest_split_info.values().cloned().collect(); self.replace_stream_reader_with_target_state(source_desc, stream, target_state) .await } @@ -286,24 +301,16 @@ impl SourceExecutor { /// - `target_state`: the new split info from barrier. `None` if no split update. /// - `should_trim_state`: whether to trim state for dropped splits. - /// - /// For scaling, the connector splits can be migrated to other actors, but - /// won't be added or removed. Actors should not trim states for splits that - /// are moved to other actors. - /// - /// For source split change, split will not be migrated and we can trim states - /// for deleted splits. async fn persist_state_and_clear_cache( &mut self, epoch: EpochPair, - // target_state is Some means split change (or migration) happened. target_state: Option>, should_trim_state: bool, ) -> StreamExecutorResult<()> { let core = self.stream_source_core.as_mut().unwrap(); let mut cache = core - .updated_splits_in_epoch + .state_cache .values() .map(|split_impl| split_impl.to_owned()) .collect_vec(); @@ -315,7 +322,7 @@ impl SourceExecutor { cache.retain(|split| target_split_ids.contains(&split.id())); let dropped_splits = core - .latest_split_info + .stream_source_splits .extract_if(|split_id, _| !target_split_ids.contains(split_id)) .map(|(_, split)| split) .collect_vec(); @@ -325,7 +332,7 @@ impl SourceExecutor { core.split_state_store.trim_state(&dropped_splits).await?; } - core.latest_split_info = target_splits + core.stream_source_splits = target_splits .into_iter() .map(|split| (split.id(), split)) .collect(); @@ -338,7 +345,7 @@ impl SourceExecutor { // commit anyway, even if no message saved core.split_state_store.state_store.commit(epoch).await?; - core.updated_splits_in_epoch.clear(); + core.state_cache.clear(); Ok(()) } @@ -403,6 +410,7 @@ impl SourceExecutor { _ => {} } } + let mut latest_split_info = boot_state.clone(); core.split_state_store.init_epoch(barrier.epoch); @@ -454,8 +462,13 @@ impl SourceExecutor { while let Some(msg) = stream.next().await { let Ok(msg) = msg else { tokio::time::sleep(Duration::from_millis(1000)).await; - self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err()) - .await?; + self.rebuild_stream_reader_from_error( + &source_desc, + &mut stream, + &mut latest_split_info, + msg.unwrap_err(), + ) + .await?; continue; }; @@ -500,6 +513,10 @@ impl SourceExecutor { } } + if let Some(target_state) = &target_state { + latest_split_info = target_state.clone(); + } + self.persist_state_and_clear_cache(epoch, target_state, should_trim_state) .await?; @@ -555,25 +572,24 @@ impl SourceExecutor { let state: HashMap<_, _> = mapping .iter() .flat_map(|(split_id, offset)| { - self.stream_source_core + let origin_split_impl = self + .stream_source_core .as_mut() .unwrap() - .latest_split_info - .get_mut(split_id) - .map(|original_split_impl| { - original_split_impl.update_in_place(offset.clone())?; - Ok::<_, anyhow::Error>(( - split_id.clone(), - original_split_impl.clone(), - )) - }) + .stream_source_splits + .get_mut(split_id); + + origin_split_impl.map(|split_impl| { + split_impl.update_in_place(offset.clone())?; + Ok::<_, anyhow::Error>((split_id.clone(), split_impl.clone())) + }) }) .try_collect()?; self.stream_source_core .as_mut() .unwrap() - .updated_splits_in_epoch + .state_cache .extend(state); } metric_row_per_barrier += chunk.cardinality() as u64; @@ -720,9 +736,9 @@ mod tests { source_id: table_id, column_ids, source_desc_builder: Some(source_desc_builder), - latest_split_info: HashMap::new(), + stream_source_splits: HashMap::new(), split_state_store, - updated_splits_in_epoch: HashMap::new(), + state_cache: HashMap::new(), source_name: MOCK_SOURCE_NAME.to_string(), }; @@ -814,9 +830,9 @@ mod tests { source_id: table_id, column_ids: column_ids.clone(), source_desc_builder: Some(source_desc_builder), - latest_split_info: HashMap::new(), + stream_source_splits: HashMap::new(), split_state_store, - updated_splits_in_epoch: HashMap::new(), + state_cache: HashMap::new(), source_name: MOCK_SOURCE_NAME.to_string(), };