From aa4ce72fe451fd8ef9faabc13e29e3e2e78a949e Mon Sep 17 00:00:00 2001 From: dr7ana Date: Mon, 6 May 2024 07:20:18 -0700 Subject: [PATCH 1/2] Stream buffer watermarking - Applications can call `Stream::set_watermark(...)` to implement logic to be executed at states dictated by the number of bytes currently unsent. - Application must pass `low` and `high` watermark amounts; an `execute-on-low` callback can be passed with or without an `execute-on-high` callback (and vice versa) - The `execute-on-low` callback will not be executed until the buffer state rises above the `high` value; it will not be executed again until the buffer state rises once more above the `high` value - The `execute-on-high` callback will not be executed until the buffer state drops below the `low` value; it will not be executed again until the buffer state drops once more below the `low` value - Callbacks can be passed with an optional boolean in their opt:: wrapper, indicating "clear after execution"; this will ensure the callback is only executed ONCE before being cleared. The default behavior is repeated callback execution - Invoking this function repeatedly will overwrite any currently set thresholds and callbacks --- include/oxen/quic/opt.hpp | 31 +++++++++++ include/oxen/quic/stream.hpp | 35 ++++++++++++ src/connection.cpp | 3 + src/stream.cpp | 100 ++++++++++++++++++++++++++++++++- tests/012-watermarks.cpp | 104 +++++++++++++++++++++++++++++++++++ tests/CMakeLists.txt | 1 + 6 files changed, 273 insertions(+), 1 deletion(-) create mode 100644 tests/012-watermarks.cpp diff --git a/include/oxen/quic/opt.hpp b/include/oxen/quic/opt.hpp index 46ab92fe..fd80cc1a 100644 --- a/include/oxen/quic/opt.hpp +++ b/include/oxen/quic/opt.hpp @@ -9,6 +9,7 @@ namespace oxen::quic { class Endpoint; + class Stream; namespace opt { @@ -171,5 +172,35 @@ namespace oxen::quic explicit operator bool() const { return send_hook != nullptr; } }; + + // Used to provide callbacks for stream buffer watermarking. Application can pass an optional second parameter to + // indicate that the logic should be executed once before the callback is cleared. The default behavior is for the + // callback to persist and execute repeatedly + struct watermark + { + using buffer_hook_t = std::function; + + private: + buffer_hook_t _hook = nullptr; + bool _persist = true; + + public: + watermark() = default; + explicit watermark(buffer_hook_t hook, bool persist = true) : _hook{std::move(hook)}, _persist{persist} {} + + bool persist() const { return _persist; } + + void clear() { _hook = nullptr; } + + explicit operator bool() const { return _hook != nullptr; } + + void operator()(Stream& s) + { + _hook(s); + + if (not _persist) + _hook = nullptr; + } + }; } // namespace opt } // namespace oxen::quic diff --git a/include/oxen/quic/stream.hpp b/include/oxen/quic/stream.hpp index ae601596..7b25bc8e 100644 --- a/include/oxen/quic/stream.hpp +++ b/include/oxen/quic/stream.hpp @@ -13,6 +13,7 @@ #include "connection_ids.hpp" #include "error.hpp" #include "iochannel.hpp" +#include "opt.hpp" #include "types.hpp" #include "utils.hpp" @@ -56,6 +57,29 @@ namespace oxen::quic const ConnectionID reference_id; + /** Buffer Watermarking: + - Applications can call `::set_watermark(...)` to implement logic to be executed at states dictated by the number + of bytes currently unsent. + - Application must pass `low` and `high` watermark amounts; an execute-on-low callback can be passed with or + without an execute-on-high callback (and vice versa) + - The execute-on-low callback will not be executed until the buffer state rises above the `high` value; it + will not be executed again until the buffer state rises once more above the `high` value + - The execute-on-high callback will not be executed until the buffer state drops below the `low` value; it + will not be executed again until the buffer state drops once more below the `low` value + - Callbacks can be passed with an optional boolean in their opt:: wrapper, indicating "clear after execution"; + this will ensure the callback is only executed ONCE before being cleared. The default behavior is repeated + callback execution + - Invoking this function repeatedly will overwrite any currently set thresholds and callbacks + */ + void set_watermark( + size_t low, size_t high, std::optional low_hook, std::optional high_hook); + + // Clears any currently set watermarks on this stream object + void clear_watermarks(); + + // Do not call this function from within a watermark callback! + bool has_watermarks() const; + // These public methods are synchronized so that they can be safely called from outside the // libquic main loop thread. bool available() const; @@ -111,6 +135,17 @@ namespace oxen::quic bool _ready{false}; int64_t _stream_id; + bool _be_water{false}; + + size_t _high_mark{0}; + size_t _low_mark{0}; + + bool _high_primed{false}; + bool _low_primed{true}; + + opt::watermark _high_water; + opt::watermark _low_water; + void wrote(size_t bytes) override; void append_buffer(bstring_view buffer, std::shared_ptr keep_alive); diff --git a/src/connection.cpp b/src/connection.cpp index e5bb87ca..351b64d1 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -1161,6 +1161,9 @@ namespace oxen::quic const bool was_closing = stream._is_closing; stream._is_closing = stream._is_shutdown = true; + if (stream._be_water) + stream.clear_watermarks(); + if (!was_closing) { log::trace(log_cat, "Invoking stream close callback"); diff --git a/src/stream.cpp b/src/stream.cpp index 4e43d5a9..cfddc157 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -42,6 +42,58 @@ namespace oxen::quic log::trace(log_cat, "Destroying stream {}", _stream_id); } + void Stream::set_watermark( + size_t low, size_t high, std::optional low_cb, std::optional high_cb) + { + if (not low_cb and not high_cb) + throw std::invalid_argument{"Must pass at least one callback in call to ::set_watermark()!"}; + + endpoint.call_soon([this, low, high, low_hook = std::move(low_cb), high_hook = std::move(high_cb)]() { + if (_is_closing || _is_shutdown || _sent_fin) + { + log::warning(log_cat, "Failed to set watermarks; stream is not active!"); + return; + } + + _low_mark = low; + _high_mark = high; + + if (low_hook.has_value()) + _low_water = std::move(*low_hook); + else + _low_water.clear(); + + if (high_hook.has_value()) + _high_water = std::move(*high_hook); + else + _high_water.clear(); + + _be_water = true; + + log::info(log_cat, "Stream set watermarks!"); + }); + } + + void Stream::clear_watermarks() + { + endpoint.call_soon([this]() { + if (not _be_water and not _low_water and not _high_water) + { + log::warning(log_cat, "Failed to clear watermarks; stream has none set!"); + return; + } + + _low_mark = 0; + _high_mark = 0; + if (_low_water) + _low_water.clear(); + if (_high_water) + _high_water.clear(); + _be_water = false; + log::info(log_cat, "Stream cleared currently set watermarks!"); + }); + } + bool Stream::available() const { return endpoint.call_get([this] { return !(_is_closing || _is_shutdown || _sent_fin); }); @@ -52,6 +104,11 @@ namespace oxen::quic return endpoint.call_get([this] { return _ready; }); } + bool Stream::has_watermarks() const + { + return endpoint.call_get([this]() { return _be_water and _low_water and _high_water; }); + } + std::shared_ptr Stream::get_stream() { return shared_from_this(); @@ -143,7 +200,48 @@ namespace oxen::quic if (bytes) user_buffers.front().first.remove_prefix(bytes); - log::trace(log_cat, "{} bytes acked, {} unacked remaining", bytes, size()); + auto sz = size(); + + // Do not bother with this block of logic if no watermarks are set + if (_be_water) + { + auto unsent = sz - _unacked_size; + + // We are above the high watermark. We prime the low water hook to be fired the next time we drop below the low + // watermark. If the high water hook exists and is primed, execute it + if (unsent >= _high_mark) + { + _low_primed = true; + log::info(log_cat, "Low water hook primed!"); + + if (_high_water and _high_primed) + { + log::info(log_cat, "Executing high watermark hook!"); + _high_primed = false; + return _high_water(*this); + } + } + // We are below the low watermark. We prime the high water hook to be fired the next time we rise above the high + // watermark. If the low water hook exists and is primed, execute it + else if (unsent <= _low_mark) + { + _high_primed = true; + log::info(log_cat, "High water hook primed!"); + + if (_low_water and _low_primed) + { + log::info(log_cat, "Executing low watermark hook!"); + _low_primed = false; + return _low_water(*this); + } + } + + // Low/high watermarks were executed and self-cleared, so clean up + if (not _high_water and not _low_water) + return clear_watermarks(); + } + + log::trace(log_cat, "{} bytes acked, {} unacked remaining", bytes, sz); } void Stream::wrote(size_t bytes) diff --git a/tests/012-watermarks.cpp b/tests/012-watermarks.cpp new file mode 100644 index 00000000..6571d3c8 --- /dev/null +++ b/tests/012-watermarks.cpp @@ -0,0 +1,104 @@ +#include +#include +#include +#include + +#include "utils.hpp" + +namespace oxen::quic::test +{ + TEST_CASE("012 - Stream Buffer Watermarking", "[012][watermark][streams]") + { + Network test_net{}; + bstring req_msg(100'000, std::byte{'a'}); + + std::promise d_promise; + std::future d_future = d_promise.get_future(); + + auto client_established = callback_waiter{[](connection_interface&) {}}; + auto server_established = callback_waiter{[](connection_interface&) {}}; + + auto [client_tls, server_tls] = defaults::tls_creds_from_ed_keys(); + + Address server_local{}; + Address client_local{}; + + auto server_endpoint = test_net.endpoint(server_local, server_established); + REQUIRE_NOTHROW(server_endpoint->listen(server_tls)); + + RemoteAddress client_remote{defaults::SERVER_PUBKEY, "127.0.0.1"s, server_endpoint->local().port()}; + + auto client_endpoint = test_net.endpoint(client_local, client_established); + auto conn_interface = client_endpoint->connect(client_remote, client_tls); + + CHECK(client_established.wait()); + CHECK(server_established.wait()); + + auto client_stream = conn_interface->open_stream(); + + CHECK_FALSE(client_stream->has_watermarks()); + + SECTION("Watermarks self-clear") + { + auto low_water = callback_waiter{[](Stream&) {}}; + auto high_water = callback_waiter{[](Stream&) {}}; + client_stream->set_watermark(500, 1000, opt::watermark{low_water, false}, opt::watermark{high_water, false}); + + CHECK(client_stream->has_watermarks()); + + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + + CHECK(low_water.wait()); + + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + + CHECK(high_water.wait()); + + REQUIRE_FALSE(client_stream->has_watermarks()); + } + + SECTION("Watermarks persist") + { + std::atomic low_count{0}, high_count{0}; + + client_stream->set_watermark( + 500, + 2000, + opt::watermark{ + [&](const Stream&) { + log::debug(log_cat, "Executing low hook!"); + low_count += 1; + }, + true}, + opt::watermark{ + [&](const Stream&) { + log::debug(log_cat, "Executing high hook!"); + high_count += 1; + }, + true}); + + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + + std::this_thread::sleep_for(100ms); + + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + + std::this_thread::sleep_for(150ms); + + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + + std::this_thread::sleep_for(150ms); + + REQUIRE(low_count >= 2); + REQUIRE(high_count > 1); + REQUIRE(low_count >= high_count); + + client_stream->clear_watermarks(); + + REQUIRE_FALSE(client_stream->has_watermarks()); + } + } +} // namespace oxen::quic::test diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 153c805f..d8664158 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -38,6 +38,7 @@ if(LIBQUIC_BUILD_TESTS) 009-alpns.cpp 010-migration.cpp 011-manual_transmission.cpp + 012-watermarks.cpp main.cpp case_logger.cpp From 5afdf75cd846a8ccc077cf354db088a9ec142c75 Mon Sep 17 00:00:00 2001 From: dr7ana Date: Mon, 6 May 2024 10:13:24 -0700 Subject: [PATCH 2/2] Stream pausing - Applications can call `Stream::pause()` to stop acking and extending the max stream data offset. This has the effect of limiting the inflow by signalling to the sender that they should pause. - This is reverted by invoking `Stream::resume()` - 012 test section added --- include/oxen/quic/stream.hpp | 16 ++++++++++- src/connection.cpp | 7 +++-- src/stream.cpp | 48 +++++++++++++++++++++++++++---- tests/012-watermarks.cpp | 56 ++++++++++++++++++++++++++++++++---- 4 files changed, 113 insertions(+), 14 deletions(-) diff --git a/include/oxen/quic/stream.hpp b/include/oxen/quic/stream.hpp index 7b25bc8e..4519ef4f 100644 --- a/include/oxen/quic/stream.hpp +++ b/include/oxen/quic/stream.hpp @@ -80,6 +80,17 @@ namespace oxen::quic // Do not call this function from within a watermark callback! bool has_watermarks() const; + /** Stream Pause: + - Applications can call `::pause()` to stop extending the max stream data offset. This has the effect of limiting + the inflow by signalling to the sender that they should pause + - This is reverted by invoking `::resume()` + */ + void pause(); + + void resume(); + + bool is_paused() const; + // These public methods are synchronized so that they can be safely called from outside the // libquic main loop thread. bool available() const; @@ -133,9 +144,12 @@ namespace oxen::quic bool _is_shutdown{false}; bool _sent_fin{false}; bool _ready{false}; + bool _paused{false}; int64_t _stream_id; - bool _be_water{false}; + size_t _paused_offset{0}; + + bool _is_watermarked{false}; size_t _high_mark{0}; size_t _low_mark{0}; diff --git a/src/connection.cpp b/src/connection.cpp index 351b64d1..9ec57d3b 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -1161,7 +1161,7 @@ namespace oxen::quic const bool was_closing = stream._is_closing; stream._is_closing = stream._is_shutdown = true; - if (stream._be_water) + if (stream._is_watermarked) stream.clear_watermarks(); if (!was_closing) @@ -1305,7 +1305,10 @@ namespace oxen::quic } else { - ngtcp2_conn_extend_max_stream_offset(conn.get(), id, data.size()); + if (str->_paused) + str->_paused_offset += data.size(); + else + ngtcp2_conn_extend_max_stream_offset(conn.get(), id, data.size()); ngtcp2_conn_extend_max_offset(conn.get(), data.size()); } diff --git a/src/stream.cpp b/src/stream.cpp index cfddc157..6c5e1580 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -68,7 +68,7 @@ namespace oxen::quic else _high_water.clear(); - _be_water = true; + _is_watermarked = true; log::info(log_cat, "Stream set watermarks!"); }); @@ -77,7 +77,7 @@ namespace oxen::quic void Stream::clear_watermarks() { endpoint.call_soon([this]() { - if (not _be_water and not _low_water and not _high_water) + if (not _is_watermarked and not _low_water and not _high_water) { log::warning(log_cat, "Failed to clear watermarks; stream has none set!"); return; @@ -89,11 +89,49 @@ namespace oxen::quic _low_water.clear(); if (_high_water) _high_water.clear(); - _be_water = false; + _is_watermarked = false; log::info(log_cat, "Stream cleared currently set watermarks!"); }); } + void Stream::pause() + { + endpoint.call([this]() { + if (not _paused) + { + log::debug(log_cat, "Pausing stream ID:{}", _stream_id); + assert(_paused_offset == 0); + _paused = true; + } + else + log::debug(log_cat, "Stream ID:{} already paused!", _stream_id); + }); + } + + void Stream::resume() + { + endpoint.call([this]() { + if (_paused) + { + log::debug(log_cat, "Resuming stream ID:{}", _stream_id); + if (_paused_offset) + { + ngtcp2_conn_extend_max_stream_offset(*_conn, _stream_id, _paused_offset); + _paused_offset = 0; + } + + _paused = false; + } + else + log::debug(log_cat, "Stream ID:{} is not paused!", _stream_id); + }); + } + + bool Stream::is_paused() const + { + return endpoint.call_get([this]() { return _paused; }); + } + bool Stream::available() const { return endpoint.call_get([this] { return !(_is_closing || _is_shutdown || _sent_fin); }); @@ -106,7 +144,7 @@ namespace oxen::quic bool Stream::has_watermarks() const { - return endpoint.call_get([this]() { return _be_water and _low_water and _high_water; }); + return endpoint.call_get([this]() { return _is_watermarked and _low_water and _high_water; }); } std::shared_ptr Stream::get_stream() @@ -203,7 +241,7 @@ namespace oxen::quic auto sz = size(); // Do not bother with this block of logic if no watermarks are set - if (_be_water) + if (_is_watermarked) { auto unsent = sz - _unacked_size; diff --git a/tests/012-watermarks.cpp b/tests/012-watermarks.cpp index 6571d3c8..f1672a65 100644 --- a/tests/012-watermarks.cpp +++ b/tests/012-watermarks.cpp @@ -12,9 +12,6 @@ namespace oxen::quic::test Network test_net{}; bstring req_msg(100'000, std::byte{'a'}); - std::promise d_promise; - std::future d_future = d_promise.get_future(); - auto client_established = callback_waiter{[](connection_interface&) {}}; auto server_established = callback_waiter{[](connection_interface&) {}}; @@ -85,20 +82,67 @@ namespace oxen::quic::test REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); - std::this_thread::sleep_for(150ms); + std::this_thread::sleep_for(250ms); REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); - std::this_thread::sleep_for(150ms); + std::this_thread::sleep_for(250ms); REQUIRE(low_count >= 2); - REQUIRE(high_count > 1); + REQUIRE(high_count >= 1); REQUIRE(low_count >= high_count); client_stream->clear_watermarks(); REQUIRE_FALSE(client_stream->has_watermarks()); } + + SECTION("Watermarks persist; server stream pausing") + { + std::atomic low_count{0}, high_count{0}; + + client_stream->set_watermark( + 500, + 2000, + opt::watermark{ + [&](const Stream&) { + log::debug(log_cat, "Executing low hook!"); + low_count += 1; + }, + true}, + opt::watermark{ + [&](const Stream&) { + log::debug(log_cat, "Executing high hook!"); + high_count += 1; + }, + true}); + + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + + std::this_thread::sleep_for(100ms); + + auto server_stream = server_endpoint->get_all_conns().front()->get_stream(client_stream->stream_id()); + REQUIRE(server_stream != nullptr); + + server_stream->pause(); + REQUIRE(server_stream->is_paused()); + + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + REQUIRE_NOTHROW(client_stream->send(bstring_view{req_msg})); + + server_stream->resume(); + REQUIRE_FALSE(server_stream->is_paused()); + std::this_thread::sleep_for(500ms); // stupid debian sid ARM64 CI + + REQUIRE(low_count >= 2); + REQUIRE(high_count >= 1); + + client_stream->clear_watermarks(); + + REQUIRE_FALSE(client_stream->has_watermarks()); + } } } // namespace oxen::quic::test