Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Oct 19, 2023
1 parent 81178d0 commit 63c859c
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 43 deletions.
8 changes: 8 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ enum class type : uint8_t
http_callback,
ipc,
tcp,
channel,
socket,
socket_write,
confirmation_height,
confirmation_observer,
drop,
Expand Down Expand Up @@ -292,6 +295,11 @@ enum class detail : uint8_t
deprioritize,
deprioritize_failed,

// socket
error_critical,
error_transient,
success,

_last // Must be the last enum
};

Expand Down
141 changes: 100 additions & 41 deletions nano/node/transport/socket.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <nano/boost/asio/bind_executor.hpp>
#include <nano/boost/asio/ip/address_v6.hpp>
#include <nano/boost/asio/read.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/socket.hpp>
#include <nano/node/transport/transport.hpp>
Expand Down Expand Up @@ -42,6 +43,7 @@ nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_t
strand{ node_a.io_ctx.get_executor () },
tcp_socket{ node_a.io_ctx },
node{ node_a },
write_timer{ node_a.io_ctx },
endpoint_type_m{ endpoint_type_a },
timeout{ std::numeric_limits<uint64_t>::max () },
last_completion_time_or_init{ nano::seconds_since_epoch () },
Expand All @@ -60,6 +62,7 @@ nano::transport::socket::~socket ()
void nano::transport::socket::start ()
{
ongoing_checkup ();
ongoing_write ();
}

void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function<void (boost::system::error_code const &)> callback_a)
Expand Down Expand Up @@ -149,61 +152,105 @@ void nano::transport::socket::async_write (nano::shared_const_buffer const & buf
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
return;
}

boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this (), buffer_a, callback_a, traffic_type] () {
if (!this_s->write_in_progress)
{
this_s->write_queued_messages ();
}
}));
else
{
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this ()] () {
if (std::chrono::steady_clock::now () >= this_s->write_cooldown)
{
this_s->write_timer.cancel (); // Signal that new data is present to be sent
}
}));
}
}

// Must be called from strand
void nano::transport::socket::write_queued_messages ()
void nano::transport::socket::ongoing_write ()
{
if (closed)
{
return;
}

auto next = send_queue.pop ();
if (!next)
{
return;
}

set_default_timeout ();

write_in_progress = true;
nano::async_write (tcp_socket, next->buffer,
boost::asio::bind_executor (strand, [this_s = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) {
this_s->write_in_progress = false;

if (ec)
{
this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
this_s->close ();
}
else
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this ()] () {
if (std::chrono::steady_clock::now () < this_s->write_cooldown)
{
this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
this_s->set_last_completion ();
this_s->write_timer.expires_at (this_s->write_cooldown);
this_s->write_timer.async_wait ([this_s] (boost::system::error_code const & ec) {
this_s->ongoing_write ();
});
return;
}

if (next->callback)
auto next = this_s->send_queue.peek ();
if (!next)
{
next->callback (ec, size);
this_s->write_timer.expires_after (std::chrono::seconds{ 1 });
this_s->write_timer.async_wait ([this_s] (boost::system::error_code const & ec) {
this_s->ongoing_write ();
});
return;
}

if (!ec)
{
this_s->write_queued_messages ();
}
debug_assert (!this_s->write_in_progress);
this_s->write_in_progress = true;

this_s->set_default_timeout ();
nano::async_write (this_s->tcp_socket, next->buffer,
boost::asio::bind_executor (this_s->strand, [this_s, callback = std::move (next->callback)] (boost::system::error_code ec, std::size_t size) {
debug_assert (this_s->write_in_progress);
this_s->write_in_progress = false;

if (ec)
{
this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);

if (is_transient_error (ec))
{
this_s->node.stats.inc (nano::stat::type::socket_write, nano::stat::detail::error_transient);

this_s->write_cooldown = std::chrono::steady_clock::now () + write_throttling_delay;
}
else
{
this_s->node.stats.inc (nano::stat::type::socket_write, nano::stat::detail::error_critical);

this_s->close ();

if (callback)
{
callback (ec, size);
}
}
}
else
{
this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
this_s->node.stats.inc (nano::stat::type::socket_write, nano::stat::detail::success);

this_s->set_last_completion ();
this_s->send_queue.pop ();

if (callback)
{
callback (ec, size);
}
}

this_s->ongoing_write ();
}));
}));
}

