Skip to content

Commit

Permalink
fix resolving of actor changes for sink into table
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Jan 11, 2024
1 parent 959afdf commit 15fc876
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 31 deletions.
76 changes: 46 additions & 30 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,31 @@ pub struct ReplaceTablePlan {
pub init_split_assignment: SplitAssignment,
}

impl ReplaceTablePlan {
fn actor_changes(&self) -> CommandActorChanges {
let worker_actors = self.new_table_fragments.worker_actor_ids();
let barrier_inject_actors: &HashSet<_> = &self
.new_table_fragments
.barrier_inject_actor_ids()
.into_iter()
.collect();
let to_add = worker_actors
.into_iter()
.flat_map(|(node_id, actors)| {
actors.into_iter().map(move |actor_id| ActorDesc {
id: actor_id,
node_id,
is_injectable: barrier_inject_actors.contains(&actor_id),
})
})
.collect_vec();
CommandActorChanges::Actor {
to_add,
to_remove: self.old_table_fragments.actor_ids().into_iter().collect(),
}
}
}

/// [`Command`] is the input of [`crate::barrier::GlobalBarrierManager`]. For different commands,
/// it will build different barriers to send, and may do different stuffs after the barrier is
/// collected.
Expand Down Expand Up @@ -188,14 +213,16 @@ impl Command {
to_remove: node_actors.values().flatten().cloned().collect(),
},
Command::CreateStreamingJob {
table_fragments, ..
table_fragments,
replace_table,
..
} => {
let worker_actors = table_fragments.worker_actor_ids();
let barrier_inject_actors: &HashSet<_> = &table_fragments
.barrier_inject_actor_ids()
.into_iter()
.collect();
let to_add = worker_actors
let mut to_add = worker_actors
.into_iter()
.flat_map(|(node_id, actors)| {
actors.into_iter().map(move |actor_id| ActorDesc {
Expand All @@ -206,9 +233,22 @@ impl Command {
})
.collect_vec();

CommandActorChanges::Actor {
to_add,
to_remove: Default::default(),
if let Some(plan) = replace_table {
let changes = plan.actor_changes();
let CommandActorChanges::Actor {
to_add: to_add_plan,
to_remove,
} = changes
else {
unreachable!("replace plan should have actor changes")
};
to_add.extend(to_add_plan);
CommandActorChanges::Actor { to_add, to_remove }
} else {
CommandActorChanges::Actor {
to_add,
to_remove: Default::default(),
}
}
}
Command::CancelStreamingJob(table_fragments) => CommandActorChanges::Actor {
Expand All @@ -233,31 +273,7 @@ impl Command {

CommandActorChanges::Actor { to_add, to_remove }
}
Command::ReplaceTable(ReplaceTablePlan {
old_table_fragments,
new_table_fragments,
..
}) => {
let worker_actors = new_table_fragments.worker_actor_ids();
let barrier_inject_actors: &HashSet<_> = &new_table_fragments
.barrier_inject_actor_ids()
.into_iter()
.collect();
let to_add = worker_actors
.into_iter()
.flat_map(|(node_id, actors)| {
actors.into_iter().map(move |actor_id| ActorDesc {
id: actor_id,
node_id,
is_injectable: barrier_inject_actors.contains(&actor_id),
})
})
.collect_vec();
CommandActorChanges::Actor {
to_add,
to_remove: old_table_fragments.actor_ids().into_iter().collect(),
}
}
Command::ReplaceTable(plan) => plan.actor_changes(),
Command::SourceSplitAssignment(_) => CommandActorChanges::None,
Command::Throttle(_) => CommandActorChanges::None,
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,7 @@ impl FragmentManager {
if let Some(node) = downstream_actor.nodes.as_mut() {
update_merge_node_upstream(
node,
&fragment_id,
fragment_id,
&removed_actor_ids,
&added_actor_ids,
);
Expand Down

0 comments on commit 15fc876

Please sign in to comment.