Skip to content

Commit

Permalink
squashing working commits
Browse files Browse the repository at this point in the history
- speedtest py scripts work
  • Loading branch information
dr7ana committed May 12, 2024
1 parent 5786cf8 commit e2468a5
Show file tree
Hide file tree
Showing 15 changed files with 623 additions and 289 deletions.
7 changes: 4 additions & 3 deletions include/oxen/quic/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ namespace oxen::quic
template <oxenc::basic_char Char>
void send_datagram(std::vector<Char>&& buf)
{
send_datagram(
std::basic_string_view<Char>{buf.data(), buf.size()},
std::make_shared<std::vector<Char>>(std::move(buf)));
auto keep_alive = std::make_shared<std::vector<Char>>(std::move(buf));
std::basic_string_view<Char> view{keep_alive->data(), keep_alive->size()};
send_datagram(view, std::move(keep_alive));
}

template <oxenc::basic_char CharType>
Expand Down Expand Up @@ -490,6 +490,7 @@ namespace oxen::quic
int stream_ack(int64_t id, size_t size);
int stream_receive(int64_t id, bstring_view data, bool fin);
void stream_execute_close(Stream& s, uint64_t app_code);
void stream_stop_sending(int64_t id, uint64_t app_code);
void stream_reset(int64_t id, uint64_t app_code);
void stream_closed(int64_t id, uint64_t app_code);
void close_all_streams();
Expand Down
2 changes: 1 addition & 1 deletion include/oxen/quic/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ namespace oxen::quic
bool _paused{false};
int64_t _stream_id;

size_t _paused_offset{0};
std::atomic<size_t> _paused_offset{0};

bool _is_watermarked{false};

Expand Down
99 changes: 70 additions & 29 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ namespace oxen::quic
return 0;
}

static int on_stream_stop_sending(
ngtcp2_conn* /* conn */,
int64_t stream_id,
uint64_t app_error_code,
void* user_data,
void* /* stream_user_data */)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
static_cast<Connection*>(user_data)->stream_stop_sending(stream_id, app_error_code);
return 0;
}

static int on_stream_reset(
ngtcp2_conn* /*conn*/,
int64_t stream_id,
Expand Down Expand Up @@ -934,7 +946,7 @@ namespace oxen::quic
}
else if (bufs.empty())
{
log::debug(log_cat, "pending() returned empty buffer for stream ID {}, moving on", stream_id);
log::trace(log_cat, "pending() returned empty buffer for stream ID {}, moving on", stream_id);
continue;
}
}
Expand Down Expand Up @@ -971,7 +983,7 @@ namespace oxen::quic
dgram.size(),
ts);

log::debug(log_cat, "ngtcp2_conn_writev_datagram returned a value of {}", nwrite);
log::trace(log_cat, "ngtcp2_conn_writev_datagram returned a value of {}", nwrite);

if (datagram_accepted != 0)
{
Expand Down Expand Up @@ -1082,7 +1094,7 @@ namespace oxen::quic
log::trace(log_cat, "Sending final packet batch of {} packets", n_packets);
send(&pkt_updater);
}
log::debug(log_cat, "Exiting flush_streams()");
log::trace(log_cat, "Exiting flush_streams()");
}

void Connection::schedule_packet_retransmit(std::chrono::steady_clock::time_point ts)
Expand Down Expand Up @@ -1160,6 +1172,7 @@ namespace oxen::quic
{
const bool was_closing = stream._is_closing;
stream._is_closing = stream._is_shutdown = true;
stream._is_reading = stream._is_writing = false;

if (stream._is_watermarked)
stream.clear_watermarks();
Expand All @@ -1171,7 +1184,7 @@ namespace oxen::quic
}
}

void Connection::stream_reset(int64_t id, uint64_t app_code)
void Connection::stream_stop_sending(int64_t id, uint64_t app_code)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
assert(ngtcp2_is_bidi_stream(id));
Expand All @@ -1182,37 +1195,60 @@ namespace oxen::quic

auto& stream = it->second;

switch (app_code)
// auto msg = "Stream (ID:{}) received STOP_SENDING from remote; "_format(id);

// if (not stream->is_writing())
// {
// msg += "writing already disabled, closing stream!"sv;
// log::critical(log_cat, "{}", msg);
// return stream->close(app_code);
// }

// msg += "shutting down local stream read!"sv;
// log::info(log_cat, "{}", msg);

// stream->stop_reading();

