Skip to content

Commit

Permalink
fix: dup split assignment in source (#18541) (#18544)
Browse files Browse the repository at this point in the history
Co-authored-by: Bohan Zhang <[email protected]>
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
3 people authored Sep 16, 2024
1 parent 7aff785 commit 7acc37c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 19 deletions.
17 changes: 8 additions & 9 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,14 +522,8 @@ impl CommandContext {
}

Command::SourceSplitAssignment(change) => {
let mut checked_assignment = change.clone();
checked_assignment
.iter_mut()
.for_each(|(_, assignment)| validate_assignment(assignment));

let mut diff = HashMap::new();

for actor_splits in checked_assignment.values() {
for actor_splits in change.values() {
diff.extend(actor_splits.clone());
}

Expand Down Expand Up @@ -581,7 +575,11 @@ impl CommandContext {
let mut checked_split_assignment = split_assignment.clone();
checked_split_assignment
.iter_mut()
.for_each(|(_, assignment)| validate_assignment(assignment));
.for_each(|(_, assignment)| {
// No related actor was running before, so we don't need to check whether
// the mutation should be wrapped with pause.
validate_assignment(assignment);
});
let actor_splits = checked_split_assignment
.values()
.flat_map(build_actor_connector_splits)
Expand Down Expand Up @@ -789,7 +787,8 @@ impl CommandContext {

for reschedule in reschedules.values() {
let mut checked_assignment = reschedule.actor_splits.clone();
validate_assignment(&mut checked_assignment);
// The Update mutation is always wrapped by Pause and Resume mutations, so no further action is needed.
_ = validate_assignment(&mut checked_assignment);

for (actor_id, splits) in &checked_assignment {
actor_splits.insert(
Expand Down
40 changes: 30 additions & 10 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,21 +588,24 @@ where
)
}

pub fn validate_assignment(assignment: &mut HashMap<ActorId, Vec<SplitImpl>>) {
pub fn validate_assignment(assignment: &mut HashMap<ActorId, Vec<SplitImpl>>) -> bool {
let mut dup_assignment_found_flag = false;

// check if one split is assigned to multiple actors
let mut split_to_actor = HashMap::new();
for (actor_id, splits) in &mut *assignment {
let _ = splits.iter().map(|split| {
for split in splits {
split_to_actor
.entry(split.id())
.or_insert_with(Vec::new)
.push(*actor_id)
});
.push(*actor_id);
}
}

for (split_id, actor_ids) in &mut split_to_actor {
if actor_ids.len() > 1 {
tracing::warn!(split_id = ?split_id, actor_ids = ?actor_ids, "split is assigned to multiple actors");
dup_assignment_found_flag = true;
}
// keep the first actor and remove the rest from the assignment
for actor_id in actor_ids.iter().skip(1) {
Expand All @@ -612,6 +615,8 @@ pub fn validate_assignment(assignment: &mut HashMap<ActorId, Vec<SplitImpl>>) {
.retain(|split| split.id() != *split_id);
}
}

dup_assignment_found_flag
}

fn align_backfill_splits(
Expand Down Expand Up @@ -1100,15 +1105,28 @@ impl SourceManager {
/// The command will first update `SourceExecutor`'s splits, and finally call `Self::apply_source_change`
/// to update states in `SourceManager`.
async fn tick(&self) -> MetaResult<()> {
let split_assignment = {
let mut split_assignment = {
let core_guard = self.core.lock().await;
core_guard.reassign_splits().await?
};

let dup_assignment_flag = split_assignment
.iter_mut()
.map(|(_, assignment)| validate_assignment(assignment))
.reduce(|a, b| a || b)
.unwrap_or(false);

if !split_assignment.is_empty() {
let command = Command::SourceSplitAssignment(split_assignment);
tracing::info!(command = ?command, "pushing down split assignment command");
self.barrier_scheduler.run_command(command).await?;
if dup_assignment_flag {
tracing::warn!("duplicate split assignment found, wrap with pause and resume");
self.barrier_scheduler
.run_config_change_command_with_pause(command)
.await?;
} else {
self.barrier_scheduler.run_command(command).await?;
}
}

Ok(())
Expand Down Expand Up @@ -1317,10 +1335,12 @@ mod tests {
1 => test_assignment,
};

fragment_assignment.iter_mut().for_each(|(_, assignment)| {
validate_assignment(assignment);
});

let dup_assignment_flag = fragment_assignment
.iter_mut()
.map(|(_, assignment)| validate_assignment(assignment))
.reduce(|a, b| a || b)
.unwrap_or(false);
assert!(dup_assignment_flag);
{
let mut split_to_actor = HashMap::new();
for actor_to_splits in fragment_assignment.values() {
Expand Down

0 comments on commit 7acc37c

Please sign in to comment.