Skip to content

Commit

Permalink
fix early return
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Nov 22, 2024
1 parent e84eb89 commit 119a250
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,12 @@ impl ControlStreamManager {
if self.nodes.is_empty() {
return Poll::Pending;
}
let result: Poll<(WorkerId, MetaResult<_>)> = {
let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
{
for (worker_id, node) in &mut self.nodes {
match node.handle.response_stream.poll_next_unpin(cx) {
Poll::Ready(result) => {
return Poll::Ready((
poll_result = Poll::Ready((
*worker_id,
result
.ok_or_else(|| anyhow!("end of stream").into())
Expand All @@ -204,26 +205,26 @@ impl ControlStreamManager {
resp => Ok(resp),
}
})
}),
})
));
break;
}
Poll::Pending => {
continue;
}
}
}
Poll::Pending
};

if let Poll::Ready((worker_id, Err(err))) = &result {
if let Poll::Ready((worker_id, Err(err))) = &poll_result {
let node = self
.nodes
.remove(worker_id)
.expect("should exist when get shutdown resp");
warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");
}

result
poll_result
}

pub(super) async fn next_response(
Expand Down

0 comments on commit 119a250

Please sign in to comment.