if (stream->_remote_reset.has_read_hook())
{
log::info(log_cat, "Invoking on_remote_read_reset hook...");
stream->_in_reset = true;
stream->_remote_reset.read_reset(*stream.get(), app_code);
stream->_in_reset = false;
}
else
{
case STREAM_REMOTE_READ_SHUTDOWN:
log::debug(log_cat, "Stream (ID:{}) received remote read shutdown signal!", id);
log::critical(log_cat, "THIS IS BAD");
}
}

if (stream->_remote_reset.has_read_hook())
{
log::debug(log_cat, "Invoking remote_read_reset hook...");
stream->_in_reset = true;
stream->_remote_reset.read_reset(*stream.get(), app_code);
stream->_in_reset = false;
}
void Connection::stream_reset(int64_t id, uint64_t app_code)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
assert(ngtcp2_is_bidi_stream(id));
auto it = _streams.find(id);

break;
if (it == _streams.end())
return;

case STREAM_REMOTE_WRITE_SHUTDOWN:
log::debug(log_cat, "Stream (ID:{}) received remote write shutdown signal!", id);
auto& stream = it->second;

if (stream->_remote_reset.has_write_hook())
{
log::debug(log_cat, "Invoking remote_write_reset hook...");
stream->_in_reset = true;
stream->_remote_reset.write_reset(*stream.get(), app_code);
stream->_in_reset = false;
}
// // No need to check if we already stopped reading; ngtcp2 will shut the stream down after a stream reset if both
// // read/write are shut down
// log::info(log_cat, "Stream (ID:{}) received STREAM_RESET from remote; shutting down local stream write!", id);

break;
// stream->stop_writing();

default:
log::critical(
log_cat, "Stream (ID:{}) received unrecognized app code (ec:{}) for stream reset!", id, app_code);
if (stream->_remote_reset.has_write_hook())
{
log::info(log_cat, "Invoking on_remote_write_reset hook...");
stream->_in_reset = true;
stream->_remote_reset.write_reset(*stream.get(), app_code);
stream->_in_reset = false;
}
else
{
log::critical(log_cat, "THIS IS BAD");
}
}

Expand Down Expand Up @@ -1350,11 +1386,15 @@ namespace oxen::quic
}
else
{
size_t sz = 0;

if (str->_paused)
str->_paused_offset += data.size();
else
ngtcp2_conn_extend_max_stream_offset(conn.get(), id, data.size());
sz = data.size();

ngtcp2_conn_extend_max_offset(conn.get(), data.size());
ngtcp2_conn_extend_max_stream_offset(conn.get(), id, sz);
}

return 0;
Expand Down Expand Up @@ -1519,6 +1559,7 @@ namespace oxen::quic
callbacks.dcid_status = Callbacks::on_connection_id_status;
callbacks.update_key = ngtcp2_crypto_update_key_cb;
callbacks.stream_reset = Callbacks::on_stream_reset;
callbacks.stream_stop_sending = Callbacks::on_stream_stop_sending;
callbacks.delete_crypto_aead_ctx = ngtcp2_crypto_delete_crypto_aead_ctx_cb;
callbacks.delete_crypto_cipher_ctx = ngtcp2_crypto_delete_crypto_cipher_ctx_cb;
callbacks.get_path_challenge_data = ngtcp2_crypto_get_path_challenge_data_cb;
Expand Down
2 changes: 1 addition & 1 deletion src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ namespace oxen::quic
}
}
else
log::debug(log_cat, "Found associated connection to incoming DCID!");
log::trace(log_cat, "Found associated connection to incoming DCID!");

if (cptr->is_outbound())
// For a inbound packet on an outbound connection the packet handling code will have set
Expand Down
2 changes: 1 addition & 1 deletion src/messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace oxen::quic
}
else
{
log::debug(log_cat, "enable_datagram_drop_test is false, skipping optional logic");
log::trace(log_cat, "enable_datagram_drop_test is false, skipping optional logic");
}

log::trace(
Expand Down
52 changes: 33 additions & 19 deletions src/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ namespace oxen::quic
if (_high_water)
_high_water.clear();
_is_watermarked = false;
log::info(log_cat, "Stream cleared currently set watermarks!");
log::info(log_cat, "Stream (ID:{}) cleared currently set watermarks!", _stream_id);
});
}

Expand All @@ -117,6 +117,7 @@ namespace oxen::quic
if (_paused_offset)
{
ngtcp2_conn_extend_max_stream_offset(*_conn, _stream_id, _paused_offset);
// ngtcp2_conn_extend_max_offset(*_conn, _paused_offset);
_paused_offset = 0;
}

Expand Down Expand Up @@ -156,21 +157,21 @@ namespace oxen::quic