// These errors are not critical and should not cause the socket to be disconnected
bool nano::transport::socket::is_transient_error (const boost::system::error_code & ec)
{
if (ec == boost::asio::error::no_buffer_space)
{
return true;
}
return false;
}

bool nano::transport::socket::max (nano::transport::traffic_type traffic_type) const
{
return send_queue.size (traffic_type) >= max_queue_size;
Expand Down Expand Up @@ -339,15 +386,15 @@ void nano::transport::socket::close_internal ()
return;
}

send_queue.clear ();

default_timeout = std::chrono::seconds (0);
boost::system::error_code ec;

// Ignore error code for shutdown as it is best-effort
tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
tcp_socket.close (ec);

write_timer.cancel ();

if (ec)
{
node.logger.try_log ("Failed to close socket gracefully: ", ec.message ());
Expand Down Expand Up @@ -385,7 +432,7 @@ bool nano::transport::socket::write_queue::insert (const buffer_t & buffer, call
return false; // Not queued
}

std::optional<nano::transport::socket::write_queue::entry> nano::transport::socket::write_queue::pop ()
std::optional<nano::transport::socket::write_queue::entry> nano::transport::socket::write_queue::peek ()
{
nano::lock_guard<nano::mutex> guard{ mutex };

Expand All @@ -394,7 +441,7 @@ std::optional<nano::transport::socket::write_queue::entry> nano::transport::sock
if (!que.empty ())
{
auto item = que.front ();
que.pop ();
last_queue = type;
return item;
}
return std::nullopt;
Expand All @@ -413,6 +460,18 @@ std::optional<nano::transport::socket::write_queue::entry> nano::transport::sock
return std::nullopt;
}

void nano::transport::socket::write_queue::pop ()
{
nano::lock_guard<nano::mutex> guard{ mutex };

auto it = queues.find (last_queue);
release_assert (it != queues.end ());

auto & que = it->second;
release_assert (!que.empty ());
que.pop ();
}

void nano::transport::socket::write_queue::clear ()
{
nano::lock_guard<nano::mutex> guard{ mutex };
Expand Down
15 changes: 13 additions & 2 deletions nano/node/transport/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>

public:
static std::size_t constexpr default_max_queue_size = 128;
static std::chrono::milliseconds constexpr write_throttling_delay{ 50 };

enum class type_t
{
Expand Down Expand Up @@ -138,7 +139,8 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
explicit write_queue (std::size_t max_size);

bool insert (buffer_t const &, callback_t, nano::transport::traffic_type);
std::optional<entry> pop ();
std::optional<entry> peek ();
void pop ();
void clear ();
std::size_t size (nano::transport::traffic_type) const;
bool empty () const;
Expand All @@ -148,6 +150,7 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
private:
mutable nano::mutex mutex;
std::unordered_map<nano::transport::traffic_type, std::queue<entry>> queues;
nano::transport::traffic_type last_queue;
};

write_queue send_queue;
Expand All @@ -157,6 +160,9 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
boost::asio::ip::tcp::socket tcp_socket;
nano::node & node;

/** We use `steady_timer` as an asynchronous condition variable */
boost::asio::steady_timer write_timer;

/** The other end of the connection */
boost::asio::ip::tcp::endpoint remote;

Expand Down Expand Up @@ -193,14 +199,19 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
/** Updated only from strand, but stored as atomic so it can be read from outside */
std::atomic<bool> write_in_progress{ false };

std::chrono::steady_clock::time_point write_cooldown;

private:
void close_internal ();
void write_queued_messages ();
void set_default_timeout ();
void set_last_completion ();
void set_last_receive_time ();
void ongoing_checkup ();
void ongoing_write ();
void read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a);

static bool is_transient_error (boost::system::error_code const &);

private:
type_t type_m{ type_t::undefined };
endpoint_type_t endpoint_type_m;
Expand Down

0 comments on commit 63c859c

Please sign in to comment.