From c255cbb768966ab0a2817a7793d3c3d6e6130f35 Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 15 Oct 2024 22:41:01 +0500 Subject: [PATCH 1/4] send pending frames after collecting them --- yamux/src/connection/closing.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/yamux/src/connection/closing.rs b/yamux/src/connection/closing.rs index 8ef9d8a..4d581cd 100644 --- a/yamux/src/connection/closing.rs +++ b/yamux/src/connection/closing.rs @@ -30,7 +30,7 @@ where socket: Fuse>, ) -> Self { Self { - state: State::FlushingPendingFrames, + state: State::ClosingStreamReceiver, stream_receivers, pending_frames, socket, @@ -49,14 +49,6 @@ where loop { match this.state { - State::FlushingPendingFrames => { - ready!(this.socket.poll_ready_unpin(cx))?; - - match this.pending_frames.pop_front() { - Some(frame) => this.socket.start_send_unpin(frame)?, - None => this.state = State::ClosingStreamReceiver, - } - } State::ClosingStreamReceiver => { for stream in this.stream_receivers.iter_mut() { stream.inner_mut().close(); @@ -77,11 +69,19 @@ where Poll::Pending | Poll::Ready(None) => { // No more frames from streams, append `Term` frame and flush them all. this.pending_frames.push_back(Frame::term().into()); - this.state = State::ClosingSocket; + this.state = State::FlushingPendingFrames; continue; } } } + State::FlushingPendingFrames => { + ready!(this.socket.poll_ready_unpin(cx))?; + + match this.pending_frames.pop_front() { + Some(frame) => this.socket.start_send_unpin(frame)?, + None => this.state = State::ClosingSocket, + } + } State::ClosingSocket => { ready!(this.socket.poll_close_unpin(cx))?; @@ -93,8 +93,8 @@ where } enum State { - FlushingPendingFrames, ClosingStreamReceiver, DrainingStreamReceiver, + FlushingPendingFrames, ClosingSocket, } From b376de09e0f82992fc330b538549b0548d99db21 Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 15 Oct 2024 22:41:18 +0500 Subject: [PATCH 2/4] test --- yamux/src/connection/closing.rs | 99 +++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/yamux/src/connection/closing.rs b/yamux/src/connection/closing.rs index 4d581cd..41fc815 100644 --- a/yamux/src/connection/closing.rs +++ b/yamux/src/connection/closing.rs @@ -98,3 +98,102 @@ enum State { FlushingPendingFrames, ClosingSocket, } + +#[cfg(test)] +mod tests { + use super::*; + use futures::future::poll_fn; + use futures::FutureExt; + + struct Socket { + written: Vec, + closed: bool, + } + impl AsyncRead for Socket { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + _: &mut [u8], + ) -> Poll> { + todo!() + } + } + impl AsyncWrite for Socket { + fn poll_write( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + assert!(!self.closed); + self.written.extend_from_slice(buf); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + todo!() + } + + fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + assert!(!self.closed); + self.closed = true; + Poll::Ready(Ok(())) + } + } + + #[test] + fn pending_frames() { + let frame_pending = Frame::data(StreamId::new(1), vec![2]).unwrap().into(); + let frame_data = Frame::data(StreamId::new(3), vec![4]).unwrap().into(); + let frame_close = Frame::close_stream(StreamId::new(5), false).into(); + let frame_close_ack = Frame::close_stream(StreamId::new(6), true).into(); + let frame_term = Frame::term().into(); + fn encode(buf: &mut Vec, frame: &Frame<()>) { + buf.extend_from_slice(&frame::header::encode(frame.header())); + if frame.header().tag() == frame::header::Tag::Data { + buf.extend_from_slice(frame.clone().into_data().body()); + } + } + let mut expected_written = vec![]; + encode(&mut expected_written, &frame_pending); + encode(&mut expected_written, &frame_data); + encode(&mut expected_written, &frame_close); + encode(&mut expected_written, &frame_close_ack); + encode(&mut expected_written, &frame_term); + + let receiver = |frame: &Frame<_>, command: StreamCommand| { + TaggedStream::new(frame.header().stream_id(), { + let (mut tx, rx) = mpsc::channel(1); + tx.try_send(command).unwrap(); + rx + }) + }; + + let mut stream_receivers: SelectAll<_> = Default::default(); + stream_receivers.push(receiver( + &frame_data, + StreamCommand::SendFrame(frame_data.clone().into_data().left()), + )); + stream_receivers.push(receiver( + &frame_close, + StreamCommand::CloseStream { ack: false }, + )); + stream_receivers.push(receiver( + &frame_close_ack, + StreamCommand::CloseStream { ack: true }, + )); + let pending_frames = vec![frame_pending.clone().into()]; + let mut socket = Socket { + written: vec![], + closed: false, + }; + let mut closing = Closing::new( + stream_receivers, + pending_frames.into(), + frame::Io::new(crate::connection::Id(0), &mut socket).fuse(), + ); + futures::executor::block_on(async { poll_fn(|cx| closing.poll_unpin(cx)).await.unwrap() }); + assert!(closing.pending_frames.is_empty()); + assert!(socket.closed); + assert_eq!(socket.written, expected_written); + } +} From 8da581b89a94241815b5b65100258565eb8b8c62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Wed, 23 Oct 2024 17:12:20 +0100 Subject: [PATCH 3/4] Apply suggestions from code review --- yamux/src/connection/closing.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/yamux/src/connection/closing.rs b/yamux/src/connection/closing.rs index 41fc815..d4f8636 100644 --- a/yamux/src/connection/closing.rs +++ b/yamux/src/connection/closing.rs @@ -115,7 +115,7 @@ mod tests { _: &mut Context<'_>, _: &mut [u8], ) -> Poll> { - todo!() + unimplemented!() } } impl AsyncWrite for Socket { @@ -130,7 +130,7 @@ mod tests { } fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - todo!() + uniplemented!() } fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { @@ -181,7 +181,7 @@ mod tests { &frame_close_ack, StreamCommand::CloseStream { ack: true }, )); - let pending_frames = vec![frame_pending.clone().into()]; + let pending_frames = vec![frame_pending.into()]; let mut socket = Socket { written: vec![], closed: false, From 102744dae34dc7c0b71fb1efa1e25883678fc42e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Wed, 23 Oct 2024 17:15:36 +0100 Subject: [PATCH 4/4] Update yamux/src/connection/closing.rs --- yamux/src/connection/closing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yamux/src/connection/closing.rs b/yamux/src/connection/closing.rs index d4f8636..05c92ae 100644 --- a/yamux/src/connection/closing.rs +++ b/yamux/src/connection/closing.rs @@ -130,7 +130,7 @@ mod tests { } fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - uniplemented!() + unimplemented!() } fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> {