Skip to content

Commit

Permalink
squash
Browse files Browse the repository at this point in the history
  • Loading branch information
dr7ana committed May 9, 2024
1 parent eee0f17 commit ae5a803
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 211 deletions.
6 changes: 3 additions & 3 deletions include/oxen/quic/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ namespace oxen::quic
template <oxenc::basic_char Char>
void send_datagram(std::vector<Char>&& buf)
{
send_datagram(
std::basic_string_view<Char>{buf.data(), buf.size()},
std::make_shared<std::vector<Char>>(std::move(buf)));
auto keep_alive = std::make_shared<std::vector<Char>>(std::move(buf));
std::basic_string_view<Char> view{keep_alive->data(), keep_alive->size()};
send_datagram(view, std::move(keep_alive));
}

template <oxenc::basic_char CharType>
Expand Down
2 changes: 1 addition & 1 deletion src/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ namespace oxen::quic
{
return endpoint.call_get([this]() { return _is_writing; });
}

std::shared_ptr<connection_interface> Stream::get_conn_interface()
{
return std::static_pointer_cast<connection_interface>(_conn->shared_from_this());
Expand Down
1 change: 1 addition & 0 deletions tests/001-handshake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <oxen/quic/gnutls_crypto.hpp>
#include <thread>

#include "tcp.hpp"
#include "utils.hpp"

namespace oxen::quic::test
Expand Down
213 changes: 136 additions & 77 deletions tests/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,35 @@ namespace oxen::quic
class TCPHandle;

inline const auto LOCALHOST = "127.0.0.1"s;
inline const auto TUNNEL_SEED = oxenc::from_hex("0000000000000000000000000000000000000000000000000000000000000000");
inline const auto TUNNEL_PUBKEY = oxenc::from_hex("3b6a27bcceb6a42d62a3a8d02a6f0d73653215771de243a63ac048a18b59da29");
inline constexpr auto TUNNEL_SEED = "0000000000000000000000000000000000000000000000000000000000000000"_hex;
inline constexpr auto TUNNEL_PUBKEY = "3b6a27bcceb6a42d62a3a8d02a6f0d73653215771de243a63ac048a18b59da29"_hex;

inline constexpr size_t HIGH_WATERMARK{4_Mi};
inline constexpr size_t LOW_WATERMARK{HIGH_WATERMARK / 4};

inline std::vector<std::byte> serialize_payload(bstring_view data, uint16_t port = 0)
{
std::vector<std::byte> ret(data.size() + sizeof(port));
oxenc::write_host_as_big(port, ret.data());
std::memcpy(&ret[2], data.data(), data.size());
return ret;
}

inline std::tuple<uint16_t, bstring> deserialize_payload(bstring data)
{
uint16_t p = oxenc::load_big_to_host<uint16_t>(data.data());

return {p, data.substr(2)};
}

struct TCPQUIC
{
std::shared_ptr<connection_interface> _ci;

std::unordered_set<std::shared_ptr<TCPConnection>> t;

// keyed against backend tcp address
std::unordered_map<Address, std::unordered_set<std::shared_ptr<TCPConnection>>> _tcp_conns2;
std::unordered_map<Address, std::shared_ptr<TCPConnection>> _tcp_conns;
};

Expand All @@ -58,17 +79,62 @@ namespace oxen::quic
evconnlistener_free(e);
};

void tcp_drained_write_cb(struct bufferevent* bev, void* user_arg);

void tcp_read_cb(struct bufferevent* bev, void* user_arg);

void tcp_event_cb(struct bufferevent* bev, short what, void* user_arg);

void tcp_listen_cb(
struct evconnlistener* listener, evutil_socket_t fd, struct sockaddr* src, int socklen, void* user_arg);

void tcp_err_cb(struct evconnlistener* listener, void* user_arg);

