From 63c859c4e095df0df065d37dd5ef7f162dde9110 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 19 Oct 2023 15:44:14 +0200 Subject: [PATCH] WIP --- nano/lib/stats_enums.hpp | 8 ++ nano/node/transport/socket.cpp | 141 +++++++++++++++++++++++---------- nano/node/transport/socket.hpp | 15 +++- 3 files changed, 121 insertions(+), 43 deletions(-) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 79ef835802..1a6fc2a38a 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -21,6 +21,9 @@ enum class type : uint8_t http_callback, ipc, tcp, + channel, + socket, + socket_write, confirmation_height, confirmation_observer, drop, @@ -292,6 +295,11 @@ enum class detail : uint8_t deprioritize, deprioritize_failed, + // socket + error_critical, + error_transient, + success, + _last // Must be the last enum }; diff --git a/nano/node/transport/socket.cpp b/nano/node/transport/socket.cpp index 629fe3b5fa..edbc8a714e 100644 --- a/nano/node/transport/socket.cpp +++ b/nano/node/transport/socket.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,7 @@ nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_t strand{ node_a.io_ctx.get_executor () }, tcp_socket{ node_a.io_ctx }, node{ node_a }, + write_timer{ node_a.io_ctx }, endpoint_type_m{ endpoint_type_a }, timeout{ std::numeric_limits::max () }, last_completion_time_or_init{ nano::seconds_since_epoch () }, @@ -60,6 +62,7 @@ nano::transport::socket::~socket () void nano::transport::socket::start () { ongoing_checkup (); + ongoing_write (); } void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function callback_a) @@ -149,61 +152,105 @@ void nano::transport::socket::async_write (nano::shared_const_buffer const & buf callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); }); } - return; } - - boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this (), buffer_a, callback_a, traffic_type] () { - if (!this_s->write_in_progress) - { - this_s->write_queued_messages (); - } - })); + else + { + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this ()] () { + if (std::chrono::steady_clock::now () >= this_s->write_cooldown) + { + this_s->write_timer.cancel (); // Signal that new data is present to be sent + } + })); + } } -// Must be called from strand -void nano::transport::socket::write_queued_messages () +void nano::transport::socket::ongoing_write () { if (closed) { return; } - auto next = send_queue.pop (); - if (!next) - { - return; - } - - set_default_timeout (); - - write_in_progress = true; - nano::async_write (tcp_socket, next->buffer, - boost::asio::bind_executor (strand, [this_s = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) { - this_s->write_in_progress = false; - - if (ec) - { - this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in); - this_s->close (); - } - else + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this ()] () { + if (std::chrono::steady_clock::now () < this_s->write_cooldown) { - this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size); - this_s->set_last_completion (); + this_s->write_timer.expires_at (this_s->write_cooldown); + this_s->write_timer.async_wait ([this_s] (boost::system::error_code const & ec) { + this_s->ongoing_write (); + }); + return; } - if (next->callback) + auto next = this_s->send_queue.peek (); + if (!next) { - next->callback (ec, size); + this_s->write_timer.expires_after (std::chrono::seconds{ 1 }); + this_s->write_timer.async_wait ([this_s] (boost::system::error_code const & ec) { + this_s->ongoing_write (); + }); + return; } - if (!ec) - { - this_s->write_queued_messages (); - } + debug_assert (!this_s->write_in_progress); + this_s->write_in_progress = true; + + this_s->set_default_timeout (); + nano::async_write (this_s->tcp_socket, next->buffer, + boost::asio::bind_executor (this_s->strand, [this_s, callback = std::move (next->callback)] (boost::system::error_code ec, std::size_t size) { + debug_assert (this_s->write_in_progress); + this_s->write_in_progress = false; + + if (ec) + { + this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in); + + if (is_transient_error (ec)) + { + this_s->node.stats.inc (nano::stat::type::socket_write, nano::stat::detail::error_transient); + + this_s->write_cooldown = std::chrono::steady_clock::now () + write_throttling_delay; + } + else + { + this_s->node.stats.inc (nano::stat::type::socket_write, nano::stat::detail::error_critical); + + this_s->close (); + + if (callback) + { + callback (ec, size); + } + } + } + else + { + this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size); + this_s->node.stats.inc (nano::stat::type::socket_write, nano::stat::detail::success); + + this_s->set_last_completion (); + this_s->send_queue.pop (); + + if (callback) + { + callback (ec, size); + } + } + + this_s->ongoing_write (); + })); })); } +// These errors are not critical and should not cause the socket to be disconnected +bool nano::transport::socket::is_transient_error (const boost::system::error_code & ec) +{ + if (ec == boost::asio::error::no_buffer_space) + { + return true; + } + return false; +} + bool nano::transport::socket::max (nano::transport::traffic_type traffic_type) const { return send_queue.size (traffic_type) >= max_queue_size; @@ -339,8 +386,6 @@ void nano::transport::socket::close_internal () return; } - send_queue.clear (); - default_timeout = std::chrono::seconds (0); boost::system::error_code ec; @@ -348,6 +393,8 @@ void nano::transport::socket::close_internal () tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec); tcp_socket.close (ec); + write_timer.cancel (); + if (ec) { node.logger.try_log ("Failed to close socket gracefully: ", ec.message ()); @@ -385,7 +432,7 @@ bool nano::transport::socket::write_queue::insert (const buffer_t & buffer, call return false; // Not queued } -std::optional nano::transport::socket::write_queue::pop () +std::optional nano::transport::socket::write_queue::peek () { nano::lock_guard guard{ mutex }; @@ -394,7 +441,7 @@ std::optional nano::transport::sock if (!que.empty ()) { auto item = que.front (); - que.pop (); + last_queue = type; return item; } return std::nullopt; @@ -413,6 +460,18 @@ std::optional nano::transport::sock return std::nullopt; } +void nano::transport::socket::write_queue::pop () +{ + nano::lock_guard guard{ mutex }; + + auto it = queues.find (last_queue); + release_assert (it != queues.end ()); + + auto & que = it->second; + release_assert (!que.empty ()); + que.pop (); +} + void nano::transport::socket::write_queue::clear () { nano::lock_guard guard{ mutex }; diff --git a/nano/node/transport/socket.hpp b/nano/node/transport/socket.hpp index d0afec196b..ec06eef214 100644 --- a/nano/node/transport/socket.hpp +++ b/nano/node/transport/socket.hpp @@ -49,6 +49,7 @@ class socket : public std::enable_shared_from_this public: static std::size_t constexpr default_max_queue_size = 128; + static std::chrono::milliseconds constexpr write_throttling_delay{ 50 }; enum class type_t { @@ -138,7 +139,8 @@ class socket : public std::enable_shared_from_this explicit write_queue (std::size_t max_size); bool insert (buffer_t const &, callback_t, nano::transport::traffic_type); - std::optional pop (); + std::optional peek (); + void pop (); void clear (); std::size_t size (nano::transport::traffic_type) const; bool empty () const; @@ -148,6 +150,7 @@ class socket : public std::enable_shared_from_this private: mutable nano::mutex mutex; std::unordered_map> queues; + nano::transport::traffic_type last_queue; }; write_queue send_queue; @@ -157,6 +160,9 @@ class socket : public std::enable_shared_from_this boost::asio::ip::tcp::socket tcp_socket; nano::node & node; + /** We use `steady_timer` as an asynchronous condition variable */ + boost::asio::steady_timer write_timer; + /** The other end of the connection */ boost::asio::ip::tcp::endpoint remote; @@ -193,14 +199,19 @@ class socket : public std::enable_shared_from_this /** Updated only from strand, but stored as atomic so it can be read from outside */ std::atomic write_in_progress{ false }; + std::chrono::steady_clock::time_point write_cooldown; + +private: void close_internal (); - void write_queued_messages (); void set_default_timeout (); void set_last_completion (); void set_last_receive_time (); void ongoing_checkup (); + void ongoing_write (); void read_impl (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a); + static bool is_transient_error (boost::system::error_code const &); + private: type_t type_m{ type_t::undefined }; endpoint_type_t endpoint_type_m;