Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: don't split header and body across TCP packets #168

Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1c06435
Compilation + test using zerocopy ok
stormshield-pj50 Jul 19, 2023
39fa1e5
Fix after comments
stormshield-pj50 Sep 7, 2023
5d4e2d2
cargo fmt
stormshield-pj50 Sep 8, 2023
62002ae
WIP Minimal diff
thomaseizinger Sep 17, 2023
ee00e1d
Some basic fixes
thomaseizinger Sep 17, 2023
801bf4f
Don't use `ready!` macro with `mem::replace`
thomaseizinger Oct 5, 2023
8e11260
Inline variable
thomaseizinger Oct 5, 2023
aeb4cd2
Remove `Init` state
thomaseizinger Oct 5, 2023
f90c990
Use `?` for decoding header
thomaseizinger Oct 5, 2023
93c3834
Use ctor
thomaseizinger Oct 5, 2023
872815e
Use type-system to only allocate for data frames
thomaseizinger Oct 5, 2023
722b7c8
Don't use `cast` outside of `header` module
thomaseizinger Oct 5, 2023
91e812a
Add TODO
thomaseizinger Oct 5, 2023
c095aac
Replace header::decode() with Frame<T>::try_from_header_buffer()
pjalaber Oct 4, 2023
2cab6b4
Cargo fmt
pjalaber Oct 5, 2023
166f8ff
Reduce diff
thomaseizinger Oct 6, 2023
5c6b172
Bring back header::decode
thomaseizinger Oct 6, 2023
524994f
Reduce diff
thomaseizinger Oct 6, 2023
0108a3d
Reduce diff
thomaseizinger Oct 6, 2023
8d32d16
Resolve todo
thomaseizinger Oct 6, 2023
851341e
Reduce diff
thomaseizinger Oct 6, 2023
9b26409
Reduce diff
thomaseizinger Oct 6, 2023
3df462d
Simplify things a bit further
thomaseizinger Oct 6, 2023
09cda48
Use body_len in debug impl
thomaseizinger Oct 6, 2023
5e3e65b
Remove generic length accessor
thomaseizinger Oct 6, 2023
ab29664
Reduce diff
thomaseizinger Oct 6, 2023
b65f2bd
Ensure we check max body len before allocating
thomaseizinger Oct 9, 2023
ee9c920
Don't allocate unless necessary
thomaseizinger Oct 9, 2023
ae3bc3d
WIP: Use `AsyncWrite::poll_write_vectored`
thomaseizinger Oct 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ parking_lot = "0.12"
rand = "0.8.3"
static_assertions = "1"
pin-project = "1.1.0"
zerocopy = { version = "0.7.0", features = ["derive"] }

[dev-dependencies]
quickcheck = "1.0"
15 changes: 9 additions & 6 deletions yamux/src/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ impl Chunks {
}

/// Add another chunk of bytes to the end.
pub(crate) fn push(&mut self, x: Vec<u8>) {
self.len += x.len();
if !x.is_empty() {
self.seq.push_back(Chunk {
cursor: io::Cursor::new(x),
})
pub(crate) fn push(&mut self, x: Vec<u8>, offset: usize) {
let x_len = x.len();
let cursor = io::Cursor::new(x);
let mut chunk = Chunk { cursor };
chunk.advance(offset);
if !chunk.is_empty() {
assert_eq!(chunk.len(), x_len - offset);
self.len += chunk.len() + offset;
self.seq.push_back(chunk);
}
}

Expand Down
129 changes: 75 additions & 54 deletions yamux/src/connection.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion yamux/src/connection/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Future for Cleanup {
type Output = ConnectionError;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.get_mut();
let this = self.get_mut();

loop {
match this.state {
Expand Down
2 changes: 1 addition & 1 deletion yamux/src/connection/closing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.get_mut();
let this = self.get_mut();

loop {
match this.state {
Expand Down
28 changes: 16 additions & 12 deletions yamux/src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
// at https://opensource.org/licenses/MIT.

use crate::frame::header::ACK;
use crate::frame::header::{Flags, ACK, HEADER_SIZE};
use crate::{
chunks::Chunks,
connection::{self, StreamCommand},
frame::{
header::{Data, Header, StreamId, WindowUpdate},
header::{Data, StreamId, WindowUpdate},
Frame,
},
Config, WindowUpdateMode, DEFAULT_CREDIT,
Expand Down Expand Up @@ -183,18 +183,21 @@ impl Stream {
}

/// Set ACK or SYN flag if necessary.
fn add_flag(&mut self, header: &mut Header<Either<Data, WindowUpdate>>) {
fn add_flag(&mut self, frame: &mut Frame<Either<Data, WindowUpdate>>) -> Flags {
let mut parsed_frame = frame.parse_mut().expect("valid frame");
let header = parsed_frame.header_mut();
match self.flag {
Flag::None => (),
Flag::None => {}
Flag::Syn => {
header.syn();
self.flag = Flag::None
self.flag = Flag::None;
}
Flag::Ack => {
header.ack();
self.flag = Flag::None
self.flag = Flag::None;
}
}
header.flags()
}

/// Send new credit to the sending side via a window update message if
Expand All @@ -218,7 +221,7 @@ impl Stream {
drop(shared);

let mut frame = Frame::window_update(self.id, credit).right();
self.add_flag(frame.header_mut());
self.add_flag(&mut frame);
let cmd = StreamCommand::SendFrame(frame);
self.sender
.start_send(cmd)
Expand Down Expand Up @@ -257,7 +260,8 @@ impl futures::stream::Stream for Stream {
let mut shared = self.shared();

if let Some(bytes) = shared.buffer.pop() {
let off = bytes.offset();
// Every chunk starts with a frame header, so we add HEADER_SIZE to offset.
let off = bytes.offset() + HEADER_SIZE;
let mut vec = bytes.into_vec();
if off != 0 {
// This should generally not happen when the stream is used only as
Expand All @@ -269,7 +273,7 @@ impl futures::stream::Stream for Stream {
self.conn,
self.id
);
vec = vec.split_off(off)
vec = vec.split_off(off);
}
return Poll::Ready(Some(Ok(Packet(vec))));
}
Expand Down Expand Up @@ -367,18 +371,18 @@ impl AsyncWrite for Stream {
let k = std::cmp::min(shared.credit as usize, buf.len());
let k = std::cmp::min(k, self.config.split_send_size);
shared.credit = shared.credit.saturating_sub(k as u32);
Vec::from(&buf[..k])
&buf[..k]
};
let n = body.len();
let mut frame = Frame::data(self.id, body).expect("body <= u32::MAX").left();
self.add_flag(frame.header_mut());
let flags = self.add_flag(&mut frame);
log::trace!("{}/{}: write {} bytes", self.conn, self.id, n);

// technically, the frame hasn't been sent yet on the wire but from the perspective of this data structure, we've queued the frame for sending
// We are tracking this information:
// a) to be consistent with outbound streams
// b) to correctly test our behaviour around timing of when ACKs are sent. See `ack_timing.rs` test.
if frame.header().flags().contains(ACK) {
if flags.contains(ACK) {
self.shared()
.update_state(self.conn, self.id, State::Open { acknowledged: true });
}
Expand Down
Loading