struct TCPConnection
{
TCPConnection(struct bufferevent* _bev, evutil_socket_t _fd, std::shared_ptr<Stream> _s) :
bev{_bev}, fd{_fd}, stream{std::move(_s)}
{}
{
stream->set_stream_data_cb([this](oxen::quic::Stream& s, bstring_view data) {
auto rv = bev ? bufferevent_write(bev, data.data(), data.size()) : -1;
log::info(
test_cat,
"Stream (id: {}) {} {}B to TCP buffer",
s.stream_id(),
rv < 0 ? "failed to write" : "successfully wrote",
data.size());

// we get the output buffer (it sounds backwards but it isn't)
if (evbuffer_get_length(bufferevent_get_output(bev)) >= HIGH_WATERMARK)
{
log::info(
test_cat, "TCP input buffer over high-water threshold ({}); pausing stream...", HIGH_WATERMARK);
s.pause();

bufferevent_setcb(bev, tcp_read_cb, tcp_drained_write_cb, tcp_event_cb, this);
bufferevent_setwatermark(bev, EV_WRITE, LOW_WATERMARK, HIGH_WATERMARK);
}
});

stream->set_stream_close_cb([this](Stream&, uint64_t) {
log::info(
test_cat,
"Stream closed cb fired, {}...",
bev ? "freeing bufferevent" : "bufferevent already freed");
if (bev)
bufferevent_free(bev);
});

stream->set_remote_reset_hooks(opt::remote_stream_reset{
[](Stream& s, uint64_t) {
log::info(test_cat, "Remote stream signalled reading termination; halting local stream write!");
s.stop_writing();
},
[](Stream& s, uint64_t) {
log::info(test_cat, "Remote stream signalled writing termination; halting local stream read!");
s.stop_reading();
}});
}

TCPConnection() = delete;

Expand Down Expand Up @@ -163,37 +229,18 @@ namespace oxen::quic
// returns the socket address of the TCP connection
std::optional<Address> connect() const { return _connect; }

std::shared_ptr<TCPConnection> connect_to_backend(std::shared_ptr<Stream> s, Address addr)
std::shared_ptr<TCPConnection> connect_to_backend(std::shared_ptr<Stream> stream, Address addr)
{
if (addr.port() == 0)
throw std::runtime_error{"TCP backend must have valid port on localhost!"};

log::critical(test_cat, "Attempting TCP connection to backend at: {}", addr);
log::info(test_cat, "Attempting TCP connection to backend at: {}", addr);
sockaddr_in _addr = addr.in4();

struct bufferevent* _bev =
bufferevent_socket_new(_ev->loop().get(), -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);

s->set_stream_data_cb([_bev](oxen::quic::Stream& s, bstring_view data) {
auto rv = _bev ? bufferevent_write(_bev, data.data(), data.size()) : -1;
log::info(
test_cat,
"Stream (id: {}) {} {}B to TCP buffer",
s.stream_id(),
rv < 0 ? "failed to write" : "successfully wrote",
data.size());
});

s->set_stream_close_cb([_bev](Stream&, uint64_t) {
log::critical(
test_cat,
"Stream closed cb fired, {}...",
_bev ? "freeing bufferevent" : "bufferevent already freed");
if (_bev)
bufferevent_free(_bev);
});

auto tcp_conn = std::make_shared<TCPConnection>(_bev, -1, std::move(s));
auto tcp_conn = std::make_shared<TCPConnection>(_bev, -1, std::move(stream));

bufferevent_setcb(_bev, tcp_read_cb, nullptr, tcp_event_cb, tcp_conn.get());
bufferevent_enable(_bev, EV_READ | EV_WRITE);
Expand Down Expand Up @@ -260,13 +307,25 @@ namespace oxen::quic

evconnlistener_set_error_cb(_tcp_listener.get(), tcp_err_cb);

log::critical(test_cat, "TCPHandle set up listener on: {}", *_bound);
log::info(test_cat, "TCPHandle set up listener on: {}", *_bound);
}
};

inline void tcp_drained_write_cb(struct bufferevent* bev, void* user_arg)
{
bufferevent_setcb(bev, tcp_read_cb, nullptr, tcp_event_cb, user_arg);
bufferevent_setwatermark(bev, EV_WRITE, 0, 0);

auto* conn = reinterpret_cast<TCPConnection*>(user_arg);
assert(conn);

log::info(test_cat, "TCP input buffer below low-water threshold ({}); resuming stream!", LOW_WATERMARK);
conn->stream->resume();
}

