Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove WindowUpdateMode::OnReceive #179

Merged
merged 5 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 0.13.0

- Remove `WindowUpdateMode`.
Behavior will always be `WindowUpdateMode::OnRead`, thus enabling flow-control and enforcing backpressure.
See [PR 178](https://github.com/libp2p/rust-yamux/pull/178).

# 0.12.0

- Remove `Control` and `ControlledConnection`.
Expand Down
10 changes: 2 additions & 8 deletions test-harness/benches/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,15 @@ fn concurrent(c: &mut Criterion) {
group.finish();
}

fn config() -> Config {
let mut c = Config::default();
c.set_window_update_mode(yamux::WindowUpdateMode::OnRead);
c
}

async fn oneway(
nstreams: usize,
nmessages: usize,
data: Bytes,
server: Endpoint,
client: Endpoint,
) {
let server = Connection::new(server, config(), Mode::Server);
let client = Connection::new(client, config(), Mode::Client);
let server = Connection::new(server, Config::default(), Mode::Server);
let client = Connection::new(client, Config::default(), Mode::Client);

task::spawn(dev_null_server(server));

Expand Down
7 changes: 1 addition & 6 deletions test-harness/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::{fmt, io, mem};
use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::task;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use yamux::Config;
use yamux::ConnectionError;
use yamux::{Config, WindowUpdateMode};
use yamux::{Connection, Mode};

pub async fn connected_peers(
Expand Down Expand Up @@ -448,11 +448,6 @@ pub struct TestConfig(pub Config);
impl Arbitrary for TestConfig {
fn arbitrary(g: &mut Gen) -> Self {
let mut c = Config::default();
c.set_window_update_mode(if bool::arbitrary(g) {
WindowUpdateMode::OnRead
} else {
WindowUpdateMode::OnReceive
});
c.set_read_after_close(Arbitrary::arbitrary(g));
c.set_receive_window(256 * 1024 + u32::arbitrary(g) % (768 * 1024));
TestConfig(c)
Expand Down
3 changes: 2 additions & 1 deletion yamux/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yamux"
version = "0.12.0"
version = "0.13.0"
authors = ["Parity Technologies <[email protected]>"]
license = "Apache-2.0 OR MIT"
description = "Multiplexer over reliable, ordered connections"
Expand All @@ -19,4 +19,5 @@ static_assertions = "1"
pin-project = "1.1.0"

[dev-dependencies]
futures = { version = "0.3.12", default-features = false, features = ["executor"] }
quickcheck = "1.0"
65 changes: 13 additions & 52 deletions yamux/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
error::ConnectionError,
frame::header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID},
frame::{self, Frame},
Config, WindowUpdateMode, DEFAULT_CREDIT,
Config, DEFAULT_CREDIT,
};
use crate::{Result, MAX_ACK_BACKLOG};
use cleanup::Cleanup;
Expand Down Expand Up @@ -304,9 +304,7 @@ enum Action {
/// Nothing to be done.
None,
/// A new stream has been opened by the remote.
New(Stream, Option<Frame<WindowUpdate>>),
/// A window update should be sent to the remote.
Update(Frame<WindowUpdate>),
New(Stream),
/// A ping should be answered.
Ping(Frame<Ping>),
/// A stream should be reset.
Expand Down Expand Up @@ -504,23 +502,13 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
// The remote may be out of credit though and blocked on
// writing more data. We may need to reset the stream.
State::SendClosed => {
if self.config.window_update_mode == WindowUpdateMode::OnRead
&& shared.window == 0
{
// The remote may be waiting for a window update
// which we will never send, so reset the stream now.
let mut header = Header::data(stream_id, 0);
header.rst();
Some(Frame::new(header))
} else {
// The remote has either still credit or will be given more
// (due to an enqueued window update or because the update
// mode is `OnReceive`) or we already have inbound frames in
// the socket buffer which will be processed later. In any
// case we will reply with an RST in `Connection::on_data`
// because the stream will no longer be known.
None
}
// The remote has either still credit or will be given more
// (due to an enqueued window update or because the update
// mode is `OnReceive`) or we already have inbound frames in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale docs?

// the socket buffer which will be processed later. In any
// case we will reply with an RST in `Connection::on_data`
// because the stream will no longer be known.
None
}
// The stream was properly closed. We already have sent our FIN frame. The
// remote end has already done so in the past.
Expand Down Expand Up @@ -569,18 +557,10 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
};
match action {
Action::None => {}
Action::New(stream, update) => {
Action::New(stream) => {
log::trace!("{}: new inbound {} of {}", self.id, stream, self);
if let Some(f) = update {
log::trace!("{}/{}: sending update", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
}
return Ok(Some(stream));
}
Action::Update(f) => {
log::trace!("{}: sending update: {:?}", self.id, f.header());
self.pending_frames.push_back(f.into());
}
Action::Ping(f) => {
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
Expand Down Expand Up @@ -641,29 +621,17 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
return Action::Terminate(Frame::internal_error());
}
let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
let mut window_update = None;
{
let mut shared = stream.shared();
if is_finish {
shared.update_state(self.id, stream_id, State::RecvClosed);
}
shared.window = shared.window.saturating_sub(frame.body_len());
shared.buffer.push(frame.into_body());

if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
if let Some(credit) = shared.next_window_update() {
shared.window += credit;
let mut frame = Frame::window_update(stream_id, credit);
frame.header_mut().ack();
window_update = Some(frame)
}
}
}
if window_update.is_none() {
stream.set_flag(stream::Flag::Ack)
}
stream.set_flag(stream::Flag::Ack);
self.streams.insert(stream_id, stream.clone_shared());
return Action::New(stream, window_update);
return Action::New(stream);
}

if let Some(s) = self.streams.get_mut(&stream_id) {
Expand Down Expand Up @@ -695,13 +663,6 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
if let Some(w) = shared.reader.take() {
w.wake()
}
if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
if let Some(credit) = shared.next_window_update() {
shared.window += credit;
let frame = Frame::window_update(stream_id, credit);
return Action::Update(frame);
}
}
} else {
log::trace!(
"{}/{}: data frame for unknown stream, possibly dropped earlier: {:?}",
Expand Down Expand Up @@ -766,7 +727,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
.update_state(self.id, stream_id, State::RecvClosed);
}
self.streams.insert(stream_id, stream.clone_shared());
return Action::New(stream, None);
return Action::New(stream);
}

if let Some(s) = self.streams.get_mut(&stream_id) {
Expand Down
26 changes: 7 additions & 19 deletions yamux/src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
header::{Data, Header, StreamId, WindowUpdate},
Frame,
},
Config, WindowUpdateMode, DEFAULT_CREDIT,
Config, DEFAULT_CREDIT,
};
use futures::{
channel::mpsc,
Expand Down Expand Up @@ -200,12 +200,6 @@ impl Stream {
/// Send new credit to the sending side via a window update message if
/// permitted.
fn send_window_update(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
// When using [`WindowUpdateMode::OnReceive`] window update messages are
// send early on data receival (see [`crate::Connection::on_frame`]).
if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
return Poll::Ready(Ok(()));
}

let mut shared = self.shared.lock();

if let Some(credit) = shared.next_window_update() {
Expand Down Expand Up @@ -487,6 +481,7 @@ impl Shared {
current // Return the previous stream state for informational purposes.
}

// TODO: This does not need to live in shared any longer.
/// Calculate the number of additional window bytes the receiving side
/// should grant the sending side via a window update message.
///
Expand All @@ -499,19 +494,12 @@ impl Shared {
return None;
}

let new_credit = match self.config.window_update_mode {
WindowUpdateMode::OnReceive => {
debug_assert!(self.config.receive_window >= self.window);
let new_credit = {
debug_assert!(self.config.receive_window >= self.window);
let bytes_received = self.config.receive_window.saturating_sub(self.window);
let buffer_len: u32 = self.buffer.len().try_into().unwrap_or(std::u32::MAX);

self.config.receive_window.saturating_sub(self.window)
}
WindowUpdateMode::OnRead => {
debug_assert!(self.config.receive_window >= self.window);
let bytes_received = self.config.receive_window.saturating_sub(self.window);
let buffer_len: u32 = self.buffer.len().try_into().unwrap_or(std::u32::MAX);

bytes_received.saturating_sub(buffer_len)
}
bytes_received.saturating_sub(buffer_len)
};

// Send WindowUpdate message when half or more of the configured receive
Expand Down
34 changes: 0 additions & 34 deletions yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,32 +63,6 @@ const MAX_ACK_BACKLOG: usize = 256;
/// https://github.com/paritytech/yamux/issues/100.
const DEFAULT_SPLIT_SEND_SIZE: usize = 16 * 1024;

/// Specifies when window update frames are sent.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WindowUpdateMode {
/// Send window updates as soon as a [`Stream`]'s receive window drops to 0.
///
/// This ensures that the sender can resume sending more data as soon as possible
/// but a slow reader on the receiving side may be overwhelmed, i.e. it accumulates
/// data in its buffer which may reach its limit (see `set_max_buffer_size`).
/// In this mode, window updates merely prevent head of line blocking but do not
/// effectively exercise back pressure on senders.
OnReceive,

/// Send window updates only when data is read on the receiving end.
///
/// This ensures that senders do not overwhelm receivers and keeps buffer usage
/// low. However, depending on the protocol, there is a risk of deadlock, namely
/// if both endpoints want to send data larger than the receivers window and they
/// do not read before finishing their writes. Use this mode only if you are sure
/// that this will never happen, i.e. if
///
/// - Endpoints *A* and *B* never write at the same time, *or*
/// - Endpoints *A* and *B* write at most *n* frames concurrently such that the sum
/// of the frame lengths is less or equal to the available credit of *A* and *B*
/// respectively.
OnRead,
}

/// Yamux configuration.
///
Expand All @@ -105,7 +79,6 @@ pub struct Config {
receive_window: u32,
max_buffer_size: usize,
max_num_streams: usize,
window_update_mode: WindowUpdateMode,
read_after_close: bool,
split_send_size: usize,
}
Expand All @@ -116,7 +89,6 @@ impl Default for Config {
receive_window: DEFAULT_CREDIT,
max_buffer_size: 1024 * 1024,
max_num_streams: 8192,
window_update_mode: WindowUpdateMode::OnRead,
read_after_close: true,
split_send_size: DEFAULT_SPLIT_SEND_SIZE,
}
Expand Down Expand Up @@ -147,12 +119,6 @@ impl Config {
self
}

/// Set the window update mode to use.
pub fn set_window_update_mode(&mut self, m: WindowUpdateMode) -> &mut Self {
self.window_update_mode = m;
self
}

/// Allow or disallow streams to read from buffered data after
/// the connection has been closed.
pub fn set_read_after_close(&mut self, b: bool) -> &mut Self {
Expand Down
Loading