Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Oct 19, 2023
1 parent 81178d0 commit 7a36e05
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 49 deletions.
8 changes: 8 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ enum class type : uint8_t
http_callback,
ipc,
tcp,
channel,
socket,
socket_write,
confirmation_height,
confirmation_observer,
drop,
Expand Down Expand Up @@ -292,6 +295,11 @@ enum class detail : uint8_t
deprioritize,
deprioritize_failed,

// socket
error_critical,
error_transient,
success,

_last // Must be the last enum
};

Expand Down
175 changes: 128 additions & 47 deletions nano/node/transport/socket.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <nano/boost/asio/bind_executor.hpp>
#include <nano/boost/asio/ip/address_v6.hpp>
#include <nano/boost/asio/read.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/socket.hpp>
#include <nano/node/transport/transport.hpp>
Expand Down Expand Up @@ -42,6 +43,7 @@ nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_t
strand{ node_a.io_ctx.get_executor () },
tcp_socket{ node_a.io_ctx },
node{ node_a },
write_timer{ node_a.io_ctx },
endpoint_type_m{ endpoint_type_a },
timeout{ std::numeric_limits<uint64_t>::max () },
last_completion_time_or_init{ nano::seconds_since_epoch () },
Expand All @@ -60,6 +62,7 @@ nano::transport::socket::~socket ()
void nano::transport::socket::start ()
{
ongoing_checkup ();
ongoing_write ();
}

void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function<void (boost::system::error_code const &)> callback_a)
Expand Down Expand Up @@ -127,83 +130,147 @@ void nano::transport::socket::async_read (std::shared_ptr<std::vector<uint8_t>>
}
}

void nano::transport::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a, nano::transport::traffic_type traffic_type)
void nano::transport::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> callback, nano::transport::traffic_type traffic_type)
{
// Existing code holds weak ptrs to a socket and assumes that as long as async ops are in progress, the socket will be kept alive
// Thus we need to capture a shared ptr to this socket in the callback
callback = [this_s = shared_from_this (), callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size) {
if (callback)
{
callback (ec, size);
}
};

if (closed)
{
if (callback_a)
if (callback)
{
node.background ([callback = std::move (callback_a)] () {
node.background ([callback = std::move (callback)] () {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
return;
}

bool queued = send_queue.insert (buffer_a, callback_a, traffic_type);
bool queued = send_queue.insert (buffer_a, callback, traffic_type);
if (!queued)
{
if (callback_a)
if (callback)
{
node.background ([callback = std::move (callback_a)] () {
node.background ([callback = std::move (callback)] () {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
return;
}

boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this (), buffer_a, callback_a, traffic_type] () {
if (!this_s->write_in_progress)
{
this_s->write_queued_messages ();
}
}));
else
{
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this ()] () {
if (std::chrono::steady_clock::now () >= this_s->write_cooldown)
{
this_s->write_timer.cancel (); // Signal that new data is present to be sent
}
}));
}
}

// Must be called from strand
void nano::transport::socket::write_queued_messages ()
void nano::transport::socket::ongoing_write ()
{
if (closed)
{
return;
}

auto next = send_queue.pop ();
if (!next)
{
return;
}
std::weak_ptr<nano::transport::socket> this_w (shared_from_this ());
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_w] () {
if (auto this_l = this_w.lock ())
{
if (std::chrono::steady_clock::now () < this_l->write_cooldown)
{
this_l->write_timer.expires_at (this_l->write_cooldown);
this_l->write_timer.async_wait ([this_w] (boost::system::error_code const & ec) {
if (auto this_l = this_w.lock ())
{
this_l->ongoing_write ();
}
});
return;
}

set_default_timeout ();
auto next = this_l->send_queue.peek ();
if (!next)
{
this_l->write_timer.expires_after (std::chrono::seconds{ 1 });
this_l->write_timer.async_wait ([this_w] (boost::system::error_code const & ec) {
if (auto this_l = this_w.lock ())
{
this_l->ongoing_write ();
}
});
return;
}

write_in_progress = true;
nano::async_write (tcp_socket, next->buffer,
boost::asio::bind_executor (strand, [this_s = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) {
this_s->write_in_progress = false;
debug_assert (!this_l->write_in_progress);
this_l->write_in_progress = true;

if (ec)
{
this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
this_s->close ();
}
else
{
this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
this_s->set_last_completion ();
}
this_l->set_default_timeout ();
nano::async_write (this_l->tcp_socket, next->buffer,
boost::asio::bind_executor (this_l->strand, [this_l, callback = std::move (next->callback)] (boost::system::error_code ec, std::size_t size) {
debug_assert (this_l->write_in_progress);
this_l->write_in_progress = false;

if (next->callback)
{
next->callback (ec, size);
}
if (ec)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);

if (!ec)
{
this_s->write_queued_messages ();
if (is_transient_error (ec))
{
this_l->node.stats.inc (nano::stat::type::socket_write, nano::stat::detail::error_transient);

this_l->write_cooldown = std::chrono::steady_clock::now () + write_throttling_delay;
}
else
{
this_l->node.stats.inc (nano::stat::type::socket_write, nano::stat::detail::error_critical);

this_l->close ();

if (callback)
{
callback (ec, size);
}
}
}
else
{
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
this_l->node.stats.inc (nano::stat::type::socket_write, nano::stat::detail::success);

this_l->set_last_completion ();
this_l->send_queue.pop ();

if (callback)
{
callback (ec, size);
}
}

this_l->ongoing_write ();
}));
}
}));
}

// These errors are not critical and should not cause the socket to be disconnected
bool nano::transport::socket::is_transient_error (const boost::system::error_code & ec)
{
debug_assert (ec.category () == boost::asio::error::system_category);
if (ec == boost::asio::error::no_buffer_space)
{
return true;
}
return false;
}

bool nano::transport::socket::max (nano::transport::traffic_type traffic_type) const
{
return send_queue.size (traffic_type) >= max_queue_size;
Expand Down Expand Up @@ -339,8 +406,6 @@ void nano::transport::socket::close_internal ()
return;
}

send_queue.clear ();

default_timeout = std::chrono::seconds (0);
boost::system::error_code ec;

Expand All @@ -353,6 +418,9 @@ void nano::transport::socket::close_internal ()
node.logger.try_log ("Failed to close socket gracefully: ", ec.message ());
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close);
}

send_queue.clear ();
write_timer.cancel ();
}

