diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index b292ce926afd2..bb31b2898b702 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -85,6 +85,31 @@ pub struct ReplaceTablePlan { pub init_split_assignment: SplitAssignment, } +impl ReplaceTablePlan { + fn actor_changes(&self) -> CommandActorChanges { + let worker_actors = self.new_table_fragments.worker_actor_ids(); + let barrier_inject_actors: &HashSet<_> = &self + .new_table_fragments + .barrier_inject_actor_ids() + .into_iter() + .collect(); + let to_add = worker_actors + .into_iter() + .flat_map(|(node_id, actors)| { + actors.into_iter().map(move |actor_id| ActorDesc { + id: actor_id, + node_id, + is_injectable: barrier_inject_actors.contains(&actor_id), + }) + }) + .collect_vec(); + CommandActorChanges::Actor { + to_add, + to_remove: self.old_table_fragments.actor_ids().into_iter().collect(), + } + } +} + /// [`Command`] is the input of [`crate::barrier::GlobalBarrierManager`]. For different commands, /// it will build different barriers to send, and may do different stuffs after the barrier is /// collected. @@ -188,14 +213,16 @@ impl Command { to_remove: node_actors.values().flatten().cloned().collect(), }, Command::CreateStreamingJob { - table_fragments, .. + table_fragments, + replace_table, + .. } => { let worker_actors = table_fragments.worker_actor_ids(); let barrier_inject_actors: &HashSet<_> = &table_fragments .barrier_inject_actor_ids() .into_iter() .collect(); - let to_add = worker_actors + let mut to_add = worker_actors .into_iter() .flat_map(|(node_id, actors)| { actors.into_iter().map(move |actor_id| ActorDesc { @@ -206,9 +233,22 @@ impl Command { }) .collect_vec(); - CommandActorChanges::Actor { - to_add, - to_remove: Default::default(), + if let Some(plan) = replace_table { + let changes = plan.actor_changes(); + let CommandActorChanges::Actor { + to_add: to_add_plan, + to_remove, + } = changes + else { + unreachable!("replace plan should have actor changes") + }; + to_add.extend(to_add_plan); + CommandActorChanges::Actor { to_add, to_remove } + } else { + CommandActorChanges::Actor { + to_add, + to_remove: Default::default(), + } } } Command::CancelStreamingJob(table_fragments) => CommandActorChanges::Actor { @@ -233,31 +273,7 @@ impl Command { CommandActorChanges::Actor { to_add, to_remove } } - Command::ReplaceTable(ReplaceTablePlan { - old_table_fragments, - new_table_fragments, - .. - }) => { - let worker_actors = new_table_fragments.worker_actor_ids(); - let barrier_inject_actors: &HashSet<_> = &new_table_fragments - .barrier_inject_actor_ids() - .into_iter() - .collect(); - let to_add = worker_actors - .into_iter() - .flat_map(|(node_id, actors)| { - actors.into_iter().map(move |actor_id| ActorDesc { - id: actor_id, - node_id, - is_injectable: barrier_inject_actors.contains(&actor_id), - }) - }) - .collect_vec(); - CommandActorChanges::Actor { - to_add, - to_remove: old_table_fragments.actor_ids().into_iter().collect(), - } - } + Command::ReplaceTable(plan) => plan.actor_changes(), Command::SourceSplitAssignment(_) => CommandActorChanges::None, Command::Throttle(_) => CommandActorChanges::None, } diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index eb3dc162a12fc..2477f7b370dfb 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -1306,7 +1306,7 @@ impl FragmentManager { if let Some(node) = downstream_actor.nodes.as_mut() { update_merge_node_upstream( node, - &fragment_id, + fragment_id, &removed_actor_ids, &added_actor_ids, );