bool Stream::has_remote_reset_hooks() const
{
return endpoint.call_get([this]() { return _remote_reset.has_read_hook() and _remote_reset.has_write_hook(); });
return endpoint.call_get([this]() { return _remote_reset.has_read_hook() or _remote_reset.has_write_hook(); });
}

void Stream::stop_reading()
{
endpoint.call([this]() {
if (not _is_reading)
{
log::warning(log_cat, "Stream has already halted read operations!");
log::warning(log_cat, "Stream (ID:{}) has already halted read operations!", _stream_id);
return;
}

_is_reading = false;

log::warning(log_cat, "Halting all read operations on stream ID:{}!", _stream_id);
log::critical(log_cat, "Halting all read operations on stream ID:{}!", _stream_id);
ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, STREAM_REMOTE_READ_SHUTDOWN);
});
}
Expand All @@ -180,20 +181,23 @@ namespace oxen::quic
endpoint.call([this]() {
if (not _is_writing)
{
log::warning(log_cat, "Stream has already halted write operations!");
log::warning(log_cat, "Stream (ID:{}) has already halted write operations!", _stream_id);
return;
}

if (user_buffers.empty())
{
log::warning(
log::critical(
log_cat,
"All transmitted data dispatched and acked; halting all write operations on stream ID:{}",
"All transmitted data dispatched (unacked:{}) and acked; halting all write operations on stream "
"ID:{}",
_unacked_size,
_stream_id);
ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, STREAM_REMOTE_WRITE_SHUTDOWN);
return clear_watermarks();
return;
}

log::critical(log_cat, "Stream deferring writing shutdown until output buffers cleared...");
// if buffers are empty and we call shutdown_stream_write now, we do not need to flip this boolean; it is used to
// signal for the same call in ::acknowledge()
_is_writing = false;
Expand All @@ -214,7 +218,7 @@ namespace oxen::quic
{
return endpoint.call_get([this]() { return _is_writing; });
}

std::shared_ptr<connection_interface> Stream::get_conn_interface()
{
return std::static_pointer_cast<connection_interface>(_conn->shared_from_this());
Expand Down Expand Up @@ -257,6 +261,7 @@ namespace oxen::quic
else
{
_is_closing = _is_shutdown = true;

if (_conn)
{
log::info(log_cat, "Closing stream (ID: {}) with: {}", _stream_id, quic_strerror(app_err_code));
Expand Down Expand Up @@ -316,14 +321,18 @@ namespace oxen::quic
{
// We are above the high watermark. We prime the low water hook to be fired the next time we drop below the low
// watermark. If the high water hook exists and is primed, execute it
if (auto unsent = size() - _unacked_size; unsent >= _high_mark)
// if (auto unsent = size() - _unacked_size; unsent >= _high_mark)
if (size() >= _high_mark)
{
_low_primed = true;
log::info(log_cat, "Low water hook primed!");
if (not _low_primed)
{
_low_primed = true;
log::debug(log_cat, "Low water hook primed!");
}

if (_high_water and _high_primed)
{
log::info(log_cat, "Executing high watermark hook!");
log::debug(log_cat, "Executing high watermark hook!");
_high_primed = false;
_high_water(*this);
}
Expand Down Expand Up @@ -359,10 +368,11 @@ namespace oxen::quic
{
log::warning(
log_cat,
"All transmitted data dispatched and acked; halting all write operations on stream ID:{}",
"All transmitted data dispatched (unacked:{}) and acked; halting all write operations on stream ID:{}",
_unacked_size,
_stream_id);
ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, STREAM_REMOTE_WRITE_SHUTDOWN);
return clear_watermarks();
return /* clear_watermarks() */;
}

auto sz = size();
Expand All @@ -372,14 +382,18 @@ namespace oxen::quic
{
// We are below the low watermark. We prime the high water hook to be fired the next time we rise above the high
// watermark. If the low water hook exists and is primed, execute it
if (auto unsent = sz - _unacked_size; unsent <= _low_mark)
// if (auto unsent = sz - _unacked_size; unsent <= _low_mark)
if (sz <= _low_mark)
{
_high_primed = true;
log::info(log_cat, "High water hook primed!");
if (not _high_primed)
{
_high_primed = true;
log::debug(log_cat, "High water hook primed!");
}

if (_low_water and _low_primed)
{
log::info(log_cat, "Executing low watermark hook!");
log::debug(log_cat, "Executing low watermark hook!");
_low_primed = false;
_low_water(*this);
}
Expand Down
1 change: 1 addition & 0 deletions tests/001-handshake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <oxen/quic/gnutls_crypto.hpp>
#include <thread>

#include "tcp.hpp"
#include "utils.hpp"

namespace oxen::quic::test
Expand Down
Loading

0 comments on commit e2468a5

Please sign in to comment.