Skip to content

Commit

Permalink
squashing working commits
Browse files Browse the repository at this point in the history
  • Loading branch information
dr7ana committed May 14, 2024
1 parent 5786cf8 commit bebb042
Show file tree
Hide file tree
Showing 17 changed files with 974 additions and 326 deletions.
2 changes: 1 addition & 1 deletion include/oxen/quic/address.hpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#pragma once

#include <oxenc/bt.h>
#include <oxenc/endian.h>

#include <compare>
#include <oxenc/bt.h>

#include "formattable.hpp"
#include "ip.hpp"
Expand Down
7 changes: 4 additions & 3 deletions include/oxen/quic/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ namespace oxen::quic
template <oxenc::basic_char Char>
void send_datagram(std::vector<Char>&& buf)
{
send_datagram(
std::basic_string_view<Char>{buf.data(), buf.size()},
std::make_shared<std::vector<Char>>(std::move(buf)));
auto keep_alive = std::make_shared<std::vector<Char>>(std::move(buf));
std::basic_string_view<Char> view{keep_alive->data(), keep_alive->size()};
send_datagram(view, std::move(keep_alive));
}

template <oxenc::basic_char CharType>
Expand Down Expand Up @@ -490,6 +490,7 @@ namespace oxen::quic
int stream_ack(int64_t id, size_t size);
int stream_receive(int64_t id, bstring_view data, bool fin);
void stream_execute_close(Stream& s, uint64_t app_code);
void stream_stop_sending(int64_t id, uint64_t app_code);
void stream_reset(int64_t id, uint64_t app_code);
void stream_closed(int64_t id, uint64_t app_code);
void close_all_streams();
Expand Down
26 changes: 13 additions & 13 deletions include/oxen/quic/opt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,32 +210,32 @@ namespace oxen::quic
using on_reset_hook_t = std::function<void(Stream&, uint64_t)>;

private:
on_reset_hook_t _on_read_reset = nullptr;
on_reset_hook_t _on_write_reset = nullptr;
on_reset_hook_t _on_stop_sending = nullptr;
on_reset_hook_t _on_stream_reset = nullptr;

public:
remote_stream_reset() = default;

explicit remote_stream_reset(on_reset_hook_t _on_read, on_reset_hook_t _on_write = nullptr) :
_on_read_reset{std::move(_on_read)}, _on_write_reset{std::move(_on_write)}
explicit remote_stream_reset(on_reset_hook_t _stop_sending, on_reset_hook_t _stream_reset = nullptr) :
_on_stop_sending{std::move(_stop_sending)}, _on_stream_reset{std::move(_stream_reset)}
{
if (not _on_read_reset and not _on_write_reset)
throw std::invalid_argument{"Must set at least one of `on_read_reset` and `on_write_reset`!"};
if (not _on_stop_sending and not _on_stream_reset)
throw std::invalid_argument{"Must set at least one of `on_stop_sending` and `on_stream_reset`!"};
}

explicit operator bool() const { return has_read_hook() and has_write_hook(); }
explicit operator bool() const { return has_stop_sending_hook() and has_stream_reset_hook(); }

void clear()
{
_on_read_reset = nullptr;
_on_write_reset = nullptr;
_on_stop_sending = nullptr;
_on_stream_reset = nullptr;
}

bool has_read_hook() const { return _on_read_reset != nullptr; }
bool has_write_hook() const { return _on_write_reset != nullptr; }
bool has_stop_sending_hook() const { return _on_stop_sending != nullptr; }
bool has_stream_reset_hook() const { return _on_stream_reset != nullptr; }

void read_reset(Stream& s, uint64_t ec) { return _on_read_reset(s, ec); }
void write_reset(Stream& s, uint64_t ec) { return _on_write_reset(s, ec); }
void on_remote_stop_sending(Stream& s, uint64_t ec) { return _on_stop_sending(s, ec); }
void on_remote_stream_reset(Stream& s, uint64_t ec) { return _on_stream_reset(s, ec); }
};

} // namespace opt
Expand Down
7 changes: 6 additions & 1 deletion include/oxen/quic/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ namespace oxen::quic

void stop_writing();

void stop_sending(uint64_t code = STREAM_REMOTE_READ_SHUTDOWN);

void reset_stream(uint64_t code = STREAM_REMOTE_WRITE_SHUTDOWN);

bool is_reading() const;

bool is_writing() const;
Expand Down Expand Up @@ -169,7 +173,7 @@ namespace oxen::quic
bool _paused{false};
int64_t _stream_id;

size_t _paused_offset{0};
std::atomic<size_t> _paused_offset{0};

bool _is_watermarked{false};

Expand All @@ -188,6 +192,7 @@ namespace oxen::quic

bool _is_reading{true};
bool _is_writing{true};
uint64_t _deferred_ec{0};

void wrote(size_t bytes) override;

Expand Down
83 changes: 52 additions & 31 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ namespace oxen::quic
return 0;
}

static int on_stream_stop_sending(
ngtcp2_conn* /* conn */,
int64_t stream_id,
uint64_t app_error_code,
void* user_data,
void* /* stream_user_data */)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
static_cast<Connection*>(user_data)->stream_stop_sending(stream_id, app_error_code);
return 0;
}

static int on_stream_reset(
ngtcp2_conn* /*conn*/,
int64_t stream_id,
Expand Down Expand Up @@ -934,7 +946,7 @@ namespace oxen::quic
}
else if (bufs.empty())
{
log::debug(log_cat, "pending() returned empty buffer for stream ID {}, moving on", stream_id);
log::trace(log_cat, "pending() returned empty buffer for stream ID {}, moving on", stream_id);
continue;
}
}
Expand Down Expand Up @@ -971,7 +983,7 @@ namespace oxen::quic
dgram.size(),
ts);

