diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index eddf9b120a..6e82a4b85d 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -56,6 +56,7 @@ enum class type tcp, tcp_server, tcp_listener, + tcp_channels, prunning, conf_processor_bounded, conf_processor_unbounded, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index b825ecd1a2..8dc2eb77a9 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -23,6 +23,7 @@ enum class type : uint8_t http_callback, ipc, tcp, + tcp_channels, channel, socket, confirmation_height, diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 802c619f1e..fd723d8e89 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -546,6 +546,7 @@ nano::endpoint nano::network::endpoint () const void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff_a) { tcp_channels.purge (cutoff_a); + if (node.network.empty ()) { disconnect_observer (); diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index a1839d1bd2..4a65c13582 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -57,10 +57,6 @@ void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const buffer_a, [endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr (node.shared ()), callback_a] (boost::system::error_code const & ec, std::size_t size_a) { if (auto node_l = node.lock ()) { - if (!ec) - { - node_l->network.tcp_channels.update (endpoint_a); - } if (ec == boost::system::errc::host_unreachable) { node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out); @@ -332,7 +328,7 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer () if (i->channel->get_network_version () >= node.network_params.network.protocol_version_min) { result = nano::transport::map_endpoint_to_tcp (i->channel->get_peering_endpoint ()); - channels.get ().modify (i, [] (channel_tcp_wrapper & wrapper_a) { + channels.get ().modify (i, [] (channel_entry & wrapper_a) { wrapper_a.channel->set_last_bootstrap_attempt (std::chrono::steady_clock::now ()); }); i = n; @@ -425,44 +421,68 @@ std::unique_ptr nano::transport::tcp_channels::c return composite; } -void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point const & cutoff_a) +void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point const & cutoff_deadline) { nano::lock_guard lock{ mutex }; - // Remove channels with dead underlying sockets - erase_if (channels, [] (auto const & entry) { - return !entry.channel->alive (); - }); + node.logger.debug (nano::log::type::tcp_channels, "Performing periodic channel cleanup..."); - auto disconnect_cutoff (channels.get ().lower_bound (cutoff_a)); - channels.get ().erase (channels.get ().begin (), disconnect_cutoff); + erase_if (channels, [this, cutoff_deadline] (auto const & entry) { + // Remove channels with dead underlying sockets + if (!entry.channel->alive ()) + { + node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", entry.channel->to_string ()); + return true; // Erase + } + // Remove channels that haven't sent a message within the cutoff time + if (entry.channel->get_last_packet_sent () < cutoff_deadline) + { + node.logger.debug (nano::log::type::tcp_channels, "Removing idle channel: {} (idle for {} seconds)", + entry.channel->to_string (), + std::chrono::duration_cast (std::chrono::steady_clock::now () - entry.channel->get_last_packet_sent ()).count ()); + return true; // Erase + } + // Check if any tcp channels belonging to old protocol versions which may still be alive due to async operations + if (entry.channel->get_network_version () < node.network_params.network.protocol_version_min) + { + node.logger.debug (nano::log::type::tcp_channels, "Removing channel with old protocol version: {}", entry.channel->to_string ()); + return true; // Erase + } + return false; + }); // Remove keepalive attempt tracking for attempts older than cutoff - auto attempts_cutoff (attempts.get ().lower_bound (cutoff_a)); + auto attempts_cutoff (attempts.get ().lower_bound (cutoff_deadline)); attempts.get ().erase (attempts.get ().begin (), attempts_cutoff); - - // Check if any tcp channels belonging to old protocol versions which may still be alive due to async operations - auto lower_bound = channels.get ().lower_bound (node.network_params.network.protocol_version_min); - channels.get ().erase (channels.get ().begin (), lower_bound); } void nano::transport::tcp_channels::ongoing_keepalive () { nano::keepalive message{ node.network_params.network }; node.network.random_fill (message.peers); + nano::unique_lock lock{ mutex }; - // Wake up channels + + auto const keepalive_sent_cutoff = std::chrono::steady_clock::now () - node.network_params.network.keepalive_period; + std::vector> send_list; - auto keepalive_sent_cutoff (channels.get ().lower_bound (std::chrono::steady_clock::now () - node.network_params.network.keepalive_period)); - for (auto i (channels.get ().begin ()); i != keepalive_sent_cutoff; ++i) + for (auto & entry : channels) { - send_list.push_back (i->channel); + if (entry.last_keepalive_sent < keepalive_sent_cutoff) + { + entry.last_keepalive_sent = std::chrono::steady_clock::now (); + send_list.push_back (entry.channel); + } } + lock.unlock (); + for (auto & channel : send_list) { + node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::keepalive, nano::stat::dir::out); channel->send (message); } + std::weak_ptr node_w (node.shared ()); node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.keepalive_period, [node_w] () { if (auto node_l = node_w.lock ()) @@ -562,24 +582,12 @@ void nano::transport::tcp_channels::modify (std::shared_ptr ().find (channel_a->get_tcp_endpoint ())); if (existing != channels.get ().end ()) { - channels.get ().modify (existing, [modify_callback = std::move (modify_callback_a)] (channel_tcp_wrapper & wrapper_a) { + channels.get ().modify (existing, [modify_callback = std::move (modify_callback_a)] (channel_entry & wrapper_a) { modify_callback (wrapper_a.channel); }); } } -void nano::transport::tcp_channels::update (nano::tcp_endpoint const & endpoint_a) -{ - nano::lock_guard lock{ mutex }; - auto existing (channels.get ().find (endpoint_a)); - if (existing != channels.get ().end ()) - { - channels.get ().modify (existing, [] (channel_tcp_wrapper & wrapper_a) { - wrapper_a.channel->set_last_packet_sent (std::chrono::steady_clock::now ()); - }); - } -} - void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a) { auto socket = std::make_shared (node); diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 506540ee5a..671108c4b7 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -131,7 +131,6 @@ namespace transport void ongoing_merge (size_t channel_index, nano::keepalive keepalive, size_t peer_index); void list (std::deque> &, uint8_t = 0, bool = true); void modify (std::shared_ptr const &, std::function const &)>); - void update (nano::tcp_endpoint const &); // Connection start void start_tcp (nano::endpoint const &); @@ -147,32 +146,30 @@ namespace transport class ip_address_tag {}; class subnetwork_tag {}; class random_access_tag {}; - class last_packet_sent_tag {}; class last_bootstrap_attempt_tag {}; class last_attempt_tag {}; class node_id_tag {}; - class version_tag {}; // clang-format on - class channel_tcp_wrapper final + class channel_entry final { public: std::shared_ptr channel; std::shared_ptr socket; std::shared_ptr response_server; - channel_tcp_wrapper (std::shared_ptr channel_a, std::shared_ptr socket_a, std::shared_ptr server_a) : + // Field not used for indexing + mutable std::chrono::steady_clock::time_point last_keepalive_sent{ std::chrono::steady_clock::time_point () }; + + channel_entry (std::shared_ptr channel_a, std::shared_ptr socket_a, std::shared_ptr server_a) : channel (std::move (channel_a)), socket (std::move (socket_a)), response_server (std::move (server_a)) { } + nano::tcp_endpoint endpoint () const { return channel->get_tcp_endpoint (); } - std::chrono::steady_clock::time_point last_packet_sent () const - { - return channel->get_last_packet_sent (); - } std::chrono::steady_clock::time_point last_bootstrap_attempt () const { return channel->get_last_bootstrap_attempt (); @@ -187,16 +184,11 @@ namespace transport } nano::account node_id () const { - auto node_id (channel->get_node_id ()); - return node_id; - } - uint8_t network_version () const - { - return channel->get_network_version (); + return channel->get_node_id (); } }; - class tcp_endpoint_attempt final + class attempt_entry final { public: nano::tcp_endpoint endpoint; @@ -204,7 +196,7 @@ namespace transport boost::asio::ip::address subnetwork; std::chrono::steady_clock::time_point last_attempt{ std::chrono::steady_clock::now () }; - explicit tcp_endpoint_attempt (nano::tcp_endpoint const & endpoint_a) : + explicit attempt_entry (nano::tcp_endpoint const & endpoint_a) : endpoint (endpoint_a), address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ())), subnetwork (nano::transport::map_address_to_subnetwork (endpoint_a.address ())) @@ -215,35 +207,31 @@ namespace transport mutable nano::mutex mutex; // clang-format off - boost::multi_index_container>, // TODO: Can this be replaced with sequential access? mi::ordered_non_unique, - mi::const_mem_fun>, + mi::const_mem_fun>, mi::hashed_unique, - mi::const_mem_fun>, + mi::const_mem_fun>, mi::hashed_non_unique, - mi::const_mem_fun>, - mi::ordered_non_unique, - mi::const_mem_fun>, - mi::ordered_non_unique, - mi::const_mem_fun>, + mi::const_mem_fun>, mi::hashed_non_unique, - mi::const_mem_fun>, + mi::const_mem_fun>, mi::hashed_non_unique, - mi::const_mem_fun>>> + mi::const_mem_fun>>> channels; - boost::multi_index_container, - mi::member>, + mi::member>, mi::hashed_non_unique, - mi::member>, + mi::member>, mi::hashed_non_unique, - mi::member>, + mi::member>, mi::ordered_non_unique, - mi::member>>> + mi::member>>> attempts; // clang-format on