diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 43fdff08a28ff..a75f34c405e60 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -103,32 +103,32 @@ impl ReplaceTablePlan { fn actor_changes(&self) -> CommandActorChanges { let mut fragment_changes = HashMap::new(); for fragment in self.new_table_fragments.fragments.values() { + let fragment_change = CommandFragmentChanges::NewFragment { + new_actors: fragment + .actors + .iter() + .map(|actor| { + ( + actor.actor_id, + self.new_table_fragments + .actor_status + .get(&actor.actor_id) + .expect("should exist") + .get_parallel_unit() + .expect("should set") + .worker_node_id, + ) + }) + .collect(), + table_ids: fragment + .state_table_ids + .iter() + .map(|table_id| TableId::new(*table_id)) + .collect(), + is_injectable: TableFragments::is_injectable(fragment.fragment_type_mask), + }; assert!(fragment_changes - .insert( - fragment.fragment_id, - CommandFragmentChanges::NewFragment { - new_actors: fragment - .actors - .iter() - .map(|actor| ( - actor.actor_id, - self.new_table_fragments - .actor_status - .get(&actor.actor_id) - .expect("should exist") - .get_parallel_unit() - .expect("should set") - .worker_node_id - )) - .collect(), - table_ids: fragment - .state_table_ids - .iter() - .map(|table_id| TableId::new(*table_id)) - .collect(), - is_injectable: TableFragments::is_injectable(fragment.fragment_type_mask), - } - ) + .insert(fragment.fragment_id, fragment_change) .is_none()); } for fragment in self.old_table_fragments.fragments.values() {