Skip to content

Commit

Permalink
Callbacks on remote read/write shutdown
Browse files Browse the repository at this point in the history
- Applications can call `::set_remote_reset_hooks(...)` to emplace logic to be executed when the remote stream
    shuts down reading and/or writing
- This only happens once per lifetime of the stream; as a result, do NOT set more hooks while inside the body of
    the hooks themselves!
  • Loading branch information
dr7ana committed May 8, 2024
1 parent 2b080e0 commit 1f2428a
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 10 deletions.
1 change: 1 addition & 0 deletions include/oxen/quic/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ namespace oxen::quic
int stream_ack(int64_t id, size_t size);
int stream_receive(int64_t id, bstring_view data, bool fin);
void stream_execute_close(Stream& s, uint64_t app_code);
void stream_reset(int64_t id, uint64_t app_code);
void stream_closed(int64_t id, uint64_t app_code);
void close_all_streams();
void check_pending_streams(uint64_t available);
Expand Down
6 changes: 6 additions & 0 deletions include/oxen/quic/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ namespace oxen::quic
// Application error code we close with if the stream data handle throws
inline constexpr uint64_t STREAM_ERROR_EXCEPTION = ERROR_BASE + 100;

// Application error code for signalling a remote shut down stream reading
inline constexpr uint64_t STREAM_REMOTE_READ_SHUTDOWN = ERROR_BASE + 101;

// Application error code for signalling a remote shut down stream writing
inline constexpr uint64_t STREAM_REMOTE_WRITE_SHUTDOWN = ERROR_BASE + 102;

// Application error if a bt request stream handle throws an exception
inline constexpr uint64_t BPARSER_ERROR_EXCEPTION = ERROR_BASE + 105;

Expand Down
37 changes: 37 additions & 0 deletions include/oxen/quic/opt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,5 +202,42 @@ namespace oxen::quic
_hook = nullptr;
}
};

// Used to provide callbacks for remote stream reset. Application can pass one or both callbacks to indicate what
// logic should be executed when the remote shuts down stream reading or writing. The signature of `on_reset_hook_t`
// matches that of other hooks, so we wrap it in an opt struct to differentiate and to structure access.
struct remote_stream_reset
{
using on_reset_hook_t = std::function<void(Stream&, uint64_t)>;

private:
on_reset_hook_t _on_read_reset = nullptr;
on_reset_hook_t _on_write_reset = nullptr;

public:
remote_stream_reset() = default;

explicit remote_stream_reset(on_reset_hook_t _on_read, on_reset_hook_t _on_write = nullptr) :
_on_read_reset{std::move(_on_read)}, _on_write_reset{std::move(_on_write)}
{
if (not _on_read_reset and not _on_write_reset)
throw std::invalid_argument{"Must set at least one of `on_read_reset` and `on_write_reset`!"};
}

explicit operator bool() const { return has_read_hook() and has_write_hook(); }

void clear()
{
_on_read_reset = nullptr;
_on_write_reset = nullptr;
}

bool has_read_hook() const { return _on_read_reset != nullptr; }
bool has_write_hook() const { return _on_write_reset != nullptr; }

void read_reset(Stream& s, uint64_t ec) { return _on_read_reset(s, ec); }
void write_reset(Stream& s, uint64_t ec) { return _on_write_reset(s, ec); }
};

} // namespace opt
} // namespace oxen::quic
20 changes: 18 additions & 2 deletions include/oxen/quic/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,26 @@ namespace oxen::quic

bool is_paused() const;

bool is_reading() const;
/** Remote Stream Reset:
- Applications can call `::set_remote_reset_hooks(...)` to emplace logic to be executed when the remote stream
shuts down reading and/or writing
- This only happens once per lifetime of the stream; as a result, do NOT set more hooks while inside the body of
the hooks themselves!
*/
void set_remote_reset_hooks(opt::remote_stream_reset hooks);

bool is_writing() const;
void clear_remote_reset_hooks();

bool has_remote_reset_hooks() const;

void stop_reading();

void stop_writing();

bool is_reading() const;

bool is_writing() const;

// These public methods are synchronized so that they can be safely called from outside the
// libquic main loop thread.
bool available() const;
Expand Down Expand Up @@ -168,6 +180,10 @@ namespace oxen::quic
opt::watermark _high_water;
opt::watermark _low_water;

opt::remote_stream_reset _remote_reset;

bool _in_reset{false};

bool _is_reading{true};
bool _is_writing{true};

Expand Down
47 changes: 46 additions & 1 deletion src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ namespace oxen::quic
void* /*stream_user_data*/)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
static_cast<Connection*>(user_data)->stream_closed(stream_id, app_error_code);
static_cast<Connection*>(user_data)->stream_reset(stream_id, app_error_code);
return 0;
}

Expand Down Expand Up @@ -1171,6 +1171,51 @@ namespace oxen::quic
}
}

void Connection::stream_reset(int64_t id, uint64_t app_code)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
assert(ngtcp2_is_bidi_stream(id));
auto it = _streams.find(id);

if (it == _streams.end())
return;

auto& stream = it->second;

switch (app_code)
{
case STREAM_REMOTE_READ_SHUTDOWN:
log::debug(log_cat, "Stream (ID:{}) received remote read shutdown signal!", id);

if (stream->_remote_reset.has_read_hook())
{
log::debug(log_cat, "Invoking remote_read_reset hook...");
stream->_in_reset = true;
stream->_remote_reset.read_reset(*stream.get(), app_code);
stream->_in_reset = false;
}

break;

case STREAM_REMOTE_WRITE_SHUTDOWN:
log::debug(log_cat, "Stream (ID:{}) received remote write shutdown signal!", id);

if (stream->_remote_reset.has_write_hook())
{
log::debug(log_cat, "Invoking remote_write_reset hook...");
stream->_in_reset = true;
stream->_remote_reset.write_reset(*stream.get(), app_code);
stream->_in_reset = false;
}

break;

default:
log::critical(
log_cat, "Stream (ID:{}) received unrecognized app code (ec:{}) for stream reset!", id, app_code);
}
}

