Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 26, 2024
1 parent a5f7033 commit 7a4d6d8
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,6 @@ impl CheckpointControl {
{
assert!(node.state.node_to_collect.remove(&worker_id));
node.state.resps.push(resp);
if node.state.node_to_collect.is_empty() {
// change state to complete, and wait for nodes with the smaller epoch to commit
let wait_commit_timer = self
.context
.metrics
.barrier_wait_commit_latency
.start_timer();
node.wait_commit_timer = Some(wait_commit_timer);
}
} else {
panic!(
"collect barrier on non-existing barrier: {}, {}",
Expand All @@ -299,7 +290,20 @@ impl CheckpointControl {
.iter()
.position(|x| x.state.is_inflight())
.unwrap_or(self.command_ctx_queue.len());
let complete_nodes = self.command_ctx_queue.drain(..index).collect_vec();
let complete_nodes = self
.command_ctx_queue
.drain(..index)
.map(|mut node| {
// change state to complete, and wait for nodes with the smaller epoch to commit
let wait_commit_timer = self
.context
.metrics
.barrier_wait_commit_latency
.start_timer();
node.wait_commit_timer = Some(wait_commit_timer);
node
})
.collect_vec();
complete_nodes
}

Expand Down

0 comments on commit 7a4d6d8

Please sign in to comment.