diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 79ef835802..1a6fc2a38a 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -21,6 +21,9 @@ enum class type : uint8_t http_callback, ipc, tcp, + channel, + socket, + socket_write, confirmation_height, confirmation_observer, drop, @@ -292,6 +295,11 @@ enum class detail : uint8_t deprioritize, deprioritize_failed, + // socket + error_critical, + error_transient, + success, + _last // Must be the last enum }; diff --git a/nano/node/transport/socket.cpp b/nano/node/transport/socket.cpp index 629fe3b5fa..07e5024828 100644 --- a/nano/node/transport/socket.cpp +++ b/nano/node/transport/socket.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,7 @@ nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_t strand{ node_a.io_ctx.get_executor () }, tcp_socket{ node_a.io_ctx }, node{ node_a }, + write_timer{ node_a.io_ctx }, endpoint_type_m{ endpoint_type_a }, timeout{ std::numeric_limits::max () }, last_completion_time_or_init{ nano::seconds_since_epoch () }, @@ -60,6 +62,7 @@ nano::transport::socket::~socket () void nano::transport::socket::start () { ongoing_checkup (); + ongoing_write (); } void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function callback_a) @@ -127,83 +130,147 @@ void nano::transport::socket::async_read (std::shared_ptr> } } -void nano::transport::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a, nano::transport::traffic_type traffic_type) +void nano::transport::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function callback, nano::transport::traffic_type traffic_type) { + // Existing code holds weak ptrs to a socket and assumes that as long as async ops are in progress, the socket will be kept alive + // Thus we need to capture a shared ptr to this socket in the callback + callback = [this_s = shared_from_this (), callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size) { + if (callback) + { + callback (ec, size); + } + }; + if (closed) { - if (callback_a) + if (callback) { - node.background ([callback = std::move (callback_a)] () { + node.background ([callback = std::move (callback)] () { callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); }); } return; } - bool queued = send_queue.insert (buffer_a, callback_a, traffic_type); + bool queued = send_queue.insert (buffer_a, callback, traffic_type); if (!queued) { - if (callback_a) + if (callback) { - node.background ([callback = std::move (callback_a)] () { + node.background ([callback = std::move (callback)] () { callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); }); } - return; } - - boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this (), buffer_a, callback_a, traffic_type] () { - if (!this_s->write_in_progress) - { - this_s->write_queued_messages (); - } - })); + else + { + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this ()] () { + if (std::chrono::steady_clock::now () >= this_s->write_cooldown) + { + this_s->write_timer.cancel (); // Signal that new data is present to be sent + } + })); + } } -// Must be called from strand -void nano::transport::socket::write_queued_messages () +void nano::transport::socket::ongoing_write () { if (closed) { return; } - auto next = send_queue.pop (); - if (!next) - { - return; - } + std::weak_ptr this_w (shared_from_this ()); + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_w] () { + if (auto this_l = this_w.lock ()) + { + if (std::chrono::steady_clock::now () < this_l->write_cooldown) + { + this_l->write_timer.expires_at (this_l->write_cooldown); + this_l->write_timer.async_wait ([this_w] (boost::system::error_code const & ec) { + if (auto this_l = this_w.lock ()) + { + this_l->ongoing_write (); + } + }); + return; + } - set_default_timeout (); + auto next = this_l->send_queue.peek (); + if (!next) + { + this_l->write_timer.expires_after (std::chrono::seconds{ 1 }); + this_l->write_timer.async_wait ([this_w] (boost::system::error_code const & ec) { + if (auto this_l = this_w.lock ()) + { + this_l->ongoing_write (); + } + }); + return; + } - write_in_progress = true; - nano::async_write (tcp_socket, next->buffer, - boost::asio::bind_executor (strand, [this_s = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) { - this_s->write_in_progress = false; + debug_assert (!this_l->write_in_progress); + this_l->write_in_progress = true; - if (ec) - { - this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in); - this_s->close (); - } - else - { - this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size); - this_s->set_last_completion (); - } + this_l->set_default_timeout (); + nano::async_write (this_l->tcp_socket, next->buffer, + boost::asio::bind_executor (this_l->strand, [this_l, callback = std::move (next->callback)] (boost::system::error_code ec, std::size_t size) { + debug_assert (this_l->write_in_progress); + this_l->write_in_progress = false; - if (next->callback) - { - next->callback (ec, size); - } + if (ec) + { + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in); - if (!ec) - { - this_s->write_queued_messages (); + if (is_transient_error (ec)) + { + this_l->node.stats.inc (nano::stat::type::socket_write, nano::stat::detail::error_transient); + + this_l->write_cooldown = std::chrono::steady_clock::now () + write_throttling_delay; + } + else + { + this_l->node.stats.inc (nano::stat::type::socket_write, nano::stat::detail::error_critical); + + this_l->close (); + + if (callback) + { + callback (ec, size); + } + } + } + else + { + this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size); + this_l->node.stats.inc (nano::stat::type::socket_write, nano::stat::detail::success); + + this_l->set_last_completion (); + this_l->send_queue.pop (); + + if (callback) + { + callback (ec, size); + } + } + + this_l->ongoing_write (); + })); } })); } +// These errors are not critical and should not cause the socket to be disconnected +bool nano::transport::socket::is_transient_error (const boost::system::error_code & ec) +{ + debug_assert (ec.category () == boost::asio::error::system_category); + if (ec == boost::asio::error::no_buffer_space) + { + return true; + } + return false; +} + bool nano::transport::socket::max (nano::transport::traffic_type traffic_type) const { return send_queue.size (traffic_type) >= max_queue_size; @@ -339,8 +406,6 @@ void nano::transport::socket::close_internal () return; } - send_queue.clear (); - default_timeout = std::chrono::seconds (0); boost::system::error_code ec; @@ -353,6 +418,9 @@ void nano::transport::socket::close_internal () node.logger.try_log ("Failed to close socket gracefully: ", ec.message ()); node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close); } + + send_queue.clear (); + write_timer.cancel (); } nano::tcp_endpoint nano::transport::socket::remote_endpoint () const @@ -385,7 +453,7 @@ bool nano::transport::socket::write_queue::insert (const buffer_t & buffer, call return false; // Not queued } -std::optional nano::transport::socket::write_queue::pop () +std::optional nano::transport::socket::write_queue::peek () { nano::lock_guard guard{ mutex }; @@ -394,7 +462,7 @@ std::optional nano::transport::sock if (!que.empty ()) { auto item = que.front (); - que.pop (); + last_queue = type; return item; } return std::nullopt; @@ -413,6 +481,19 @@ std::optional nano::transport::sock return std::nullopt; } +void nano::transport::socket::write_queue::pop () +{ + nano::lock_guard guard{ mutex }; + + // Might get emptied if socket is closed while sending + if (auto it = queues.find (last_queue); it != queues.end ()) + { + auto & que = it->second; + release_assert (!que.empty ()); + que.pop (); + } +} + void nano::transport::socket::write_queue::clear () { nano::lock_guard guard{ mutex }; diff --git a/nano/node/transport/socket.hpp b/nano/node/transport/socket.hpp index d0afec196b..ec06eef214 100644 --- a/nano/node/transport/socket.hpp +++ b/nano/node/transport/socket.hpp @@ -49,6 +49,7 @@ class socket : public std::enable_shared_from_this public: static std::size_t constexpr default_max_queue_size = 128; + static std::chrono::milliseconds constexpr write_throttling_delay{ 50 }; enum class type_t { @@ -138,7 +139,8 @@ class socket : public std::enable_shared_from_this explicit write_queue (std::size_t max_size); bool insert (buffer_t const &, callback_t, nano::transport::traffic_type); - std::optional pop (); + std::optional peek (); + void pop (); void clear (); std::size_t size (nano::transport::traffic_type) const; bool empty () const; @@ -148,6 +150,7 @@ class socket : public std::enable_shared_from_this private: mutable nano::mutex mutex; std::unordered_map> queues; + nano::transport::traffic_type last_queue; }; write_queue send_queue; @@ -157,6 +160,9 @@ class socket : public std::enable_shared_from_this boost::asio::ip::tcp::socket tcp_socket; nano::node & node; + /** We use `steady_timer` as an asynchronous condition variable */ + boost::asio::steady_timer write_timer; + /** The other end of the connection */ boost::asio::ip::tcp::endpoint remote; @@ -193,14 +199,19 @@ class socket : public std::enable_shared_from_this /** Updated only from strand, but stored as atomic so it can be read from outside */ std::atomic write_in_progress{ false }; + std::chrono::steady_clock::time_point write_cooldown; + +private: void close_internal (); - void write_queued_messages (); void set_default_timeout (); void set_last_completion (); void set_last_receive_time (); void ongoing_checkup (); + void ongoing_write (); void read_impl (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a); + static bool is_transient_error (boost::system::error_code const &); + private: type_t type_m{ type_t::undefined }; endpoint_type_t endpoint_type_m;