diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index d21370b4bbaf9..5c55035bd2704 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -179,11 +179,12 @@ impl ControlStreamManager { if self.nodes.is_empty() { return Poll::Pending; } - let result: Poll<(WorkerId, MetaResult<_>)> = { + let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending; + { for (worker_id, node) in &mut self.nodes { match node.handle.response_stream.poll_next_unpin(cx) { Poll::Ready(result) => { - return Poll::Ready(( + poll_result = Poll::Ready(( *worker_id, result .ok_or_else(|| anyhow!("end of stream").into()) @@ -204,18 +205,18 @@ impl ControlStreamManager { resp => Ok(resp), } }) - }), + }) )); + break; } Poll::Pending => { continue; } } } - Poll::Pending }; - if let Poll::Ready((worker_id, Err(err))) = &result { + if let Poll::Ready((worker_id, Err(err))) = &poll_result { let node = self .nodes .remove(worker_id) @@ -223,7 +224,7 @@ impl ControlStreamManager { warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream"); } - result + poll_result } pub(super) async fn next_response(