Skip to content

Commit

Permalink
Merge pull request #126 from dr7ana/watermarks
Browse files Browse the repository at this point in the history
Stream buffer watermarking
  • Loading branch information
dr7ana authored May 6, 2024
2 parents 826c6db + 5afdf75 commit f0c6828
Show file tree
Hide file tree
Showing 6 changed files with 373 additions and 2 deletions.
31 changes: 31 additions & 0 deletions include/oxen/quic/opt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
namespace oxen::quic
{
class Endpoint;
class Stream;

namespace opt
{
Expand Down Expand Up @@ -171,5 +172,35 @@ namespace oxen::quic

explicit operator bool() const { return send_hook != nullptr; }
};

// Used to provide callbacks for stream buffer watermarking. Application can pass an optional second parameter to
// indicate that the logic should be executed once before the callback is cleared. The default behavior is for the
// callback to persist and execute repeatedly
struct watermark
{
using buffer_hook_t = std::function<void(Stream&)>;

private:
buffer_hook_t _hook = nullptr;
bool _persist = true;

public:
watermark() = default;
explicit watermark(buffer_hook_t hook, bool persist = true) : _hook{std::move(hook)}, _persist{persist} {}

bool persist() const { return _persist; }

void clear() { _hook = nullptr; }

explicit operator bool() const { return _hook != nullptr; }

void operator()(Stream& s)
{
_hook(s);

if (not _persist)
_hook = nullptr;
}
};
} // namespace opt
} // namespace oxen::quic
49 changes: 49 additions & 0 deletions include/oxen/quic/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "connection_ids.hpp"
#include "error.hpp"
#include "iochannel.hpp"
#include "opt.hpp"
#include "types.hpp"
#include "utils.hpp"

Expand Down Expand Up @@ -56,6 +57,40 @@ namespace oxen::quic

const ConnectionID reference_id;

/** Buffer Watermarking:
- Applications can call `::set_watermark(...)` to implement logic to be executed at states dictated by the number
of bytes currently unsent.
- Application must pass `low` and `high` watermark amounts; an execute-on-low callback can be passed with or
without an execute-on-high callback (and vice versa)
- The execute-on-low callback will not be executed until the buffer state rises above the `high` value; it
will not be executed again until the buffer state rises once more above the `high` value
- The execute-on-high callback will not be executed until the buffer state drops below the `low` value; it
will not be executed again until the buffer state drops once more below the `low` value
- Callbacks can be passed with an optional boolean in their opt:: wrapper, indicating "clear after execution";
this will ensure the callback is only executed ONCE before being cleared. The default behavior is repeated
callback execution
- Invoking this function repeatedly will overwrite any currently set thresholds and callbacks
*/
void set_watermark(
size_t low, size_t high, std::optional<opt::watermark> low_hook, std::optional<opt::watermark> high_hook);

// Clears any currently set watermarks on this stream object
void clear_watermarks();

// Do not call this function from within a watermark callback!
bool has_watermarks() const;

/** Stream Pause:
- Applications can call `::pause()` to stop extending the max stream data offset. This has the effect of limiting
the inflow by signalling to the sender that they should pause
- This is reverted by invoking `::resume()`
*/
void pause();

void resume();

bool is_paused() const;

// These public methods are synchronized so that they can be safely called from outside the
// libquic main loop thread.
bool available() const;
Expand Down Expand Up @@ -109,8 +144,22 @@ namespace oxen::quic
bool _is_shutdown{false};
bool _sent_fin{false};
bool _ready{false};
bool _paused{false};
int64_t _stream_id;

size_t _paused_offset{0};

bool _is_watermarked{false};

size_t _high_mark{0};
size_t _low_mark{0};

bool _high_primed{false};
bool _low_primed{true};

opt::watermark _high_water;
opt::watermark _low_water;

void wrote(size_t bytes) override;

void append_buffer(bstring_view buffer, std::shared_ptr<void> keep_alive);
Expand Down
8 changes: 7 additions & 1 deletion src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,9 @@ namespace oxen::quic
const bool was_closing = stream._is_closing;
stream._is_closing = stream._is_shutdown = true;

if (stream._is_watermarked)
stream.clear_watermarks();

if (!was_closing)
{
log::trace(log_cat, "Invoking stream close callback");
Expand Down Expand Up @@ -1302,7 +1305,10 @@ namespace oxen::quic
}
else
{
ngtcp2_conn_extend_max_stream_offset(conn.get(), id, data.size());
if (str->_paused)
str->_paused_offset += data.size();
else
ngtcp2_conn_extend_max_stream_offset(conn.get(), id, data.size());
ngtcp2_conn_extend_max_offset(conn.get(), data.size());
}

