Skip to content

Commit

Permalink
feat: auto-tune (dynamic) stream receive window (#176)
Browse files Browse the repository at this point in the history
- Send Yamux' Pings on an interval to measure the connection round-trip-time.
- Dynamically grow the stream receive window based on the round-trip-time and the estimated bandwidth.
  • Loading branch information
mxinden authored Dec 6, 2023
1 parent 16ffe54 commit 68a9e3d
Show file tree
Hide file tree
Showing 15 changed files with 769 additions and 158 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# 0.13.0

- Introduce dynamic stream receive window auto-tuning.
While low-resourced deployments maintain the benefit of small buffers, high resource deployments eventually end-up with a window of roughly the bandwidth-delay-product (ideal) and are thus able to use the entire available bandwidth.
See [PR 176](https://github.com/libp2p/rust-yamux/pull/176) for performance results and details on the implementation.
- Remove `WindowUpdateMode`.
Behavior will always be `WindowUpdateMode::OnRead`, thus enabling flow-control and enforcing backpressure.
See [PR 178](https://github.com/libp2p/rust-yamux/pull/178).
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[workspace]
members = ["yamux", "test-harness"]
members = ["yamux", "test-harness", "quickcheck-ext"]
resolver = "2"
13 changes: 13 additions & 0 deletions quickcheck-ext/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "quickcheck-ext"
version = "0.1.0"
edition = "2021"
publish = false
license = "Unlicense/MIT"

[package.metadata.release]
release = false

[dependencies]
quickcheck = "1"
num-traits = "0.2"
46 changes: 46 additions & 0 deletions quickcheck-ext/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

pub use quickcheck::*;

use core::ops::Range;
use num_traits::sign::Unsigned;

pub trait GenRange {
fn gen_range<T: Unsigned + Arbitrary + Copy>(&mut self, _range: Range<T>) -> T;

fn gen_index(&mut self, ubound: usize) -> usize {
if ubound <= (core::u32::MAX as usize) {
self.gen_range(0..ubound as u32) as usize
} else {
self.gen_range(0..ubound)
}
}
}

impl GenRange for Gen {
fn gen_range<T: Unsigned + Arbitrary + Copy>(&mut self, range: Range<T>) -> T {
<T as Arbitrary>::arbitrary(self) % (range.end - range.start) + range.start
}
}

pub trait SliceRandom {
fn shuffle<T>(&mut self, arr: &mut [T]);
fn choose_multiple<'a, T>(
&mut self,
arr: &'a [T],
amount: usize,
) -> std::iter::Take<std::vec::IntoIter<&'a T>> {
let mut v: Vec<&T> = arr.iter().collect();
self.shuffle(&mut v);
v.into_iter().take(amount)
}
}

impl SliceRandom for Gen {
fn shuffle<T>(&mut self, arr: &mut [T]) {
for i in (1..arr.len()).rev() {
// invariant: elements with index > i have been locked in place.
arr.swap(i, self.gen_index(i + 1));
}
}
}
3 changes: 1 addition & 2 deletions test-harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ publish = false
[dependencies]
yamux = { path = "../yamux" }
futures = "0.3.4"
quickcheck = "1.0"
quickcheck = { package = "quickcheck-ext", path = "../quickcheck-ext" }
tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] }
tokio-util = { version = "0.7", features = ["compat"] }
anyhow = "1"
Expand All @@ -17,7 +17,6 @@ log = "0.4.17"
criterion = "0.5"
env_logger = "0.10"
futures = "0.3.4"
quickcheck = "1.0"
tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] }
tokio-util = { version = "0.7", features = ["compat"] }
constrained-connection = "0.1"
Expand Down
16 changes: 14 additions & 2 deletions test-harness/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ where
.try_for_each_concurrent(None, |mut stream| async move {
{
let (mut r, mut w) = AsyncReadExt::split(&mut stream);
futures::io::copy(&mut r, &mut w).await?;
futures::io::copy(&mut r, &mut w).await.unwrap();
}
stream.close().await?;
Ok(())
Expand Down Expand Up @@ -447,9 +447,21 @@ pub struct TestConfig(pub Config);

impl Arbitrary for TestConfig {
fn arbitrary(g: &mut Gen) -> Self {
use quickcheck::GenRange;

let mut c = Config::default();
let max_num_streams = 512;

c.set_read_after_close(Arbitrary::arbitrary(g));
c.set_receive_window(256 * 1024 + u32::arbitrary(g) % (768 * 1024));
c.set_max_num_streams(max_num_streams);
if bool::arbitrary(g) {
c.set_max_connection_receive_window(Some(
g.gen_range(max_num_streams * (yamux::DEFAULT_CREDIT as usize)..usize::MAX),
));
} else {
c.set_max_connection_receive_window(None);
}

TestConfig(c)
}
}
4 changes: 2 additions & 2 deletions test-harness/tests/ack_backlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ where
this.worker_streams.push(ping_pong(stream.unwrap()).boxed());
continue;
}
(Poll::Ready(_), Some(_)) => {
panic!("should not be able to open stream if server hasn't acknowledged existing streams")
(Poll::Ready(e), Some(_)) => {
panic!("should not be able to open stream if server hasn't acknowledged existing streams: {:?}", e)
}
(Poll::Pending, None) => {}
}
Expand Down
4 changes: 2 additions & 2 deletions yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ repository = "https://github.com/paritytech/yamux"
edition = "2021"

[dependencies]
futures = { version = "0.3.12", default-features = false, features = ["std"] }
futures = { version = "0.3.12", default-features = false, features = ["std", "executor"] }
log = "0.4.8"
nohash-hasher = "0.2"
parking_lot = "0.12"
Expand All @@ -20,4 +20,4 @@ pin-project = "1.1.0"

[dev-dependencies]
futures = { version = "0.3.12", default-features = false, features = ["executor"] }
quickcheck = "1.0"
quickcheck = { package = "quickcheck-ext", path = "../quickcheck-ext" }
100 changes: 51 additions & 49 deletions yamux/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
mod cleanup;
mod closing;
mod rtt;
mod stream;

use crate::tagged_stream::TaggedStream;
Expand Down Expand Up @@ -287,8 +288,15 @@ struct Active<T> {

pending_frames: VecDeque<Frame<()>>,
new_outbound_stream_waker: Option<Waker>,
}

rtt: rtt::Rtt,

/// A stream's `max_stream_receive_window` can grow beyond [`DEFAULT_CREDIT`], see
/// [`Stream::next_window_update`]. This field is the sum of the bytes by which all streams'
/// `max_stream_receive_window` have each exceeded [`DEFAULT_CREDIT`]. Used to enforce
/// [`Config::max_connection_receive_window`].
accumulated_max_stream_windows: Arc<Mutex<usize>>,
}
/// `Stream` to `Connection` commands.
#[derive(Debug)]
pub(crate) enum StreamCommand {
Expand All @@ -300,15 +308,13 @@ pub(crate) enum StreamCommand {

/// Possible actions as a result of incoming frame handling.
#[derive(Debug)]
enum Action {
pub(crate) enum Action {
/// Nothing to be done.
None,
/// A new stream has been opened by the remote.
New(Stream),
/// A ping should be answered.
Ping(Frame<Ping>),
/// A stream should be reset.
Reset(Frame<Data>),
/// The connection should be terminated.
Terminate(Frame<GoAway>),
}
Expand Down Expand Up @@ -341,7 +347,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
fn new(socket: T, cfg: Config, mode: Mode) -> Self {
let id = Id::random();
log::debug!("new connection: {} ({:?})", id, mode);
let socket = frame::Io::new(id, socket, cfg.max_buffer_size).fuse();
let socket = frame::Io::new(id, socket).fuse();
Active {
id,
mode,
Expand All @@ -356,6 +362,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
},
pending_frames: VecDeque::default(),
new_outbound_stream_waker: None,
rtt: rtt::Rtt::new(),
accumulated_max_stream_windows: Default::default(),
}
}

Expand All @@ -376,6 +384,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
loop {
if self.socket.poll_ready_unpin(cx).is_ready() {
// Note `next_ping` does not register a waker and thus if not called regularly (idle
// connection) no ping is sent. This is deliberate as an idle connection does not
// need RTT measurements to increase its stream receive window.
if let Some(frame) = self.rtt.next_ping() {
self.socket.start_send_unpin(frame.into())?;
continue;
}

if let Some(frame) = self.pending_frames.pop_front() {
self.socket.start_send_unpin(frame)?;
continue;
Expand Down Expand Up @@ -439,20 +455,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
log::trace!("{}: creating new outbound stream", self.id);

let id = self.next_stream_id()?;
let extra_credit = self.config.receive_window - DEFAULT_CREDIT;

if extra_credit > 0 {
let mut frame = Frame::window_update(id, extra_credit);
frame.header_mut().syn();
log::trace!("{}/{}: sending initial {}", self.id, id, frame.header());
self.pending_frames.push_back(frame.into());
}

let mut stream = self.make_new_outbound_stream(id, self.config.receive_window);

if extra_credit == 0 {
stream.set_flag(stream::Flag::Syn)
}
let stream = self.make_new_outbound_stream(id);

log::debug!("{}: new outbound {} of {}", self.id, stream, self);
self.streams.insert(id, stream.clone_shared());
Expand Down Expand Up @@ -537,7 +540,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
fn on_frame(&mut self, frame: Frame<()>) -> Result<Option<Stream>> {
log::trace!("{}: received: {}", self.id, frame.header());

if frame.header().flags().contains(header::ACK) {
if frame.header().flags().contains(header::ACK)
&& matches!(frame.header().tag(), Tag::Data | Tag::WindowUpdate)
{
let id = frame.header().stream_id();
if let Some(stream) = self.streams.get(&id) {
stream
Expand Down Expand Up @@ -565,10 +570,6 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
}
Action::Reset(f) => {
log::trace!("{}/{}: sending reset", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
}
Action::Terminate(f) => {
log::trace!("{}: sending term", self.id);
self.pending_frames.push_back(f.into());
Expand Down Expand Up @@ -620,23 +621,22 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
log::error!("{}: maximum number of streams reached", self.id);
return Action::Terminate(Frame::internal_error());
}
let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
let stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
{
let mut shared = stream.shared();
if is_finish {
shared.update_state(self.id, stream_id, State::RecvClosed);
}
shared.window = shared.window.saturating_sub(frame.body_len());
shared.consume_receive_window(frame.body_len());
shared.buffer.push(frame.into_body());
}
stream.set_flag(stream::Flag::Ack);
self.streams.insert(stream_id, stream.clone_shared());
return Action::New(stream);
}

if let Some(s) = self.streams.get_mut(&stream_id) {
let mut shared = s.lock();
if frame.body().len() > shared.window as usize {
if frame.body_len() > shared.receive_window() {
log::error!(
"{}/{}: frame body larger than window of stream",
self.id,
Expand All @@ -647,18 +647,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
if is_finish {
shared.update_state(self.id, stream_id, State::RecvClosed);
}
let max_buffer_size = self.config.max_buffer_size;
if shared.buffer.len() >= max_buffer_size {
log::error!(
"{}/{}: buffer of stream grows beyond limit",
self.id,
stream_id
);
let mut header = Header::data(stream_id, 0);
header.rst();
return Action::Reset(Frame::new(header));
}
shared.window = shared.window.saturating_sub(frame.body_len());
shared.consume_receive_window(frame.body_len());
shared.buffer.push(frame.into_body());
if let Some(w) = shared.reader.take() {
w.wake()
Expand Down Expand Up @@ -718,8 +707,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
}

let credit = frame.header().credit() + DEFAULT_CREDIT;
let mut stream = self.make_new_inbound_stream(stream_id, credit);
stream.set_flag(stream::Flag::Ack);
let stream = self.make_new_inbound_stream(stream_id, credit);

if is_finish {
stream
Expand All @@ -732,7 +720,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {

if let Some(s) = self.streams.get_mut(&stream_id) {
let mut shared = s.lock();
shared.credit += frame.header().credit();
shared.increase_send_window_by(frame.header().credit());
if is_finish {
shared.update_state(self.id, stream_id, State::RecvClosed);
}
Expand Down Expand Up @@ -761,15 +749,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
fn on_ping(&mut self, frame: &Frame<Ping>) -> Action {
let stream_id = frame.header().stream_id();
if frame.header().flags().contains(header::ACK) {
// pong
return Action::None;
return self.rtt.handle_pong(frame.nonce());
}
if stream_id == CONNECTION_ID || self.streams.contains_key(&stream_id) {
let mut hdr = Header::ping(frame.header().nonce());
hdr.ack();
return Action::Ping(Frame::new(hdr));
}
log::trace!(
log::debug!(
"{}/{}: ping for unknown stream, possibly dropped earlier: {:?}",
self.id,
stream_id,
Expand All @@ -794,10 +781,18 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
waker.wake();
}

Stream::new_inbound(id, self.id, config, credit, sender)
Stream::new_inbound(
id,
self.id,
config,
credit,
sender,
self.rtt.clone(),
self.accumulated_max_stream_windows.clone(),
)
}

fn make_new_outbound_stream(&mut self, id: StreamId, window: u32) -> Stream {
fn make_new_outbound_stream(&mut self, id: StreamId) -> Stream {
let config = self.config.clone();

let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number.
Expand All @@ -806,7 +801,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
waker.wake();
}

Stream::new_outbound(id, self.id, config, window, sender)
Stream::new_outbound(
id,
self.id,
config,
sender,
self.rtt.clone(),
self.accumulated_max_stream_windows.clone(),
)
}

fn next_stream_id(&mut self) -> Result<StreamId> {
Expand Down
Loading

0 comments on commit 68a9e3d

Please sign in to comment.