From 94ba3e2d2df21abc9da98b80298f2f232d86132c Mon Sep 17 00:00:00 2001 From: Valeriy Zamaraiev Date: Sun, 22 May 2022 10:36:36 +0200 Subject: [PATCH 1/2] Fix skipping incoming queue processing on EOF from the incoming stream. Without the fix, when the incoming stream contains e.g. two frames: one being a reponse, the other EOF, the first one will be not be processed. --- core-client/transports/src/transports/duplex.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core-client/transports/src/transports/duplex.rs b/core-client/transports/src/transports/duplex.rs index 28751f64..40fc58cc 100644 --- a/core-client/transports/src/transports/duplex.rs +++ b/core-client/transports/src/transports/duplex.rs @@ -158,6 +158,7 @@ where // Handle stream. // Reads from stream and queues to incoming queue. log::debug!("handle stream"); + let mut stream_eof = false; loop { let response_str = match self.stream.as_mut().poll_next(cx) { Poll::Ready(Some(response_str)) => response_str, @@ -166,7 +167,9 @@ where // can be shutdown. Reopening closed connections must // be handled by the transport. debug!("connection closed"); - return Poll::Ready(Ok(())); + // We still have to process the incoming queue. + stream_eof = true; + break; } Poll::Pending => break, }; @@ -304,6 +307,7 @@ where && self.pending_requests.is_empty() && self.subscriptions.is_empty() && sink_empty + && stream_eof { log::debug!("close"); Poll::Ready(Ok(())) From c1e7058f33e3e4b3149bd92fb42a5443526d0b7e Mon Sep 17 00:00:00 2001 From: Valeriy Zamaraiev Date: Tue, 15 Nov 2022 19:12:36 +0100 Subject: [PATCH 2/2] Handle eof on a closed socket more carefully: only handle incoming messages, then quit --- core-client/transports/src/transports/duplex.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core-client/transports/src/transports/duplex.rs b/core-client/transports/src/transports/duplex.rs index 40fc58cc..be9c85a1 100644 --- a/core-client/transports/src/transports/duplex.rs +++ b/core-client/transports/src/transports/duplex.rs @@ -272,6 +272,11 @@ where } } + // Input stream handling is done, the future is ready. + if stream_eof { + return Poll::Ready(Ok(())); + } + // Handle outgoing queue. // Writes queued messages to sink. log::debug!("handle outgoing"); @@ -307,7 +312,6 @@ where && self.pending_requests.is_empty() && self.subscriptions.is_empty() && sink_empty - && stream_eof { log::debug!("close"); Poll::Ready(Ok(()))