inline void tcp_read_cb(struct bufferevent* bev, void* user_arg)
{
std::array<uint8_t, 2048> buf{};
std::array<uint8_t, 4096> buf{};

// Load data from input buffer to local buffer
auto nwrite = bufferevent_read(bev, buf.data(), buf.size());
Expand All @@ -277,92 +336,92 @@ namespace oxen::quic
{
auto* conn = reinterpret_cast<TCPConnection*>(user_arg);
assert(conn);
auto& stream = conn->stream;
assert(stream);

stream->send(ustring{buf.data(), nwrite});

conn->stream->send(ustring{(buf.data()), nwrite});
if (stream->unsent() >= HIGH_WATERMARK)
{
stream->set_watermark(
LOW_WATERMARK,
HIGH_WATERMARK,
opt::watermark{
[bev](Stream&) {
log::info(test_cat, "Stream buffer below low-water threshold; enabling TCP read!");
bufferevent_enable(bev, EV_READ);
},
false},
opt::watermark{
[bev](Stream&) {
log::info(test_cat, "Stream buffer above high-water threshold; disabling TCP read!");
bufferevent_disable(bev, EV_READ);
},
false});
}
}
}

inline void tcp_event_cb([[maybe_unused]] struct bufferevent* bev, short what, void* user_arg)
{
// this is where the InboundSession confirms it established a TCP connection to the backend app
if (what & BEV_EVENT_CONNECTED)
{
log::info(test_cat, "TCP connect operation succeeded!");
}
if (what & BEV_EVENT_EOF)
{
log::critical(test_cat, "TCP Connection EOF!");
}
if (what & BEV_EVENT_ERROR)
{
log::critical(
test_cat,
"TCP Connection encountered bufferevent error (msg: {})!",
evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
}
if (what & (BEV_EVENT_ERROR | BEV_EVENT_EOF))
{
// if (bev)
// {
// log::critical(test_cat, "Freeing bufferevent socket...");
// bufferevent_free(bev);
// }

auto* conn = reinterpret_cast<TCPConnection*>(user_arg);
assert(conn);
auto* conn = reinterpret_cast<TCPConnection*>(user_arg);
assert(conn);
auto& stream = conn->stream;

if (what & BEV_EVENT_EOF)
{
if (what & BEV_EVENT_WRITING)
{
// remote shut down reading
log::info(test_cat, "Remote TCP stopped reading! Halting stream write...");
stream->stop_writing();
}
else if (what & BEV_EVENT_READING)
{
// remote shut down writing
log::info(test_cat, "Error encountered while reading! Halting stream read...");
stream->stop_reading();
}
else
{
// remote closed connection
log::info(test_cat, "TCP Connection EOF!");
}
}
if (what & (BEV_EVENT_ERROR | BEV_EVENT_EOF) and not(what & BEV_EVENT_READING) and not(what & BEV_EVENT_WRITING))
{
log::critical(test_cat, "Closing stream...");
conn->stream->close();
// auto& str = conn->stream;
// if (str and not str->is_closing())
// {
// return str->close();
// }
// log::critical(test_cat, "Stream for tcp connection already destroyed...");
stream->close();
}
}

inline void tcp_listen_cb(
struct evconnlistener* listener, evutil_socket_t fd, struct sockaddr* src, int socklen, void* user_arg)
{
oxen::quic::Address source{src, static_cast<socklen_t>(socklen)};
log::critical(test_cat, "TCP CONNECTION ESTABLISHED -- SRC: {}", source);
log::info(test_cat, "TCP CONNECTION ESTABLISHED -- SRC: {}", source);

auto* b = evconnlistener_get_base(listener);
auto* _bev = bufferevent_socket_new(b, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);

// int yes{1};
// if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(int)) < 0)
// {
// log::critical(
// test_cat,
// "Failed to set keepalive on accepted TCP connection socket: {}",
// evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
// return bufferevent_free(bevent);
// }

auto* handle = reinterpret_cast<TCPHandle*>(user_arg);
assert(handle);

// make TCPConnection here!
auto* conn = handle->_conn_maker(_bev, fd, std::move(source));

conn->stream->set_stream_data_cb([_bev](Stream& s, bstring_view data) {
auto rv = _bev ? bufferevent_write(_bev, data.data(), data.size()) : -1;
log::info(
test_cat,
"Stream (id: {}) {} {}B to TCP buffer",
s.stream_id(),
rv < 0 ? "failed to write" : "successfully wrote",
data.size());
});

conn->stream->set_stream_close_cb([_bev](Stream&, uint64_t) {
log::critical(
test_cat, "Stream closed cb fired, {}...", _bev ? "freeing bufferevent" : "bufferevent already freed");
if (_bev)
bufferevent_free(_bev);
});
auto stream = conn->stream;

bufferevent_setcb(_bev, tcp_read_cb, nullptr, tcp_event_cb, conn);
bufferevent_enable(_bev, EV_READ | EV_WRITE);
Expand Down
Loading

0 comments on commit ae5a803

Please sign in to comment.