From 36bec9b03b8f2172877c1ecd1b205369384f6cef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Wed, 13 Mar 2024 20:11:20 +0100 Subject: [PATCH] Dedicated thread for periodic cleanup --- nano/lib/stats_enums.hpp | 2 ++ nano/lib/thread_roles.cpp | 3 ++ nano/lib/thread_roles.hpp | 1 + nano/node/network.cpp | 60 ++++++++++++++++++++++++++++----------- nano/node/network.hpp | 7 +++-- 5 files changed, 55 insertions(+), 18 deletions(-) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 6d1ac20b29..71128cef1a 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -17,6 +17,7 @@ enum class type : uint8_t ledger, rollback, bootstrap, + network, tcp_server, vote, election, @@ -67,6 +68,7 @@ enum class detail : uint8_t // common ok, loop, + loop_cleanup, total, process, processed, diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 944ac0e1bd..3bb70376e2 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -109,6 +109,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::rep_tiers: thread_role_name_string = "Rep tiers"; break; + case nano::thread_role::name::network_cleanup: + thread_role_name_string = "Net cleanup"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 724efad5c8..0fc3c97246 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -45,6 +45,7 @@ enum class name rep_crawler, local_block_broadcasting, rep_tiers, + network_cleanup, }; /* diff --git a/nano/node/network.cpp b/nano/node/network.cpp index cf11683a3a..c1da3eabfb 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -9,6 +9,8 @@ #include +using namespace std::chrono_literals; + /* * network */ @@ -31,13 +33,17 @@ nano::network::~network () { // All threads must be stopped before this destructor debug_assert (processing_threads.empty ()); + debug_assert (!cleanup_thread.joinable ()); } void nano::network::start () { if (!node.flags.disable_connection_cleanup) { - ongoing_cleanup (); + cleanup_thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::network_cleanup); + run_cleanup (); + }); } ongoing_syn_cookie_cleanup (); @@ -59,7 +65,11 @@ void nano::network::start () void nano::network::stop () { - stopped = true; + { + nano::lock_guard lock{ mutex }; + stopped = true; + } + condition.notify_all (); tcp_channels.stop (); resolver.cancel (); @@ -71,6 +81,11 @@ void nano::network::stop () } processing_threads.clear (); + if (cleanup_thread.joinable ()) + { + cleanup_thread.join (); + } + port = 0; } @@ -103,6 +118,28 @@ void nano::network::run_processing () } } +void nano::network::run_cleanup () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + condition.wait_for (lock, node.network_params.network.is_dev_network () ? 1s : 5s); + lock.unlock (); + + if (stopped) + { + return; + } + + node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_cleanup); + + auto const cutoff = std::chrono::steady_clock::now () - node.network_params.network.cleanup_cutoff (); + cleanup (cutoff); + + lock.lock (); + } +} + void nano::network::send_keepalive (std::shared_ptr const & channel_a) { nano::keepalive message{ node.network_params.network }; @@ -493,27 +530,18 @@ nano::endpoint nano::network::endpoint () const return nano::endpoint (boost::asio::ip::address_v6::loopback (), port); } -void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff_a) +void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff) { - tcp_channels.purge (cutoff_a); + node.logger.debug (nano::log::type::network, "Performing cleanup, cutoff: {}s", nano::log::seconds_delta (cutoff)); + + tcp_channels.purge (cutoff); + if (node.network.empty ()) { disconnect_observer (); } } -void nano::network::ongoing_cleanup () -{ - cleanup (std::chrono::steady_clock::now () - node.network_params.network.cleanup_cutoff ()); - std::weak_ptr node_w (node.shared ()); - node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 5), [node_w] () { - if (auto node_l = node_w.lock ()) - { - node_l->network.ongoing_cleanup (); - } - }); -} - void nano::network::ongoing_syn_cookie_cleanup () { syn_cookies.purge (std::chrono::steady_clock::now () - nano::transport::syn_cookie_cutoff); diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 79b66dba97..0e1a742251 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -111,8 +111,7 @@ class network final // Get the next peer for attempting a tcp bootstrap connection nano::tcp_endpoint bootstrap_peer (); nano::endpoint endpoint () const; - void cleanup (std::chrono::steady_clock::time_point const &); - void ongoing_cleanup (); + void cleanup (std::chrono::steady_clock::time_point const & cutoff); void ongoing_syn_cookie_cleanup (); void ongoing_keepalive (); std::size_t size () const; @@ -131,6 +130,7 @@ class network final private: void run_processing (); + void run_cleanup (); void process_message (nano::message const &, std::shared_ptr const &); private: // Dependencies @@ -153,7 +153,10 @@ class network final private: std::atomic stopped{ false }; + mutable nano::mutex mutex; + nano::condition_variable condition; std::vector processing_threads; // Using boost::thread to enable increased stack size + std::thread cleanup_thread; public: static unsigned const broadcast_interval_ms = 10;