Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jun 14, 2024
1 parent 7ff7132 commit a2ae282
Showing 1 changed file with 25 additions and 25 deletions.
50 changes: 25 additions & 25 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,32 +103,32 @@ impl ReplaceTablePlan {
fn actor_changes(&self) -> CommandActorChanges {
let mut fragment_changes = HashMap::new();
for fragment in self.new_table_fragments.fragments.values() {
let fragment_change = CommandFragmentChanges::NewFragment {
new_actors: fragment
.actors
.iter()
.map(|actor| {
(
actor.actor_id,
self.new_table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.get_parallel_unit()
.expect("should set")
.worker_node_id,
)
})
.collect(),
table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| TableId::new(*table_id))
.collect(),
is_injectable: TableFragments::is_injectable(fragment.fragment_type_mask),
};
assert!(fragment_changes
.insert(
fragment.fragment_id,
CommandFragmentChanges::NewFragment {
new_actors: fragment
.actors
.iter()
.map(|actor| (
actor.actor_id,
self.new_table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.get_parallel_unit()
.expect("should set")
.worker_node_id
))
.collect(),
table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| TableId::new(*table_id))
.collect(),
is_injectable: TableFragments::is_injectable(fragment.fragment_type_mask),
}
)
.insert(fragment.fragment_id, fragment_change)
.is_none());
}
for fragment in self.old_table_fragments.fragments.values() {
Expand Down

0 comments on commit a2ae282

Please sign in to comment.