void Connection::stream_closed(int64_t id, uint64_t app_code)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
Expand Down
66 changes: 60 additions & 6 deletions src/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,38 @@ namespace oxen::quic
});
}

void Stream::set_remote_reset_hooks(opt::remote_stream_reset sr)
{
// we can use ::call(...) instead of ::call_soon(...) because stream read/write shutdown only happens once per stream
// lifetime, and the application would be beyond incorrect to invoke this function in the callbacks themselves
endpoint.call([this, hooks = std::move(sr)]() {
if (_in_reset)
throw std::runtime_error{"Cannot set `remote_stream_reset` while executing currently set hooks!!"};

log::debug(log_cat, "Stream (ID:{}) provided `remote_stream_reset` hooks!", _stream_id);
_remote_reset = std::move(hooks);
});
}

void Stream::clear_remote_reset_hooks()
{
// we can use ::call(...) instead of ::call_soon(...) because stream read/write shutdown only happens once per stream
// lifetime, and the application would be beyond incorrect to invoke this function in the callbacks themselves
endpoint.call([this]() {
if (_in_reset)
throw std::runtime_error{"Cannot set `remote_stream_reset` while executing currently set hooks!!"};

log::debug(log_cat, "Stream (ID:{}) cleared `remote_stream_reset` hooks!", _stream_id);
_remote_reset.clear();
assert(not _remote_reset);
});
}

bool Stream::has_remote_reset_hooks() const
{
return endpoint.call_get([this]() { return _remote_reset.has_read_hook() and _remote_reset.has_write_hook(); });
}

void Stream::stop_reading()
{
endpoint.call([this]() {
Expand All @@ -139,7 +171,7 @@ namespace oxen::quic
_is_reading = false;

log::warning(log_cat, "Halting all read operations on stream ID:{}!", _stream_id);
ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, 0);
ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, STREAM_REMOTE_READ_SHUTDOWN);
});
}

Expand All @@ -152,6 +184,18 @@ namespace oxen::quic
return;
}

if (user_buffers.empty())
{
log::warning(
log_cat,
"All transmitted data dispatched and acked; halting all write operations on stream ID:{}",
_stream_id);
ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, STREAM_REMOTE_WRITE_SHUTDOWN);
return clear_watermarks();
}

// if buffers are empty and we call shutdown_stream_write now, we do not need to flip this boolean; it is used to
// signal for the same call in ::acknowledge()
_is_writing = false;
});
}
Expand Down Expand Up @@ -248,6 +292,13 @@ namespace oxen::quic
void Stream::append_buffer(bstring_view buffer, std::shared_ptr<void> keep_alive)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);

if (not _is_writing)
{
log::warning(log_cat, "Stream (ID:{}) has halted writing; payload NOT appended to buffer!", _stream_id);
return;
}

user_buffers.emplace_back(buffer, std::move(keep_alive));
assert(endpoint.in_event_loop());
assert(_conn);
Expand Down Expand Up @@ -299,15 +350,18 @@ namespace oxen::quic
if (bytes)
user_buffers.front().first.remove_prefix(bytes);

auto sz = size();

if (not _is_writing and _unacked_size == 0)
if (not _is_writing and user_buffers.empty())
{
log::warning(log_cat, "All transmitted data acked; halting all write operations on stream ID:{}", _stream_id);
ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, 0);
log::warning(
log_cat,
"All transmitted data dispatched and acked; halting all write operations on stream ID:{}",
_stream_id);
ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, STREAM_REMOTE_WRITE_SHUTDOWN);
return clear_watermarks();
}

auto sz = size();

// Do not bother with this block of logic if no watermarks are set
if (_is_watermarked)
{
Expand Down
19 changes: 18 additions & 1 deletion tests/012-stream_signalling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,21 @@ namespace oxen::quic::test

auto client_stream = conn_interface->open_stream<Stream>([&](Stream&, bstring_view) { p.set_value(true); });

client_stream->set_remote_reset_hooks(opt::remote_stream_reset{
[](Stream& s, uint64_t ec) {
REQUIRE(ec == STREAM_REMOTE_READ_SHUTDOWN);

// Cannot set or clear callbacks while executing the callbacks!
REQUIRE_THROWS(s.set_remote_reset_hooks(opt::remote_stream_reset{}));
REQUIRE_THROWS(s.clear_remote_reset_hooks());

s.stop_writing();
},
[](Stream& s, uint64_t ec) {
REQUIRE(ec == STREAM_REMOTE_WRITE_SHUTDOWN);
s.stop_reading();
}});

REQUIRE(client_stream->is_reading());
REQUIRE(client_stream->is_writing());

Expand All @@ -189,11 +204,13 @@ namespace oxen::quic::test
REQUIRE_FALSE(server_stream->is_writing());

client_stream->send(bstring_view{req_msg});
REQUIRE(f.get());
require_future(f);

// allow the acks to get back to the client; extra time for slow CI archs
std::this_thread::sleep_for(250ms);

REQUIRE_FALSE(client_stream->is_reading());

REQUIRE(TestHelper::stream_unacked(*server_stream.get()) == 0);
}

Expand Down

0 comments on commit 1f2428a

Please sign in to comment.