log::debug(log_cat, "ngtcp2_conn_writev_datagram returned a value of {}", nwrite);
log::trace(log_cat, "ngtcp2_conn_writev_datagram returned a value of {}", nwrite);

if (datagram_accepted != 0)
{
Expand Down Expand Up @@ -1082,7 +1094,7 @@ namespace oxen::quic
log::trace(log_cat, "Sending final packet batch of {} packets", n_packets);
send(&pkt_updater);
}
log::debug(log_cat, "Exiting flush_streams()");
log::trace(log_cat, "Exiting flush_streams()");
}

void Connection::schedule_packet_retransmit(std::chrono::steady_clock::time_point ts)
Expand Down Expand Up @@ -1160,6 +1172,7 @@ namespace oxen::quic
{
const bool was_closing = stream._is_closing;
stream._is_closing = stream._is_shutdown = true;
stream._is_reading = stream._is_writing = false;

if (stream._is_watermarked)
stream.clear_watermarks();
Expand All @@ -1171,7 +1184,7 @@ namespace oxen::quic
}
}

void Connection::stream_reset(int64_t id, uint64_t app_code)
void Connection::stream_stop_sending(int64_t id, uint64_t app_code)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
assert(ngtcp2_is_bidi_stream(id));
Expand All @@ -1182,37 +1195,40 @@ namespace oxen::quic

auto& stream = it->second;

switch (app_code)
if (stream->_remote_reset.has_stop_sending_hook())
{
case STREAM_REMOTE_READ_SHUTDOWN:
log::debug(log_cat, "Stream (ID:{}) received remote read shutdown signal!", id);

if (stream->_remote_reset.has_read_hook())
{
log::debug(log_cat, "Invoking remote_read_reset hook...");
stream->_in_reset = true;
stream->_remote_reset.read_reset(*stream.get(), app_code);
stream->_in_reset = false;
}

break;
log::info(log_cat, "Invoking on_remote_stop_sending hook...");
stream->_in_reset = true;
stream->_remote_reset.on_remote_stop_sending(*stream.get(), app_code);
stream->_in_reset = false;
}
else
{
log::info(log_cat, "No user-provided on_remote_stop_sending hook", app_code);
}
}

case STREAM_REMOTE_WRITE_SHUTDOWN:
log::debug(log_cat, "Stream (ID:{}) received remote write shutdown signal!", id);
void Connection::stream_reset(int64_t id, uint64_t app_code)
{
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
assert(ngtcp2_is_bidi_stream(id));
auto it = _streams.find(id);

if (stream->_remote_reset.has_write_hook())
{
log::debug(log_cat, "Invoking remote_write_reset hook...");
stream->_in_reset = true;
stream->_remote_reset.write_reset(*stream.get(), app_code);
stream->_in_reset = false;
}
if (it == _streams.end())
return;

break;
auto& stream = it->second;

default:
log::critical(
log_cat, "Stream (ID:{}) received unrecognized app code (ec:{}) for stream reset!", id, app_code);
if (stream->_remote_reset.has_stream_reset_hook())
{
log::info(log_cat, "Invoking on_remote_stream_reset hook...");
stream->_in_reset = true;
stream->_remote_reset.on_remote_stream_reset(*stream.get(), app_code);
stream->_in_reset = false;
}
else
{
log::info(log_cat, "No user-provided on_remote_stream_reset hook", app_code);
}
}

Expand Down Expand Up @@ -1350,11 +1366,15 @@ namespace oxen::quic
}
else
{
size_t sz = 0;

if (str->_paused)
str->_paused_offset += data.size();
else
ngtcp2_conn_extend_max_stream_offset(conn.get(), id, data.size());
sz = data.size();

ngtcp2_conn_extend_max_offset(conn.get(), data.size());
ngtcp2_conn_extend_max_stream_offset(conn.get(), id, sz);
}

return 0;
Expand Down Expand Up @@ -1519,6 +1539,7 @@ namespace oxen::quic
callbacks.dcid_status = Callbacks::on_connection_id_status;
callbacks.update_key = ngtcp2_crypto_update_key_cb;
callbacks.stream_reset = Callbacks::on_stream_reset;
callbacks.stream_stop_sending = Callbacks::on_stream_stop_sending;
callbacks.delete_crypto_aead_ctx = ngtcp2_crypto_delete_crypto_aead_ctx_cb;
callbacks.delete_crypto_cipher_ctx = ngtcp2_crypto_delete_crypto_cipher_ctx_cb;
callbacks.get_path_challenge_data = ngtcp2_crypto_get_path_challenge_data_cb;
Expand Down
2 changes: 1 addition & 1 deletion src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ namespace oxen::quic
}
}
else
log::debug(log_cat, "Found associated connection to incoming DCID!");
log::trace(log_cat, "Found associated connection to incoming DCID!");

if (cptr->is_outbound())
// For a inbound packet on an outbound connection the packet handling code will have set
Expand Down
2 changes: 1 addition & 1 deletion src/messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace oxen::quic
}
else
{
log::debug(log_cat, "enable_datagram_drop_test is false, skipping optional logic");
log::trace(log_cat, "enable_datagram_drop_test is false, skipping optional logic");
}

log::trace(
Expand Down
Loading

0 comments on commit bebb042

Please sign in to comment.