From 1f2428adaa7acc5f654b172e10e8ed65a25fcd21 Mon Sep 17 00:00:00 2001 From: dr7ana Date: Wed, 8 May 2024 06:08:12 -0700 Subject: [PATCH] Callbacks on remote read/write shutdown - Applications can call `::set_remote_reset_hooks(...)` to emplace logic to be executed when the remote stream shuts down reading and/or writing - This only happens once per lifetime of the stream; as a result, do NOT set more hooks while inside the body of the hooks themselves! --- include/oxen/quic/connection.hpp | 1 + include/oxen/quic/error.hpp | 6 +++ include/oxen/quic/opt.hpp | 37 ++++++++++++++++++ include/oxen/quic/stream.hpp | 20 +++++++++- src/connection.cpp | 47 ++++++++++++++++++++++- src/stream.cpp | 66 +++++++++++++++++++++++++++++--- tests/012-stream_signalling.cpp | 19 ++++++++- 7 files changed, 186 insertions(+), 10 deletions(-) diff --git a/include/oxen/quic/connection.hpp b/include/oxen/quic/connection.hpp index a795da82..2a76adf5 100644 --- a/include/oxen/quic/connection.hpp +++ b/include/oxen/quic/connection.hpp @@ -490,6 +490,7 @@ namespace oxen::quic int stream_ack(int64_t id, size_t size); int stream_receive(int64_t id, bstring_view data, bool fin); void stream_execute_close(Stream& s, uint64_t app_code); + void stream_reset(int64_t id, uint64_t app_code); void stream_closed(int64_t id, uint64_t app_code); void close_all_streams(); void check_pending_streams(uint64_t available); diff --git a/include/oxen/quic/error.hpp b/include/oxen/quic/error.hpp index 90134ddc..8d625241 100644 --- a/include/oxen/quic/error.hpp +++ b/include/oxen/quic/error.hpp @@ -20,6 +20,12 @@ namespace oxen::quic // Application error code we close with if the stream data handle throws inline constexpr uint64_t STREAM_ERROR_EXCEPTION = ERROR_BASE + 100; + // Application error code for signalling a remote shut down stream reading + inline constexpr uint64_t STREAM_REMOTE_READ_SHUTDOWN = ERROR_BASE + 101; + + // Application error code for signalling a remote shut down stream writing + inline constexpr uint64_t STREAM_REMOTE_WRITE_SHUTDOWN = ERROR_BASE + 102; + // Application error if a bt request stream handle throws an exception inline constexpr uint64_t BPARSER_ERROR_EXCEPTION = ERROR_BASE + 105; diff --git a/include/oxen/quic/opt.hpp b/include/oxen/quic/opt.hpp index fd80cc1a..a6232e1e 100644 --- a/include/oxen/quic/opt.hpp +++ b/include/oxen/quic/opt.hpp @@ -202,5 +202,42 @@ namespace oxen::quic _hook = nullptr; } }; + + // Used to provide callbacks for remote stream reset. Application can pass one or both callbacks to indicate what + // logic should be executed when the remote shuts down stream reading or writing. The signature of `on_reset_hook_t` + // matches that of other hooks, so we wrap it in an opt struct to differentiate and to structure access. + struct remote_stream_reset + { + using on_reset_hook_t = std::function; + + private: + on_reset_hook_t _on_read_reset = nullptr; + on_reset_hook_t _on_write_reset = nullptr; + + public: + remote_stream_reset() = default; + + explicit remote_stream_reset(on_reset_hook_t _on_read, on_reset_hook_t _on_write = nullptr) : + _on_read_reset{std::move(_on_read)}, _on_write_reset{std::move(_on_write)} + { + if (not _on_read_reset and not _on_write_reset) + throw std::invalid_argument{"Must set at least one of `on_read_reset` and `on_write_reset`!"}; + } + + explicit operator bool() const { return has_read_hook() and has_write_hook(); } + + void clear() + { + _on_read_reset = nullptr; + _on_write_reset = nullptr; + } + + bool has_read_hook() const { return _on_read_reset != nullptr; } + bool has_write_hook() const { return _on_write_reset != nullptr; } + + void read_reset(Stream& s, uint64_t ec) { return _on_read_reset(s, ec); } + void write_reset(Stream& s, uint64_t ec) { return _on_write_reset(s, ec); } + }; + } // namespace opt } // namespace oxen::quic diff --git a/include/oxen/quic/stream.hpp b/include/oxen/quic/stream.hpp index 41318d8e..8afab2d2 100644 --- a/include/oxen/quic/stream.hpp +++ b/include/oxen/quic/stream.hpp @@ -91,14 +91,26 @@ namespace oxen::quic bool is_paused() const; - bool is_reading() const; + /** Remote Stream Reset: + - Applications can call `::set_remote_reset_hooks(...)` to emplace logic to be executed when the remote stream + shuts down reading and/or writing + - This only happens once per lifetime of the stream; as a result, do NOT set more hooks while inside the body of + the hooks themselves! + */ + void set_remote_reset_hooks(opt::remote_stream_reset hooks); - bool is_writing() const; + void clear_remote_reset_hooks(); + + bool has_remote_reset_hooks() const; void stop_reading(); void stop_writing(); + bool is_reading() const; + + bool is_writing() const; + // These public methods are synchronized so that they can be safely called from outside the // libquic main loop thread. bool available() const; @@ -168,6 +180,10 @@ namespace oxen::quic opt::watermark _high_water; opt::watermark _low_water; + opt::remote_stream_reset _remote_reset; + + bool _in_reset{false}; + bool _is_reading{true}; bool _is_writing{true}; diff --git a/src/connection.cpp b/src/connection.cpp index 9ec57d3b..10c532eb 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -124,7 +124,7 @@ namespace oxen::quic void* /*stream_user_data*/) { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); - static_cast(user_data)->stream_closed(stream_id, app_error_code); + static_cast(user_data)->stream_reset(stream_id, app_error_code); return 0; } @@ -1171,6 +1171,51 @@ namespace oxen::quic } } + void Connection::stream_reset(int64_t id, uint64_t app_code) + { + log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); + assert(ngtcp2_is_bidi_stream(id)); + auto it = _streams.find(id); + + if (it == _streams.end()) + return; + + auto& stream = it->second; + + switch (app_code) + { + case STREAM_REMOTE_READ_SHUTDOWN: + log::debug(log_cat, "Stream (ID:{}) received remote read shutdown signal!", id); + + if (stream->_remote_reset.has_read_hook()) + { + log::debug(log_cat, "Invoking remote_read_reset hook..."); + stream->_in_reset = true; + stream->_remote_reset.read_reset(*stream.get(), app_code); + stream->_in_reset = false; + } + + break; + + case STREAM_REMOTE_WRITE_SHUTDOWN: + log::debug(log_cat, "Stream (ID:{}) received remote write shutdown signal!", id); + + if (stream->_remote_reset.has_write_hook()) + { + log::debug(log_cat, "Invoking remote_write_reset hook..."); + stream->_in_reset = true; + stream->_remote_reset.write_reset(*stream.get(), app_code); + stream->_in_reset = false; + } + + break; + + default: + log::critical( + log_cat, "Stream (ID:{}) received unrecognized app code (ec:{}) for stream reset!", id, app_code); + } + } + void Connection::stream_closed(int64_t id, uint64_t app_code) { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); diff --git a/src/stream.cpp b/src/stream.cpp index f3d720a6..f04c221f 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -127,6 +127,38 @@ namespace oxen::quic }); } + void Stream::set_remote_reset_hooks(opt::remote_stream_reset sr) + { + // we can use ::call(...) instead of ::call_soon(...) because stream read/write shutdown only happens once per stream + // lifetime, and the application would be beyond incorrect to invoke this function in the callbacks themselves + endpoint.call([this, hooks = std::move(sr)]() { + if (_in_reset) + throw std::runtime_error{"Cannot set `remote_stream_reset` while executing currently set hooks!!"}; + + log::debug(log_cat, "Stream (ID:{}) provided `remote_stream_reset` hooks!", _stream_id); + _remote_reset = std::move(hooks); + }); + } + + void Stream::clear_remote_reset_hooks() + { + // we can use ::call(...) instead of ::call_soon(...) because stream read/write shutdown only happens once per stream + // lifetime, and the application would be beyond incorrect to invoke this function in the callbacks themselves + endpoint.call([this]() { + if (_in_reset) + throw std::runtime_error{"Cannot set `remote_stream_reset` while executing currently set hooks!!"}; + + log::debug(log_cat, "Stream (ID:{}) cleared `remote_stream_reset` hooks!", _stream_id); + _remote_reset.clear(); + assert(not _remote_reset); + }); + } + + bool Stream::has_remote_reset_hooks() const + { + return endpoint.call_get([this]() { return _remote_reset.has_read_hook() and _remote_reset.has_write_hook(); }); + } + void Stream::stop_reading() { endpoint.call([this]() { @@ -139,7 +171,7 @@ namespace oxen::quic _is_reading = false; log::warning(log_cat, "Halting all read operations on stream ID:{}!", _stream_id); - ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, 0); + ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, STREAM_REMOTE_READ_SHUTDOWN); }); } @@ -152,6 +184,18 @@ namespace oxen::quic return; } + if (user_buffers.empty()) + { + log::warning( + log_cat, + "All transmitted data dispatched and acked; halting all write operations on stream ID:{}", + _stream_id); + ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, STREAM_REMOTE_WRITE_SHUTDOWN); + return clear_watermarks(); + } + + // if buffers are empty and we call shutdown_stream_write now, we do not need to flip this boolean; it is used to + // signal for the same call in ::acknowledge() _is_writing = false; }); } @@ -248,6 +292,13 @@ namespace oxen::quic void Stream::append_buffer(bstring_view buffer, std::shared_ptr keep_alive) { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); + + if (not _is_writing) + { + log::warning(log_cat, "Stream (ID:{}) has halted writing; payload NOT appended to buffer!", _stream_id); + return; + } + user_buffers.emplace_back(buffer, std::move(keep_alive)); assert(endpoint.in_event_loop()); assert(_conn); @@ -299,15 +350,18 @@ namespace oxen::quic if (bytes) user_buffers.front().first.remove_prefix(bytes); - auto sz = size(); - - if (not _is_writing and _unacked_size == 0) + if (not _is_writing and user_buffers.empty()) { - log::warning(log_cat, "All transmitted data acked; halting all write operations on stream ID:{}", _stream_id); - ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, 0); + log::warning( + log_cat, + "All transmitted data dispatched and acked; halting all write operations on stream ID:{}", + _stream_id); + ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, STREAM_REMOTE_WRITE_SHUTDOWN); return clear_watermarks(); } + auto sz = size(); + // Do not bother with this block of logic if no watermarks are set if (_is_watermarked) { diff --git a/tests/012-stream_signalling.cpp b/tests/012-stream_signalling.cpp index b0192953..771381af 100644 --- a/tests/012-stream_signalling.cpp +++ b/tests/012-stream_signalling.cpp @@ -180,6 +180,21 @@ namespace oxen::quic::test auto client_stream = conn_interface->open_stream([&](Stream&, bstring_view) { p.set_value(true); }); + client_stream->set_remote_reset_hooks(opt::remote_stream_reset{ + [](Stream& s, uint64_t ec) { + REQUIRE(ec == STREAM_REMOTE_READ_SHUTDOWN); + + // Cannot set or clear callbacks while executing the callbacks! + REQUIRE_THROWS(s.set_remote_reset_hooks(opt::remote_stream_reset{})); + REQUIRE_THROWS(s.clear_remote_reset_hooks()); + + s.stop_writing(); + }, + [](Stream& s, uint64_t ec) { + REQUIRE(ec == STREAM_REMOTE_WRITE_SHUTDOWN); + s.stop_reading(); + }}); + REQUIRE(client_stream->is_reading()); REQUIRE(client_stream->is_writing()); @@ -189,11 +204,13 @@ namespace oxen::quic::test REQUIRE_FALSE(server_stream->is_writing()); client_stream->send(bstring_view{req_msg}); - REQUIRE(f.get()); + require_future(f); // allow the acks to get back to the client; extra time for slow CI archs std::this_thread::sleep_for(250ms); + REQUIRE_FALSE(client_stream->is_reading()); + REQUIRE(TestHelper::stream_unacked(*server_stream.get()) == 0); }