From 61003c53cec1b5b37f8c05231e44efea0077ad07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Wed, 28 Feb 2024 22:59:14 +0000 Subject: [PATCH 1/5] feat: switch pending_frames VecDequeue for an Option to bound it --- test-harness/tests/poll_api.rs | 2 +- yamux/src/connection.rs | 75 +++++++++++++++++---------------- yamux/src/connection/closing.rs | 17 ++++---- 3 files changed, 47 insertions(+), 47 deletions(-) diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index 4d21496e..7f23a03b 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -57,7 +57,7 @@ fn concurrent_streams() { const PAYLOAD_SIZE: usize = 128 * 1024; let data = Msg(vec![0x42; PAYLOAD_SIZE]); - let n_streams = 1000; + let n_streams = 512; let mut cfg = Config::default(); cfg.set_split_send_size(PAYLOAD_SIZE); // Use a large frame size to speed up the test. diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 27beb28e..86a7fd2a 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -31,7 +31,6 @@ use futures::stream::SelectAll; use futures::{channel::mpsc, future::Either, prelude::*, sink::SinkExt, stream::Fuse}; use nohash_hasher::IntMap; use parking_lot::Mutex; -use std::collections::VecDeque; use std::task::{Context, Waker}; use std::{fmt, sync::Arc, task::Poll}; @@ -286,7 +285,7 @@ struct Active { stream_receivers: SelectAll>>, no_streams_waker: Option, - pending_frames: VecDeque>, + pending_frame: Option>, new_outbound_stream_waker: Option, rtt: rtt::Rtt, @@ -360,7 +359,7 @@ impl Active { Mode::Client => 1, Mode::Server => 2, }, - pending_frames: VecDeque::default(), + pending_frame: None, new_outbound_stream_waker: None, rtt: rtt::Rtt::new(), accumulated_max_stream_windows: Default::default(), @@ -369,7 +368,7 @@ impl Active { /// Gracefully close the connection to the remote. fn close(self) -> Closing { - Closing::new(self.stream_receivers, self.pending_frames, self.socket) + Closing::new(self.stream_receivers, self.pending_frame, self.socket) } /// Cleanup all our resources. @@ -392,7 +391,7 @@ impl Active { continue; } - if let Some(frame) = self.pending_frames.pop_front() { + if let Some(frame) = self.pending_frame.take() { self.socket.start_send_unpin(frame)?; continue; } @@ -403,36 +402,38 @@ impl Active { Poll::Pending => {} } - match self.stream_receivers.poll_next_unpin(cx) { - Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => { - self.on_send_frame(frame); - continue; - } - Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => { - self.on_close_stream(id, ack); - continue; - } - Poll::Ready(Some((id, None))) => { - self.on_drop_stream(id); - continue; - } - Poll::Ready(None) => { - self.no_streams_waker = Some(cx.waker().clone()); + if self.pending_frame.is_none() { + match self.socket.poll_next_unpin(cx) { + Poll::Ready(Some(frame)) => { + if let Some(stream) = self.on_frame(frame?)? { + return Poll::Ready(Ok(stream)); + } + continue; + } + Poll::Ready(None) => { + return Poll::Ready(Err(ConnectionError::Closed)); + } + Poll::Pending => {} } - Poll::Pending => {} - } - match self.socket.poll_next_unpin(cx) { - Poll::Ready(Some(frame)) => { - if let Some(stream) = self.on_frame(frame?)? { - return Poll::Ready(Ok(stream)); + match self.stream_receivers.poll_next_unpin(cx) { + Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => { + self.on_send_frame(frame); + continue; } - continue; - } - Poll::Ready(None) => { - return Poll::Ready(Err(ConnectionError::Closed)); + Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => { + self.on_close_stream(id, ack); + continue; + } + Poll::Ready(Some((id, None))) => { + self.on_drop_stream(id); + continue; + } + Poll::Ready(None) => { + self.no_streams_waker = Some(cx.waker().clone()); + } + Poll::Pending => {} } - Poll::Pending => {} } // If we make it this far, at least one of the above must have registered a waker. @@ -470,13 +471,13 @@ impl Active { frame.header().stream_id(), frame.header() ); - self.pending_frames.push_back(frame.into()); + self.pending_frame.replace(frame.into()); } fn on_close_stream(&mut self, id: StreamId, ack: bool) { log::trace!("{}/{}: sending close", self.id, id); - self.pending_frames - .push_back(Frame::close_stream(id, ack).into()); + self.pending_frame + .replace(Frame::close_stream(id, ack).into()); } fn on_drop_stream(&mut self, stream_id: StreamId) { @@ -527,7 +528,7 @@ impl Active { }; if let Some(f) = frame { log::trace!("{}/{}: sending: {}", self.id, stream_id, f.header()); - self.pending_frames.push_back(f.into()); + self.pending_frame.replace(f.into()); } } @@ -568,11 +569,11 @@ impl Active { } Action::Ping(f) => { log::trace!("{}/{}: pong", self.id, f.header().stream_id()); - self.pending_frames.push_back(f.into()); + self.pending_frame.replace(f.into()); } Action::Terminate(f) => { log::trace!("{}: sending term", self.id); - self.pending_frames.push_back(f.into()); + self.pending_frame.replace(f.into()); } } diff --git a/yamux/src/connection/closing.rs b/yamux/src/connection/closing.rs index b02c465e..3d7a50d8 100644 --- a/yamux/src/connection/closing.rs +++ b/yamux/src/connection/closing.rs @@ -6,7 +6,6 @@ use crate::{frame, StreamId}; use futures::channel::mpsc; use futures::stream::{Fuse, SelectAll}; use futures::{ready, AsyncRead, AsyncWrite, SinkExt, StreamExt}; -use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -16,7 +15,7 @@ use std::task::{Context, Poll}; pub struct Closing { state: State, stream_receivers: SelectAll>>, - pending_frames: VecDeque>, + pending_frame: Option>, socket: Fuse>, } @@ -26,13 +25,13 @@ where { pub(crate) fn new( stream_receivers: SelectAll>>, - pending_frames: VecDeque>, + pending_frame: Option>, socket: Fuse>, ) -> Self { Self { state: State::ClosingStreamReceiver, stream_receivers, - pending_frames, + pending_frame, socket, } } @@ -59,16 +58,16 @@ where State::DrainingStreamReceiver => { match this.stream_receivers.poll_next_unpin(cx) { Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => { - this.pending_frames.push_back(frame.into()) + this.pending_frame.replace(frame.into()); } Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => { - this.pending_frames - .push_back(Frame::close_stream(id, ack).into()); + this.pending_frame + .replace(Frame::close_stream(id, ack).into()); } Poll::Ready(Some((_, None))) => {} Poll::Pending | Poll::Ready(None) => { // No more frames from streams, append `Term` frame and flush them all. - this.pending_frames.push_back(Frame::term().into()); + this.pending_frame.replace(Frame::term().into()); this.state = State::FlushingPendingFrames; continue; } @@ -77,7 +76,7 @@ where State::FlushingPendingFrames => { ready!(this.socket.poll_ready_unpin(cx))?; - match this.pending_frames.pop_front() { + match this.pending_frame.take() { Some(frame) => this.socket.start_send_unpin(frame)?, None => this.state = State::ClosingSocket, } From 297e9eba1f30ded3a2b4bb46c8bd8cf2fb4471db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 4 Mar 2024 12:18:10 +0000 Subject: [PATCH 2/5] set pending_frame on main poll, - so that it's clear we are replacing a None `pending_frame` change the order of the closing state to allow flushing the pending frames first, and not override a pending frame with a closing one from the receiver streams --- yamux/src/connection.rs | 76 ++++++++++++++------------------- yamux/src/connection/closing.rs | 6 +-- 2 files changed, 35 insertions(+), 47 deletions(-) diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 86a7fd2a..f2b389ec 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -405,8 +405,20 @@ impl Active { if self.pending_frame.is_none() { match self.socket.poll_next_unpin(cx) { Poll::Ready(Some(frame)) => { - if let Some(stream) = self.on_frame(frame?)? { - return Poll::Ready(Ok(stream)); + match self.on_frame(frame?)? { + Action::None => {} + Action::New(stream) => { + log::trace!("{}: new inbound {} of {}", self.id, stream, self); + return Poll::Ready(Ok(stream)); + } + Action::Ping(f) => { + log::trace!("{}/{}: pong", self.id, f.header().stream_id()); + self.pending_frame.replace(f.into()); + } + Action::Terminate(f) => { + log::trace!("{}: sending term", self.id); + self.pending_frame.replace(f.into()); + } } continue; } @@ -418,15 +430,26 @@ impl Active { match self.stream_receivers.poll_next_unpin(cx) { Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => { - self.on_send_frame(frame); + log::trace!( + "{}/{}: sending: {}", + self.id, + frame.header().stream_id(), + frame.header() + ); + self.pending_frame.replace(frame.into()); continue; } Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => { - self.on_close_stream(id, ack); + log::trace!("{}/{}: sending close", self.id, id); + self.pending_frame + .replace(Frame::close_stream(id, ack).into()); continue; } Poll::Ready(Some((id, None))) => { - self.on_drop_stream(id); + if let Some(frame) = self.on_drop_stream(id) { + log::trace!("{}/{}: sending: {}", self.id, id, frame.header()); + self.pending_frame.replace(frame); + }; continue; } Poll::Ready(None) => { @@ -464,23 +487,7 @@ impl Active { Poll::Ready(Ok(stream)) } - fn on_send_frame(&mut self, frame: Frame>) { - log::trace!( - "{}/{}: sending: {}", - self.id, - frame.header().stream_id(), - frame.header() - ); - self.pending_frame.replace(frame.into()); - } - - fn on_close_stream(&mut self, id: StreamId, ack: bool) { - log::trace!("{}/{}: sending close", self.id, id); - self.pending_frame - .replace(Frame::close_stream(id, ack).into()); - } - - fn on_drop_stream(&mut self, stream_id: StreamId) { + fn on_drop_stream(&mut self, stream_id: StreamId) -> Option> { let s = self.streams.remove(&stream_id).expect("stream not found"); log::trace!("{}: removing dropped stream {}", self.id, stream_id); @@ -526,10 +533,7 @@ impl Active { } frame }; - if let Some(f) = frame { - log::trace!("{}/{}: sending: {}", self.id, stream_id, f.header()); - self.pending_frame.replace(f.into()); - } + frame.map(Into::into) } /// Process the result of reading from the socket. @@ -538,7 +542,7 @@ impl Active { /// and return a corresponding error, which terminates the connection. /// Otherwise we process the frame and potentially return a new `Stream` /// if one was opened by the remote. - fn on_frame(&mut self, frame: Frame<()>) -> Result> { + fn on_frame(&mut self, frame: Frame<()>) -> Result { log::trace!("{}: received: {}", self.id, frame.header()); if frame.header().flags().contains(header::ACK) @@ -561,23 +565,7 @@ impl Active { Tag::Ping => self.on_ping(&frame.into_ping()), Tag::GoAway => return Err(ConnectionError::Closed), }; - match action { - Action::None => {} - Action::New(stream) => { - log::trace!("{}: new inbound {} of {}", self.id, stream, self); - return Ok(Some(stream)); - } - Action::Ping(f) => { - log::trace!("{}/{}: pong", self.id, f.header().stream_id()); - self.pending_frame.replace(f.into()); - } - Action::Terminate(f) => { - log::trace!("{}: sending term", self.id); - self.pending_frame.replace(f.into()); - } - } - - Ok(None) + Ok(action) } fn on_data(&mut self, frame: Frame) -> Action { diff --git a/yamux/src/connection/closing.rs b/yamux/src/connection/closing.rs index 3d7a50d8..9d7cbc61 100644 --- a/yamux/src/connection/closing.rs +++ b/yamux/src/connection/closing.rs @@ -29,7 +29,7 @@ where socket: Fuse>, ) -> Self { Self { - state: State::ClosingStreamReceiver, + state: State::FlushingPendingFrames, stream_receivers, pending_frame, socket, @@ -68,7 +68,7 @@ where Poll::Pending | Poll::Ready(None) => { // No more frames from streams, append `Term` frame and flush them all. this.pending_frame.replace(Frame::term().into()); - this.state = State::FlushingPendingFrames; + this.state = State::ClosingSocket; continue; } } @@ -78,7 +78,7 @@ where match this.pending_frame.take() { Some(frame) => this.socket.start_send_unpin(frame)?, - None => this.state = State::ClosingSocket, + None => this.state = State::ClosingStreamReceiver, } } State::ClosingSocket => { From 286728026d8ec15cb4120028d3fabdbd35ebf3ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Wed, 13 Mar 2024 14:24:56 +0000 Subject: [PATCH 3/5] use pending_read_frame and pending_write_frame separately --- yamux/src/connection.rs | 38 +++++++++++++++++++++++---------- yamux/src/connection/closing.rs | 17 ++++++++------- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index f2b389ec..b5c5e07b 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -31,6 +31,7 @@ use futures::stream::SelectAll; use futures::{channel::mpsc, future::Either, prelude::*, sink::SinkExt, stream::Fuse}; use nohash_hasher::IntMap; use parking_lot::Mutex; +use std::collections::VecDeque; use std::task::{Context, Waker}; use std::{fmt, sync::Arc, task::Poll}; @@ -285,7 +286,8 @@ struct Active { stream_receivers: SelectAll>>, no_streams_waker: Option, - pending_frame: Option>, + pending_read_frame: Option>, + pending_write_frame: Option>, new_outbound_stream_waker: Option, rtt: rtt::Rtt, @@ -359,7 +361,8 @@ impl Active { Mode::Client => 1, Mode::Server => 2, }, - pending_frame: None, + pending_read_frame: None, + pending_write_frame: None, new_outbound_stream_waker: None, rtt: rtt::Rtt::new(), accumulated_max_stream_windows: Default::default(), @@ -368,7 +371,12 @@ impl Active { /// Gracefully close the connection to the remote. fn close(self) -> Closing { - Closing::new(self.stream_receivers, self.pending_frame, self.socket) + let pending_frames = self + .pending_read_frame + .into_iter() + .chain(self.pending_write_frame) + .collect::>>(); + Closing::new(self.stream_receivers, pending_frames, self.socket) } /// Cleanup all our resources. @@ -391,7 +399,14 @@ impl Active { continue; } - if let Some(frame) = self.pending_frame.take() { + // Privilege pending `Pong` and `GoAway` `Frame`s + // over `Frame`s from the receivers. + if let Some(frame) = self.pending_read_frame.take() { + self.socket.start_send_unpin(frame)?; + continue; + } + + if let Some(frame) = self.pending_write_frame.take() { self.socket.start_send_unpin(frame)?; continue; } @@ -402,7 +417,7 @@ impl Active { Poll::Pending => {} } - if self.pending_frame.is_none() { + if self.pending_read_frame.is_none() { match self.socket.poll_next_unpin(cx) { Poll::Ready(Some(frame)) => { match self.on_frame(frame?)? { @@ -413,11 +428,11 @@ impl Active { } Action::Ping(f) => { log::trace!("{}/{}: pong", self.id, f.header().stream_id()); - self.pending_frame.replace(f.into()); + self.pending_read_frame.replace(f.into()); } Action::Terminate(f) => { log::trace!("{}: sending term", self.id); - self.pending_frame.replace(f.into()); + self.pending_read_frame.replace(f.into()); } } continue; @@ -427,7 +442,8 @@ impl Active { } Poll::Pending => {} } - + } + if self.pending_write_frame.is_none() { match self.stream_receivers.poll_next_unpin(cx) { Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => { log::trace!( @@ -436,19 +452,19 @@ impl Active { frame.header().stream_id(), frame.header() ); - self.pending_frame.replace(frame.into()); + self.pending_write_frame.replace(frame.into()); continue; } Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => { log::trace!("{}/{}: sending close", self.id, id); - self.pending_frame + self.pending_write_frame .replace(Frame::close_stream(id, ack).into()); continue; } Poll::Ready(Some((id, None))) => { if let Some(frame) = self.on_drop_stream(id) { log::trace!("{}/{}: sending: {}", self.id, id, frame.header()); - self.pending_frame.replace(frame); + self.pending_write_frame.replace(frame); }; continue; } diff --git a/yamux/src/connection/closing.rs b/yamux/src/connection/closing.rs index 9d7cbc61..1c17cae9 100644 --- a/yamux/src/connection/closing.rs +++ b/yamux/src/connection/closing.rs @@ -6,6 +6,7 @@ use crate::{frame, StreamId}; use futures::channel::mpsc; use futures::stream::{Fuse, SelectAll}; use futures::{ready, AsyncRead, AsyncWrite, SinkExt, StreamExt}; +use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -15,7 +16,7 @@ use std::task::{Context, Poll}; pub struct Closing { state: State, stream_receivers: SelectAll>>, - pending_frame: Option>, + pending_frames: VecDeque>, socket: Fuse>, } @@ -25,13 +26,13 @@ where { pub(crate) fn new( stream_receivers: SelectAll>>, - pending_frame: Option>, + pending_frames: VecDeque>, socket: Fuse>, ) -> Self { Self { state: State::FlushingPendingFrames, stream_receivers, - pending_frame, + pending_frames, socket, } } @@ -58,16 +59,16 @@ where State::DrainingStreamReceiver => { match this.stream_receivers.poll_next_unpin(cx) { Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => { - this.pending_frame.replace(frame.into()); + this.pending_frames.push_back(frame.into()); } Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => { - this.pending_frame - .replace(Frame::close_stream(id, ack).into()); + this.pending_frames + .push_back(Frame::close_stream(id, ack).into()); } Poll::Ready(Some((_, None))) => {} Poll::Pending | Poll::Ready(None) => { // No more frames from streams, append `Term` frame and flush them all. - this.pending_frame.replace(Frame::term().into()); + this.pending_frames.push_back(Frame::term().into()); this.state = State::ClosingSocket; continue; } @@ -76,7 +77,7 @@ where State::FlushingPendingFrames => { ready!(this.socket.poll_ready_unpin(cx))?; - match this.pending_frame.take() { + match this.pending_frames.pop_front() { Some(frame) => this.socket.start_send_unpin(frame)?, None => this.state = State::ClosingStreamReceiver, } From e15486476664eeab67198b998f3ab522e005e7a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Fri, 15 Mar 2024 14:47:35 +0000 Subject: [PATCH 4/5] address review --- yamux/src/connection.rs | 64 ++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index b5c5e07b..1c87336a 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -401,12 +401,11 @@ impl Active { // Privilege pending `Pong` and `GoAway` `Frame`s // over `Frame`s from the receivers. - if let Some(frame) = self.pending_read_frame.take() { - self.socket.start_send_unpin(frame)?; - continue; - } - - if let Some(frame) = self.pending_write_frame.take() { + if let Some(frame) = self + .pending_read_frame + .take() + .or_else(|| self.pending_write_frame.take()) + { self.socket.start_send_unpin(frame)?; continue; } @@ -417,32 +416,6 @@ impl Active { Poll::Pending => {} } - if self.pending_read_frame.is_none() { - match self.socket.poll_next_unpin(cx) { - Poll::Ready(Some(frame)) => { - match self.on_frame(frame?)? { - Action::None => {} - Action::New(stream) => { - log::trace!("{}: new inbound {} of {}", self.id, stream, self); - return Poll::Ready(Ok(stream)); - } - Action::Ping(f) => { - log::trace!("{}/{}: pong", self.id, f.header().stream_id()); - self.pending_read_frame.replace(f.into()); - } - Action::Terminate(f) => { - log::trace!("{}: sending term", self.id); - self.pending_read_frame.replace(f.into()); - } - } - continue; - } - Poll::Ready(None) => { - return Poll::Ready(Err(ConnectionError::Closed)); - } - Poll::Pending => {} - } - } if self.pending_write_frame.is_none() { match self.stream_receivers.poll_next_unpin(cx) { Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => { @@ -475,6 +448,33 @@ impl Active { } } + if self.pending_read_frame.is_none() { + match self.socket.poll_next_unpin(cx) { + Poll::Ready(Some(frame)) => { + match self.on_frame(frame?)? { + Action::None => {} + Action::New(stream) => { + log::trace!("{}: new inbound {} of {}", self.id, stream, self); + return Poll::Ready(Ok(stream)); + } + Action::Ping(f) => { + log::trace!("{}/{}: pong", self.id, f.header().stream_id()); + self.pending_read_frame.replace(f.into()); + } + Action::Terminate(f) => { + log::trace!("{}: sending term", self.id); + self.pending_read_frame.replace(f.into()); + } + } + continue; + } + Poll::Ready(None) => { + return Poll::Ready(Err(ConnectionError::Closed)); + } + Poll::Pending => {} + } + } + // If we make it this far, at least one of the above must have registered a waker. return Poll::Pending; } From af8f6935c1aca23d8efc59743d665d7bc5ffe414 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Fri, 22 Mar 2024 17:57:30 +0000 Subject: [PATCH 5/5] address new review --- yamux/src/connection/closing.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/yamux/src/connection/closing.rs b/yamux/src/connection/closing.rs index 1c17cae9..8ef9d8a5 100644 --- a/yamux/src/connection/closing.rs +++ b/yamux/src/connection/closing.rs @@ -49,6 +49,14 @@ where loop { match this.state { + State::FlushingPendingFrames => { + ready!(this.socket.poll_ready_unpin(cx))?; + + match this.pending_frames.pop_front() { + Some(frame) => this.socket.start_send_unpin(frame)?, + None => this.state = State::ClosingStreamReceiver, + } + } State::ClosingStreamReceiver => { for stream in this.stream_receivers.iter_mut() { stream.inner_mut().close(); @@ -74,14 +82,6 @@ where } } } - State::FlushingPendingFrames => { - ready!(this.socket.poll_ready_unpin(cx))?; - - match this.pending_frames.pop_front() { - Some(frame) => this.socket.start_send_unpin(frame)?, - None => this.state = State::ClosingStreamReceiver, - } - } State::ClosingSocket => { ready!(this.socket.poll_close_unpin(cx))?; @@ -93,8 +93,8 @@ where } enum State { + FlushingPendingFrames, ClosingStreamReceiver, DrainingStreamReceiver, - FlushingPendingFrames, ClosingSocket, }