From 06b2ad7e151258b41f7de4a9ced51ae92ad4a6bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 16 Mar 2024 15:31:47 +0100 Subject: [PATCH] Rework channel purging --- nano/core_test/network.cpp | 2 +- nano/core_test/peer_container.cpp | 1 + nano/core_test/socket.cpp | 4 +- nano/node/transport/channel.hpp | 3 ++ nano/node/transport/fake.hpp | 2 +- nano/node/transport/inproc.hpp | 5 ++ nano/node/transport/tcp.cpp | 79 +++++++++++++++++++------------ nano/node/transport/tcp.hpp | 35 +++++++------- 8 files changed, 80 insertions(+), 51 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 976dc12633..9ffa478f91 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -1080,7 +1080,7 @@ TEST (network, cleanup_purge) ASSERT_EQ (1, node1.network.size ()); node1.network.cleanup (std::chrono::steady_clock::now ()); - ASSERT_EQ (0, node1.network.size ()); + ASSERT_TIMELY_EQ (5s, 0, node1.network.size ()); } TEST (network, loopback_channel) diff --git a/nano/core_test/peer_container.cpp b/nano/core_test/peer_container.cpp index 7639d3ad91..dd6b69edc2 100644 --- a/nano/core_test/peer_container.cpp +++ b/nano/core_test/peer_container.cpp @@ -228,6 +228,7 @@ TEST (peer_container, reachout) ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ())); // Make sure we purge old items node1.network.cleanup (std::chrono::steady_clock::now () + std::chrono::seconds (10)); + ASSERT_TIMELY (5s, node1.network.empty ()); ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ())); } diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index eaa31f761b..1ec356391b 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -420,7 +420,7 @@ TEST (socket, drop_policy) }); auto client = std::make_shared (*node); - nano::transport::channel_tcp channel{ *node, client }; + auto channel = std::make_shared (*node, client); nano::test::counted_completion write_completion (static_cast (total_message_count)); client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), listener->endpoint ().port ()), @@ -428,7 +428,7 @@ TEST (socket, drop_policy) for (int i = 0; i < total_message_count; i++) { std::vector buff (1); - channel.send_buffer ( + channel->send_buffer ( nano::shared_const_buffer (std::move (buff)), [&write_completion, client] (boost::system::error_code const & ec, size_t size_a) mutable { client.reset (); write_completion.increment (); diff --git a/nano/node/transport/channel.hpp b/nano/node/transport/channel.hpp index bede756ce3..35f8f37bab 100644 --- a/nano/node/transport/channel.hpp +++ b/nano/node/transport/channel.hpp @@ -41,6 +41,8 @@ class channel nano::transport::traffic_type = nano::transport::traffic_type::generic) = 0; + virtual void close () = 0; + virtual std::string to_string () const = 0; virtual nano::endpoint get_endpoint () const = 0; virtual nano::tcp_endpoint get_tcp_endpoint () const = 0; @@ -50,6 +52,7 @@ class channel { return false; } + virtual bool alive () const { return true; diff --git a/nano/node/transport/fake.hpp b/nano/node/transport/fake.hpp index 809c5b98ae..5c720158bd 100644 --- a/nano/node/transport/fake.hpp +++ b/nano/node/transport/fake.hpp @@ -49,7 +49,7 @@ namespace transport return nano::transport::transport_type::fake; } - void close () + void close () override { closed = true; } diff --git a/nano/node/transport/inproc.hpp b/nano/node/transport/inproc.hpp index c6012bc1a1..fc318ef51f 100644 --- a/nano/node/transport/inproc.hpp +++ b/nano/node/transport/inproc.hpp @@ -43,6 +43,11 @@ namespace transport return nano::transport::transport_type::loopback; } + void close () override + { + // Can't be closed + } + private: nano::node & destination; nano::endpoint const endpoint; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index bc86b4cbc0..2696f29b8a 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -54,12 +54,12 @@ void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const if (!socket_l->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket_l->full (traffic_type))) { socket_l->async_write ( - buffer_a, [endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr (node.shared ()), callback_a] (boost::system::error_code const & ec, std::size_t size_a) { + buffer_a, [this_s = shared_from_this (), endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr{ node.shared () }, callback_a] (boost::system::error_code const & ec, std::size_t size_a) { if (auto node_l = node.lock ()) { if (!ec) { - node_l->network.tcp_channels.update (endpoint_a); + this_s->set_last_packet_sent (std::chrono::steady_clock::now ()); } if (ec == boost::system::errc::host_unreachable) { @@ -475,25 +475,52 @@ std::unique_ptr nano::transport::tcp_channels::c return composite; } -void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point const & cutoff_a) +void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point cutoff_deadline) { nano::lock_guard lock{ mutex }; - // Remove channels with dead underlying sockets - erase_if (channels, [] (auto const & entry) { - return !entry.channel->alive (); - }); + node.logger.debug (nano::log::type::tcp_channels, "Performing periodic channel cleanup, cutoff: {}s", nano::log::seconds_delta (cutoff_deadline)); + + auto should_close = [this, cutoff_deadline] (auto const & channel) { + // Remove channels that haven't successfully sent a message within the cutoff time + if (channel->get_last_packet_sent () < cutoff_deadline) + { + node.logger.debug (nano::log::type::tcp_channels, "Closing idle channel: {} (idle for {} seconds)", + channel->to_string (), + nano::log::seconds (std::chrono::steady_clock::now () - channel->get_last_packet_sent ())); + + return true; // Close + } + // Check if any tcp channels belonging to old protocol versions which may still be alive due to async operations + if (channel->get_network_version () < node.network_params.network.protocol_version_min) + { + node.logger.debug (nano::log::type::tcp_channels, "Closing channel with old protocol version: {}", channel->to_string ()); + + return true; // Close + } + return false; + }; - auto disconnect_cutoff (channels.get ().lower_bound (cutoff_a)); - channels.get ().erase (channels.get ().begin (), disconnect_cutoff); + for (auto const & entry : channels) + { + if (should_close (entry.channel)) + { + entry.channel->close (); + } + } + + erase_if (channels, [this] (auto const & entry) { + if (!entry.channel->alive ()) + { + node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", entry.channel->to_string ()); + return true; // Erase + } + return false; + }); // Remove keepalive attempt tracking for attempts older than cutoff - auto attempts_cutoff (attempts.get ().lower_bound (cutoff_a)); + auto attempts_cutoff (attempts.get ().lower_bound (cutoff_deadline)); attempts.get ().erase (attempts.get ().begin (), attempts_cutoff); - - // Check if any tcp channels belonging to old protocol versions which may still be alive due to async operations - auto lower_bound = channels.get ().lower_bound (node.network_params.network.protocol_version_min); - channels.get ().erase (channels.get ().begin (), lower_bound); } void nano::transport::tcp_channels::keepalive () @@ -506,16 +533,18 @@ void nano::transport::tcp_channels::keepalive () auto const cutoff_time = std::chrono::steady_clock::now () - node.network_params.network.keepalive_period; // Wake up channels - std::vector> send_list; - auto keepalive_sent_cutoff (channels.get ().lower_bound (cutoff_time)); - for (auto i (channels.get ().begin ()); i != keepalive_sent_cutoff; ++i) + std::vector> to_wakeup; + for (auto const & entry : channels) { - send_list.push_back (i->channel); + if (entry.channel->get_last_packet_sent () < cutoff_time) + { + to_wakeup.push_back (entry.channel); + } } lock.unlock (); - for (auto & channel : send_list) + for (auto & channel : to_wakeup) { channel->send (message); } @@ -563,18 +592,6 @@ void nano::transport::tcp_channels::modify (std::shared_ptr lock{ mutex }; - auto existing (channels.get ().find (endpoint_a)); - if (existing != channels.get ().end ()) - { - channels.get ().modify (existing, [] (channel_entry & wrapper_a) { - wrapper_a.channel->set_last_packet_sent (std::chrono::steady_clock::now ()); - }); - } -} - void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a) { auto socket = std::make_shared (node); diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index e0ae674843..0402b915af 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -55,7 +55,7 @@ namespace transport class tcp_server; class tcp_channels; - class channel_tcp : public nano::transport::channel + class channel_tcp : public nano::transport::channel, public std::enable_shared_from_this { friend class nano::transport::tcp_channels; @@ -74,10 +74,6 @@ namespace transport { return &node == &other_a.node && socket.lock () == other_a.socket.lock (); } - std::weak_ptr socket; - /* Mark for temporary channels. Usually remote ports of these channels are ephemeral and received from incoming connections to server. - If remote part has open listening port, temporary channel will be replaced with direct connection to listening port soon. But if other side is behing NAT or firewall this connection can be pemanent. */ - std::atomic temporary{ false }; void set_endpoint (); @@ -97,7 +93,7 @@ namespace transport return nano::transport::transport_type::tcp; } - virtual bool max (nano::transport::traffic_type traffic_type) override + bool max (nano::transport::traffic_type traffic_type) override { bool result = true; if (auto socket_l = socket.lock ()) @@ -107,7 +103,7 @@ namespace transport return result; } - virtual bool alive () const override + bool alive () const override { if (auto socket_l = socket.lock ()) { @@ -116,6 +112,21 @@ namespace transport return false; } + void close () override + { + if (auto socket_l = socket.lock ()) + { + socket_l->close (); + } + } + + public: + std::weak_ptr socket; + + /* Mark for temporary channels. Usually remote ports of these channels are ephemeral and received from incoming connections to server. + If remote part has open listening port, temporary channel will be replaced with direct connection to listening port soon. But if other side is behing NAT or firewall this connection can be pemanent. */ + std::atomic temporary{ false }; + private: nano::tcp_endpoint endpoint{ boost::asio::ip::address_v6::any (), 0 }; @@ -154,10 +165,9 @@ namespace transport // Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt bool track_reachout (nano::endpoint const &); std::unique_ptr collect_container_info (std::string const &); - void purge (std::chrono::steady_clock::time_point const &); + void purge (std::chrono::steady_clock::time_point cutoff_deadline); void list (std::deque> &, uint8_t = 0, bool = true); void modify (std::shared_ptr const &, std::function const &)>); - void update (nano::tcp_endpoint const &); void keepalive (); std::optional sample_keepalive (); @@ -191,10 +201,6 @@ namespace transport { return channel->get_tcp_endpoint (); } - std::chrono::steady_clock::time_point last_packet_sent () const - { - return channel->get_last_packet_sent (); - } std::chrono::steady_clock::time_point last_bootstrap_attempt () const { return channel->get_last_bootstrap_attempt (); @@ -240,7 +246,6 @@ namespace transport class ip_address_tag {}; class subnetwork_tag {}; class random_access_tag {}; - class last_packet_sent_tag {}; class last_bootstrap_attempt_tag {}; class last_attempt_tag {}; class node_id_tag {}; @@ -257,8 +262,6 @@ namespace transport mi::const_mem_fun>, mi::hashed_non_unique, mi::const_mem_fun>, - mi::ordered_non_unique, - mi::const_mem_fun>, mi::ordered_non_unique, mi::const_mem_fun>, mi::hashed_non_unique,