Expand Down
138 changes: 137 additions & 1 deletion src/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,96 @@ namespace oxen::quic
log::trace(log_cat, "Destroying stream {}", _stream_id);
}

void Stream::set_watermark(
size_t low, size_t high, std::optional<opt::watermark> low_cb, std::optional<opt::watermark> high_cb)
{
if (not low_cb and not high_cb)
throw std::invalid_argument{"Must pass at least one callback in call to ::set_watermark()!"};

endpoint.call_soon([this, low, high, low_hook = std::move(low_cb), high_hook = std::move(high_cb)]() {
if (_is_closing || _is_shutdown || _sent_fin)
{
log::warning(log_cat, "Failed to set watermarks; stream is not active!");
return;
}

_low_mark = low;
_high_mark = high;

if (low_hook.has_value())
_low_water = std::move(*low_hook);
else
_low_water.clear();

if (high_hook.has_value())
_high_water = std::move(*high_hook);
else
_high_water.clear();

_is_watermarked = true;

log::info(log_cat, "Stream set watermarks!");
});
}

void Stream::clear_watermarks()
{
endpoint.call_soon([this]() {
if (not _is_watermarked and not _low_water and not _high_water)
{
log::warning(log_cat, "Failed to clear watermarks; stream has none set!");
return;
}

_low_mark = 0;
_high_mark = 0;
if (_low_water)
_low_water.clear();
if (_high_water)
_high_water.clear();
_is_watermarked = false;
log::info(log_cat, "Stream cleared currently set watermarks!");
});
}

void Stream::pause()
{
endpoint.call([this]() {
if (not _paused)
{
log::debug(log_cat, "Pausing stream ID:{}", _stream_id);
assert(_paused_offset == 0);
_paused = true;
}
else
log::debug(log_cat, "Stream ID:{} already paused!", _stream_id);
});
}

void Stream::resume()
{
endpoint.call([this]() {
if (_paused)
{
log::debug(log_cat, "Resuming stream ID:{}", _stream_id);
if (_paused_offset)
{
ngtcp2_conn_extend_max_stream_offset(*_conn, _stream_id, _paused_offset);
_paused_offset = 0;
}

_paused = false;
}
else
log::debug(log_cat, "Stream ID:{} is not paused!", _stream_id);
});
}

bool Stream::is_paused() const
{
return endpoint.call_get([this]() { return _paused; });
}

bool Stream::available() const
{
return endpoint.call_get([this] { return !(_is_closing || _is_shutdown || _sent_fin); });
Expand All @@ -52,6 +142,11 @@ namespace oxen::quic
return endpoint.call_get([this] { return _ready; });
}

bool Stream::has_watermarks() const
{
return endpoint.call_get([this]() { return _is_watermarked and _low_water and _high_water; });
}

std::shared_ptr<Stream> Stream::get_stream()
{
return shared_from_this();
Expand Down Expand Up @@ -143,7 +238,48 @@ namespace oxen::quic
if (bytes)
user_buffers.front().first.remove_prefix(bytes);

log::trace(log_cat, "{} bytes acked, {} unacked remaining", bytes, size());
auto sz = size();

// Do not bother with this block of logic if no watermarks are set
if (_is_watermarked)
{
auto unsent = sz - _unacked_size;

// We are above the high watermark. We prime the low water hook to be fired the next time we drop below the low
// watermark. If the high water hook exists and is primed, execute it
if (unsent >= _high_mark)
{
_low_primed = true;
log::info(log_cat, "Low water hook primed!");

if (_high_water and _high_primed)
{
log::info(log_cat, "Executing high watermark hook!");
_high_primed = false;
return _high_water(*this);
}
}
// We are below the low watermark. We prime the high water hook to be fired the next time we rise above the high
// watermark. If the low water hook exists and is primed, execute it
else if (unsent <= _low_mark)
{
_high_primed = true;
log::info(log_cat, "High water hook primed!");

if (_low_water and _low_primed)
{
log::info(log_cat, "Executing low watermark hook!");
_low_primed = false;
return _low_water(*this);
}
}

// Low/high watermarks were executed and self-cleared, so clean up
if (not _high_water and not _low_water)
return clear_watermarks();
}

log::trace(log_cat, "{} bytes acked, {} unacked remaining", bytes, sz);
}

void Stream::wrote(size_t bytes)
Expand Down
Loading

0 comments on commit f0c6828

Please sign in to comment.