nano::tcp_endpoint nano::transport::socket::remote_endpoint () const
Expand Down Expand Up @@ -385,7 +453,7 @@ bool nano::transport::socket::write_queue::insert (const buffer_t & buffer, call
return false; // Not queued
}

std::optional<nano::transport::socket::write_queue::entry> nano::transport::socket::write_queue::pop ()
std::optional<nano::transport::socket::write_queue::entry> nano::transport::socket::write_queue::peek ()
{
nano::lock_guard<nano::mutex> guard{ mutex };

Expand All @@ -394,7 +462,7 @@ std::optional<nano::transport::socket::write_queue::entry> nano::transport::sock
if (!que.empty ())
{
auto item = que.front ();
que.pop ();
last_queue = type;
return item;
}
return std::nullopt;
Expand All @@ -413,6 +481,19 @@ std::optional<nano::transport::socket::write_queue::entry> nano::transport::sock
return std::nullopt;
}

void nano::transport::socket::write_queue::pop ()
{
nano::lock_guard<nano::mutex> guard{ mutex };

// Might get emptied if socket is closed while sending
if (auto it = queues.find (last_queue); it != queues.end ())
{
auto & que = it->second;
release_assert (!que.empty ());
que.pop ();
}
}

void nano::transport::socket::write_queue::clear ()
{
nano::lock_guard<nano::mutex> guard{ mutex };
Expand Down
15 changes: 13 additions & 2 deletions nano/node/transport/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>

public:
static std::size_t constexpr default_max_queue_size = 128;
static std::chrono::milliseconds constexpr write_throttling_delay{ 50 };

enum class type_t
{
Expand Down Expand Up @@ -138,7 +139,8 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
explicit write_queue (std::size_t max_size);

bool insert (buffer_t const &, callback_t, nano::transport::traffic_type);
std::optional<entry> pop ();
std::optional<entry> peek ();
void pop ();
void clear ();
std::size_t size (nano::transport::traffic_type) const;
bool empty () const;
Expand All @@ -148,6 +150,7 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
private:
mutable nano::mutex mutex;
std::unordered_map<nano::transport::traffic_type, std::queue<entry>> queues;
nano::transport::traffic_type last_queue;
};

write_queue send_queue;
Expand All @@ -157,6 +160,9 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
boost::asio::ip::tcp::socket tcp_socket;
nano::node & node;

/** We use `steady_timer` as an asynchronous condition variable */
boost::asio::steady_timer write_timer;

/** The other end of the connection */
boost::asio::ip::tcp::endpoint remote;

Expand Down Expand Up @@ -193,14 +199,19 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
/** Updated only from strand, but stored as atomic so it can be read from outside */
std::atomic<bool> write_in_progress{ false };

std::chrono::steady_clock::time_point write_cooldown;

private:
void close_internal ();
void write_queued_messages ();
void set_default_timeout ();
void set_last_completion ();
void set_last_receive_time ();
void ongoing_checkup ();
void ongoing_write ();
void read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a);

static bool is_transient_error (boost::system::error_code const &);

private:
type_t type_m{ type_t::undefined };
endpoint_type_t endpoint_type_m;
Expand Down

0 comments on commit 7a36e05

Please sign in to comment.