From dcc1562cd9382b227a38a2b866f4ed25be040bfa Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Wed, 6 Sep 2023 13:37:10 +0800 Subject: [PATCH] Remove split migration functionality and associated code --- .../src/executor/source/source_executor.rs | 43 ------------------- 1 file changed, 43 deletions(-) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 55eafcc5b953a..c833b3179182d 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -121,40 +121,6 @@ impl SourceExecutor { .map_err(StreamExecutorError::connector_error) } - fn check_split_assignment_is_migration( - &self, - actor_splits: &HashMap>, - ) -> bool { - let core = self.stream_source_core.as_ref().unwrap(); - - let mut split_to_actors_index = HashMap::new(); - - for (actor_id, splits) in actor_splits { - for split in splits { - split_to_actors_index - .entry(split.id()) - .or_insert(vec![]) - .push(*actor_id); - } - } - - for split_id in core.state_cache.keys() { - if let Some(actor_ids) = split_to_actors_index.remove(split_id) { - if !actor_ids.contains(&self.actor_ctx.id) { - tracing::warn!( - "split {} migration from {} detected, target might be {:?}", - split_id, - self.actor_ctx.id, - actor_ids - ); - return true; - } - } - } - - false - } - #[inline] fn get_metric_labels(&self) -> [String; 3] { [ @@ -520,15 +486,6 @@ impl SourceExecutor { "source change split received" ); - // In the context of split changes, we do not allow - // split - // migration because it can lead to inconsistent states. - // Therefore, all split migration must be done via - // update - // mutation and pause/resume - assert!(!self - .check_split_assignment_is_migration(actor_splits)); - target_state = self .apply_split_change( &source_desc,