From 9b65e21c4252c27108bd58876861bcfd630f5a97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 24 Sep 2023 00:06:45 +0200 Subject: [PATCH] WIP --- nano/node/transport/socket.cpp | 100 ++++++++++++++++++++++----------- nano/node/transport/socket.hpp | 17 +++++- 2 files changed, 84 insertions(+), 33 deletions(-) diff --git a/nano/node/transport/socket.cpp b/nano/node/transport/socket.cpp index 629fe3b5fa..5f0cee0eb4 100644 --- a/nano/node/transport/socket.cpp +++ b/nano/node/transport/socket.cpp @@ -42,6 +42,7 @@ nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_t strand{ node_a.io_ctx.get_executor () }, tcp_socket{ node_a.io_ctx }, node{ node_a }, + write_timer{ node_a.io_ctx }, endpoint_type_m{ endpoint_type_a }, timeout{ std::numeric_limits::max () }, last_completion_time_or_init{ nano::seconds_since_epoch () }, @@ -60,6 +61,7 @@ nano::transport::socket::~socket () void nano::transport::socket::start () { ongoing_checkup (); + ongoing_write (); } void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function callback_a) @@ -152,55 +154,87 @@ void nano::transport::socket::async_write (nano::shared_const_buffer const & buf return; } - boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this (), buffer_a, callback_a, traffic_type] () { - if (!this_s->write_in_progress) + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this ()] () { + this_s->write_timer.cancel (); // Signal that new data is present to be sent + })); + + // notify_write (); +} + +void nano::transport::socket::notify_write () +{ + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this ()] () { + if (std::chrono::steady_clock::now () >= this_s->write_cooldown) { - this_s->write_queued_messages (); + this_s->write_timer.cancel (); // Signal that new data is present to be sent } })); } -// Must be called from strand -void nano::transport::socket::write_queued_messages () +void nano::transport::socket::throttle_write () { - if (closed) - { - return; - } + write_cooldown = std::chrono::steady_clock::now () + write_throttling_delay; + + write_timer.expires_at (write_cooldown); + write_timer.async_wait ([this_s = shared_from_this ()] (boost::system::error_code const & ec) { + this_s->write_timer.cancel (); // Signal that new data is present to be sent + }); +} - auto next = send_queue.pop (); - if (!next) +void nano::transport::socket::ongoing_write () +{ + if (closed) { return; } - set_default_timeout (); - - write_in_progress = true; - nano::async_write (tcp_socket, next->buffer, - boost::asio::bind_executor (strand, [this_s = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) { - this_s->write_in_progress = false; - - if (ec) + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this ()] () { + if (std::chrono::steady_clock::now () < this_s->write_cooldown) { - this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in); - this_s->close (); - } - else - { - this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size); - this_s->set_last_completion (); + this_s->write_timer.expires_at (this_s->write_cooldown); + this_s->write_timer.async_wait ([this_s] (boost::system::error_code const & ec) { + this_s->ongoing_write (); + }); + return; } - if (next->callback) + auto next = this_s->send_queue.pop (); + if (!next) { - next->callback (ec, size); + this_s->write_timer.expires_after (std::chrono::seconds{ 1 }); + this_s->write_timer.async_wait ([this_s] (boost::system::error_code const & ec) { + this_s->ongoing_write (); + }); + return; } - if (!ec) - { - this_s->write_queued_messages (); - } + debug_assert (!this_s->write_in_progress); + this_s->write_in_progress = true; + + this_s->set_default_timeout (); + nano::async_write (this_s->tcp_socket, next->buffer, + boost::asio::bind_executor (this_s->strand, [this_s, callback = std::move (next->callback)] (boost::system::error_code ec, std::size_t size) { + debug_assert (this_s->write_in_progress); + this_s->write_in_progress = false; + + if (ec) + { + this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in); + this_s->close (); + } + else + { + this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size); + this_s->set_last_completion (); + } + + if (callback) + { + callback (ec, size); + } + + this_s->ongoing_write (); + })); })); } @@ -348,6 +382,8 @@ void nano::transport::socket::close_internal () tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec); tcp_socket.close (ec); + write_timer.cancel (); + if (ec) { node.logger.try_log ("Failed to close socket gracefully: ", ec.message ()); diff --git a/nano/node/transport/socket.hpp b/nano/node/transport/socket.hpp index d0afec196b..d875045c1b 100644 --- a/nano/node/transport/socket.hpp +++ b/nano/node/transport/socket.hpp @@ -157,6 +157,9 @@ class socket : public std::enable_shared_from_this boost::asio::ip::tcp::socket tcp_socket; nano::node & node; + /** We use `steady_timer` as an asynchronous condition variable */ + boost::asio::steady_timer write_timer; + /** The other end of the connection */ boost::asio::ip::tcp::endpoint remote; @@ -194,13 +197,25 @@ class socket : public std::enable_shared_from_this std::atomic write_in_progress{ false }; void close_internal (); - void write_queued_messages (); void set_default_timeout (); void set_last_completion (); void set_last_receive_time (); void ongoing_checkup (); + void ongoing_write (); void read_impl (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a); +private: + static constexpr std::chrono::milliseconds write_throttling_delay{ 50 }; + + std::chrono::steady_clock::time_point write_cooldown; + + // Must be called from strand + void notify_write (); + // Must be called from strand + void throttle_write (); + + static bool is_non_critical_error (boost::system::error_code const &); + private: type_t type_m{ type_t::undefined }; endpoint_type_t endpoint_type_m;