diff --git a/core-client/transports/src/transports/duplex.rs b/core-client/transports/src/transports/duplex.rs index 28751f64..be9c85a1 100644 --- a/core-client/transports/src/transports/duplex.rs +++ b/core-client/transports/src/transports/duplex.rs @@ -158,6 +158,7 @@ where // Handle stream. // Reads from stream and queues to incoming queue. log::debug!("handle stream"); + let mut stream_eof = false; loop { let response_str = match self.stream.as_mut().poll_next(cx) { Poll::Ready(Some(response_str)) => response_str, @@ -166,7 +167,9 @@ where // can be shutdown. Reopening closed connections must // be handled by the transport. debug!("connection closed"); - return Poll::Ready(Ok(())); + // We still have to process the incoming queue. + stream_eof = true; + break; } Poll::Pending => break, }; @@ -269,6 +272,11 @@ where } } + // Input stream handling is done, the future is ready. + if stream_eof { + return Poll::Ready(Ok(())); + } + // Handle outgoing queue. // Writes queued messages to sink. log::debug!("handle outgoing");