Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Sep 23, 2023
1 parent 9bbbc08 commit 9b65e21
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 33 deletions.
100 changes: 68 additions & 32 deletions nano/node/transport/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_t
strand{ node_a.io_ctx.get_executor () },
tcp_socket{ node_a.io_ctx },
node{ node_a },
write_timer{ node_a.io_ctx },
endpoint_type_m{ endpoint_type_a },
timeout{ std::numeric_limits<uint64_t>::max () },
last_completion_time_or_init{ nano::seconds_since_epoch () },
Expand All @@ -60,6 +61,7 @@ nano::transport::socket::~socket ()
void nano::transport::socket::start ()
{
ongoing_checkup ();
ongoing_write ();
}

void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function<void (boost::system::error_code const &)> callback_a)
Expand Down Expand Up @@ -152,55 +154,87 @@ void nano::transport::socket::async_write (nano::shared_const_buffer const & buf
return;
}

boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this (), buffer_a, callback_a, traffic_type] () {
if (!this_s->write_in_progress)
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this ()] () {
this_s->write_timer.cancel (); // Signal that new data is present to be sent
}));

// notify_write ();
}

void nano::transport::socket::notify_write ()
{
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this ()] () {
if (std::chrono::steady_clock::now () >= this_s->write_cooldown)
{
this_s->write_queued_messages ();
this_s->write_timer.cancel (); // Signal that new data is present to be sent
}
}));
}

// Must be called from strand
void nano::transport::socket::write_queued_messages ()
void nano::transport::socket::throttle_write ()
{
if (closed)
{
return;
}
write_cooldown = std::chrono::steady_clock::now () + write_throttling_delay;

write_timer.expires_at (write_cooldown);
write_timer.async_wait ([this_s = shared_from_this ()] (boost::system::error_code const & ec) {
this_s->write_timer.cancel (); // Signal that new data is present to be sent
});
}

auto next = send_queue.pop ();
if (!next)
void nano::transport::socket::ongoing_write ()
{
if (closed)
{
return;
}

set_default_timeout ();

write_in_progress = true;
nano::async_write (tcp_socket, next->buffer,
boost::asio::bind_executor (strand, [this_s = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) {
this_s->write_in_progress = false;

if (ec)
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this ()] () {
if (std::chrono::steady_clock::now () < this_s->write_cooldown)
{
this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
this_s->close ();
}
else
{
this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
this_s->set_last_completion ();
this_s->write_timer.expires_at (this_s->write_cooldown);
this_s->write_timer.async_wait ([this_s] (boost::system::error_code const & ec) {
this_s->ongoing_write ();
});
return;
}

if (next->callback)
auto next = this_s->send_queue.pop ();
if (!next)
{
next->callback (ec, size);
this_s->write_timer.expires_after (std::chrono::seconds{ 1 });
this_s->write_timer.async_wait ([this_s] (boost::system::error_code const & ec) {
this_s->ongoing_write ();
});
return;
}

if (!ec)
{
this_s->write_queued_messages ();
}
debug_assert (!this_s->write_in_progress);
this_s->write_in_progress = true;

this_s->set_default_timeout ();
nano::async_write (this_s->tcp_socket, next->buffer,
boost::asio::bind_executor (this_s->strand, [this_s, callback = std::move (next->callback)] (boost::system::error_code ec, std::size_t size) {
debug_assert (this_s->write_in_progress);
this_s->write_in_progress = false;

if (ec)
{
this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
this_s->close ();
}
else
{
this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
this_s->set_last_completion ();
}

if (callback)
{
callback (ec, size);
}

this_s->ongoing_write ();
}));
}));
}

Expand Down Expand Up @@ -348,6 +382,8 @@ void nano::transport::socket::close_internal ()
tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
tcp_socket.close (ec);

write_timer.cancel ();

if (ec)
{
node.logger.try_log ("Failed to close socket gracefully: ", ec.message ());
Expand Down
17 changes: 16 additions & 1 deletion nano/node/transport/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
boost::asio::ip::tcp::socket tcp_socket;
nano::node & node;

/** We use `steady_timer` as an asynchronous condition variable */
boost::asio::steady_timer write_timer;

/** The other end of the connection */
boost::asio::ip::tcp::endpoint remote;

Expand Down Expand Up @@ -194,13 +197,25 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
std::atomic<bool> write_in_progress{ false };

void close_internal ();
void write_queued_messages ();
void set_default_timeout ();
void set_last_completion ();
void set_last_receive_time ();
void ongoing_checkup ();
void ongoing_write ();
void read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a);

private:
static constexpr std::chrono::milliseconds write_throttling_delay{ 50 };

std::chrono::steady_clock::time_point write_cooldown;

// Must be called from strand
void notify_write ();
// Must be called from strand
void throttle_write ();

static bool is_non_critical_error (boost::system::error_code const &);

private:
type_t type_m{ type_t::undefined };
endpoint_type_t endpoint_type_m;
Expand Down

0 comments on commit 9b65e21

Please sign in to comment.