Skip to content

Commit

Permalink
Remove split migration functionality and associated code
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Sep 6, 2023
1 parent 99ecd5d commit dcc1562
Showing 1 changed file with 0 additions and 43 deletions.
43 changes: 0 additions & 43 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,40 +121,6 @@ impl<S: StateStore> SourceExecutor<S> {
.map_err(StreamExecutorError::connector_error)
}

fn check_split_assignment_is_migration(
&self,
actor_splits: &HashMap<ActorId, Vec<SplitImpl>>,
) -> bool {
let core = self.stream_source_core.as_ref().unwrap();

let mut split_to_actors_index = HashMap::new();

for (actor_id, splits) in actor_splits {
for split in splits {
split_to_actors_index
.entry(split.id())
.or_insert(vec![])
.push(*actor_id);
}
}

for split_id in core.state_cache.keys() {
if let Some(actor_ids) = split_to_actors_index.remove(split_id) {
if !actor_ids.contains(&self.actor_ctx.id) {
tracing::warn!(
"split {} migration from {} detected, target might be {:?}",
split_id,
self.actor_ctx.id,
actor_ids
);
return true;
}
}
}

false
}

#[inline]
fn get_metric_labels(&self) -> [String; 3] {
[
Expand Down Expand Up @@ -520,15 +486,6 @@ impl<S: StateStore> SourceExecutor<S> {
"source change split received"
);

// In the context of split changes, we do not allow
// split
// migration because it can lead to inconsistent states.
// Therefore, all split migration must be done via
// update
// mutation and pause/resume
assert!(!self
.check_split_assignment_is_migration(actor_splits));

target_state = self
.apply_split_change(
&source_desc,
Expand Down

0 comments on commit dcc1562

Please sign in to comment.