Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: don't split header and body across TCP packets #168

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1c06435
Compilation + test using zerocopy ok
stormshield-pj50 Jul 19, 2023
39fa1e5
Fix after comments
stormshield-pj50 Sep 7, 2023
5d4e2d2
cargo fmt
stormshield-pj50 Sep 8, 2023
62002ae
WIP Minimal diff
thomaseizinger Sep 17, 2023
ee00e1d
Some basic fixes
thomaseizinger Sep 17, 2023
801bf4f
Don't use `ready!` macro with `mem::replace`
thomaseizinger Oct 5, 2023
8e11260
Inline variable
thomaseizinger Oct 5, 2023
aeb4cd2
Remove `Init` state
thomaseizinger Oct 5, 2023
f90c990
Use `?` for decoding header
thomaseizinger Oct 5, 2023
93c3834
Use ctor
thomaseizinger Oct 5, 2023
872815e
Use type-system to only allocate for data frames
thomaseizinger Oct 5, 2023
722b7c8
Don't use `cast` outside of `header` module
thomaseizinger Oct 5, 2023
91e812a
Add TODO
thomaseizinger Oct 5, 2023
c095aac
Replace header::decode() with Frame<T>::try_from_header_buffer()
pjalaber Oct 4, 2023
2cab6b4
Cargo fmt
pjalaber Oct 5, 2023
166f8ff
Reduce diff
thomaseizinger Oct 6, 2023
5c6b172
Bring back header::decode
thomaseizinger Oct 6, 2023
524994f
Reduce diff
thomaseizinger Oct 6, 2023
0108a3d
Reduce diff
thomaseizinger Oct 6, 2023
8d32d16
Resolve todo
thomaseizinger Oct 6, 2023
851341e
Reduce diff
thomaseizinger Oct 6, 2023
9b26409
Reduce diff
thomaseizinger Oct 6, 2023
3df462d
Simplify things a bit further
thomaseizinger Oct 6, 2023
09cda48
Use body_len in debug impl
thomaseizinger Oct 6, 2023
5e3e65b
Remove generic length accessor
thomaseizinger Oct 6, 2023
ab29664
Reduce diff
thomaseizinger Oct 6, 2023
b65f2bd
Ensure we check max body len before allocating
thomaseizinger Oct 9, 2023
ee9c920
Don't allocate unless necessary
thomaseizinger Oct 9, 2023
ae3bc3d
WIP: Use `AsyncWrite::poll_write_vectored`
thomaseizinger Oct 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions yamux/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
}
}

fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
fn poll_new_outbound(&mut self, cx: &Context<'_>) -> Poll<Result<Stream>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why diverge from the Future signature here? In other words, why remove the mut?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Triggered by a clippy lint, we can put it back but given that it is an internal API, I wouldn't bother.

if self.streams.len() >= self.config.max_num_streams {
log::error!("{}: maximum number of streams reached", self.id);
return Poll::Ready(Err(ConnectionError::TooManyStreams));
Expand Down Expand Up @@ -722,7 +722,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
shared.update_state(self.id, stream_id, State::RecvClosed);
}
shared.window = shared.window.saturating_sub(frame.body_len());
shared.buffer.push(frame.into_body());
shared.buffer.push(frame.into_body().into());

if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
if let Some(credit) = shared.next_window_update() {
Expand Down Expand Up @@ -765,7 +765,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
return Action::Reset(Frame::new(header));
}
shared.window = shared.window.saturating_sub(frame.body_len());
shared.buffer.push(frame.into_body());
shared.buffer.push(frame.into_body().into());
if let Some(w) = shared.reader.take() {
w.wake()
}
Expand Down
2 changes: 1 addition & 1 deletion yamux/src/connection/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Future for Cleanup {
type Output = ConnectionError;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.get_mut();
let this = self.get_mut();

loop {
match this.state {
Expand Down
2 changes: 1 addition & 1 deletion yamux/src/connection/closing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.get_mut();
let this = self.get_mut();

loop {
match this.state {
Expand Down
42 changes: 35 additions & 7 deletions yamux/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ pub mod header;
mod io;

use futures::future::Either;
use header::{Data, GoAway, Header, Ping, StreamId, WindowUpdate};
use header::{Data, GoAway, Header, StreamId, WindowUpdate};
use std::{convert::TryInto, num::TryFromIntError};

pub use io::FrameDecodeError;
pub(crate) use io::Io;
use crate::frame::header::{HEADER_SIZE, Ping};

/// A Yamux message frame consisting of header and body.
#[derive(Clone, Debug, PartialEq, Eq)]
Expand All @@ -32,7 +33,6 @@ impl<T> Frame<T> {
body: Vec::new(),
}
}

pub fn header(&self) -> &Header<T> {
&self.header
}
Expand All @@ -56,6 +56,14 @@ impl<T> Frame<T> {
body: self.body,
}
}

pub(crate) fn body(&self) -> &[u8] {
self.body.as_slice()
}

pub(crate) fn len(&self) -> usize {
self.body.len() + HEADER_SIZE
}
}

impl<A: header::private::Sealed> From<Frame<A>> for Frame<()> {
Expand All @@ -68,6 +76,23 @@ impl<A: header::private::Sealed> From<Frame<A>> for Frame<()> {
}

impl Frame<()> {
pub(crate) fn try_from_header_buffer(
buffer: &[u8; HEADER_SIZE],
max_body_len: usize,
) -> Result<Either<Frame<()>, Frame<Data>>, FrameDecodeError> {
let header = header::decode(buffer)?;

let either = match header.try_into_data() {
Ok(data) if data.body_len() > max_body_len => {
return Err(FrameDecodeError::FrameTooLarge(data.body_len()));
}
Ok(data) => Either::Right(Frame::new(data)),
Err(other) => Either::Left(Frame::new(other)),
};

Ok(either)
}

pub(crate) fn into_data(self) -> Frame<Data> {
Frame {
header: self.header.into_data(),
Expand Down Expand Up @@ -108,14 +133,17 @@ impl Frame<Data> {
Frame::new(header)
}

pub fn body(&self) -> &[u8] {
&self.body
pub fn body_mut(&mut self) -> &mut [u8] {
self.body.as_mut_slice()
}

pub fn body_len(&self) -> u32 {
// Safe cast since we construct `Frame::<Data>`s only with
// `Vec<u8>` of length [0, u32::MAX] in `Frame::data` above.
self.body().len() as u32
let len_in_header = self.header.body_len();
let actual_len = self.body.len();

// debug_assert_eq!(len_in_header, actual_len);

len_in_header as u32
}

pub fn into_body(self) -> Vec<u8> {
Expand Down
25 changes: 19 additions & 6 deletions yamux/src/frame/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,9 @@ impl<T> Header<T> {
self.stream_id
}

pub fn len(&self) -> Len {
self.length
}

#[cfg(test)]
pub fn set_len(&mut self, len: u32) {
self.length = Len(len)
pub fn set_len(&mut self, len: usize) {
self.length = Len(len as u32)
}

/// Arbitrary type cast, use with caution.
Expand Down Expand Up @@ -92,6 +88,14 @@ impl Header<()> {
self.cast()
}

pub(crate) fn try_into_data(self) -> Result<Header<Data>, Self> {
if self.tag == Tag::Data {
return Ok(self.into_data());
}

Err(self)
}

pub(crate) fn into_window_update(self) -> Header<WindowUpdate> {
debug_assert_eq!(self.tag, Tag::WindowUpdate);
self.cast()
Expand All @@ -101,6 +105,7 @@ impl Header<()> {
debug_assert_eq!(self.tag, Tag::Ping);
self.cast()
}

}

impl<T: HasSyn> Header<T> {
Expand Down Expand Up @@ -143,6 +148,14 @@ impl Header<Data> {
_marker: std::marker::PhantomData,
}
}

/// Returns the length of the body.
///
/// The `length` field in the header has a different semantic meaning depending on the tag.
/// For [`Tag::Data`], it describes the length of the body.
pub fn body_len(&self) -> usize {
self.length.val() as usize
}
}

impl Header<WindowUpdate> {
Expand Down
Loading
Loading