diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp
index 42f255d20f..de7b6d020c 100644
--- a/nano/lib/stats_enums.hpp
+++ b/nano/lib/stats_enums.hpp
@@ -71,7 +71,6 @@ enum class detail : uint8_t
 	ok,
 	loop,
 	loop_cleanup,
-	loop_keepalive,
 	total,
 	process,
 	processed,
@@ -217,6 +216,11 @@ enum class detail : uint8_t
 	message_size_too_big,
 	outdated_version,
 
+	// network
+	loop_keepalive,
+	loop_reachout,
+	merge_peer,
+
 	// tcp
 	tcp_accept_success,
 	tcp_accept_failure,
diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp
index bb65aeaa68..666558d583 100644
--- a/nano/lib/thread_roles.cpp
+++ b/nano/lib/thread_roles.cpp
@@ -115,6 +115,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
 		case nano::thread_role::name::network_keepalive:
 			thread_role_name_string = "Net keepalive";
 			break;
+		case nano::thread_role::name::network_reachout:
+			thread_role_name_string = "Net reachout";
+			break;
 		case nano::thread_role::name::tcp_keepalive:
 			thread_role_name_string = "Tcp keepalive";
 			break;
diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp
index 2f5a47041d..4afed7e1ef 100644
--- a/nano/lib/thread_roles.hpp
+++ b/nano/lib/thread_roles.hpp
@@ -47,6 +47,7 @@ enum class name
 	rep_tiers,
 	network_cleanup,
 	network_keepalive,
+	network_reachout,
 	tcp_keepalive,
 };
 
diff --git a/nano/node/network.cpp b/nano/node/network.cpp
index 71ef05e7dd..ec1df815ff 100644
--- a/nano/node/network.cpp
+++ b/nano/node/network.cpp
@@ -48,6 +48,11 @@ void nano::network::start ()
 		run_keepalive ();
 	});
 
+	reachout_thread = std::thread ([this] () {
+		nano::thread_role::set (nano::thread_role::name::network_reachout);
+		run_reachout ();
+	});
+
 	if (!node.flags.disable_tcp_realtime)
 	{
 		tcp_channels.start ();
@@ -87,6 +92,10 @@ void nano::network::stop ()
 	{
 		cleanup_thread.join ();
 	}
+	if (reachout_thread.joinable ())
+	{
+		reachout_thread.join ();
+	}
 	port = 0;
 }
 
@@ -126,12 +135,11 @@ void nano::network::run_cleanup ()
 	while (!stopped)
 	{
 		condition.wait_for (lock, node.network_params.network.is_dev_network () ? 1s : 5s);
-		lock.unlock ();
-
 		if (stopped)
 		{
 			return;
 		}
+		lock.unlock ();
 
 		node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_cleanup);
 
@@ -154,12 +162,11 @@ void nano::network::run_keepalive ()
 	while (!stopped)
 	{
 		condition.wait_for (lock, node.network_params.network.keepalive_period);
-		lock.unlock ();
-
 		if (stopped)
 		{
 			return;
 		}
+		lock.unlock ();
 
 		node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_keepalive);
 
@@ -170,6 +177,41 @@ void nano::network::run_keepalive ()
 	}
 }
 
+void nano::network::run_reachout ()
+{
+	nano::unique_lock<nano::mutex> lock{ mutex };
+	while (!stopped)
+	{
+		condition.wait_for (lock, node.network_params.network.merge_period);
+		if (stopped)
+		{
+			return;
+		}
+		lock.unlock ();
+
+		node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_reachout);
+
+		auto keepalive = tcp_channels.sample_keepalive ();
+		if (keepalive)
+		{
+			for (auto const & peer : keepalive->peers)
+			{
+				if (stopped)
+				{
+					return;
+				}
+
+				merge_peer (peer);
+
+				// Throttle reachout attempts
+				std::this_thread::sleep_for (node.network_params.network.merge_period);
+			}
+		}
+
+		lock.lock ();
+	}
+}
+
 void nano::network::send_keepalive (std::shared_ptr<nano::transport::channel> const & channel_a)
 {
 	nano::keepalive message{ node.network_params.network };
@@ -413,8 +455,9 @@ void nano::network::merge_peer (nano::endpoint const & peer_a)
 {
 	if (!track_reachout (peer_a))
 	{
-		std::weak_ptr<nano::node> node_w (node.shared ());
-		node.network.tcp_channels.start_tcp (peer_a);
+		node.stats.inc (nano::stat::type::network, nano::stat::detail::merge_peer);
+
+		tcp_channels.start_tcp (peer_a);
 	}
 }
 
diff --git a/nano/node/network.hpp b/nano/node/network.hpp
index 1e9ab3d734..c10ebc5ccb 100644
--- a/nano/node/network.hpp
+++ b/nano/node/network.hpp
@@ -111,6 +111,7 @@ class network final
 	void run_processing ();
 	void run_cleanup ();
 	void run_keepalive ();
+	void run_reachout ();
 	void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
 
 private: // Dependencies
@@ -137,6 +138,7 @@ class network final
 	std::vector<boost::thread> processing_threads; // Using boost::thread to enable increased stack size
 	std::thread cleanup_thread;
 	std::thread keepalive_thread;
+	std::thread reachout_thread;
 
 public:
 	static unsigned const broadcast_interval_ms = 10;
diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp
index b062bdc985..7d6bbe4a94 100644
--- a/nano/node/transport/tcp.cpp
+++ b/nano/node/transport/tcp.cpp
@@ -139,8 +139,6 @@ nano::transport::tcp_channels::~tcp_channels ()
 
 void nano::transport::tcp_channels::start ()
 {
-	ongoing_merge (0);
-
 	keepalive_thread = std::thread ([this] () {
 		nano::thread_role::set (nano::thread_role::name::tcp_keepalive);
 		run_keepalive ();
@@ -548,75 +546,29 @@ void nano::transport::tcp_channels::keepalive ()
 	}
 }
 
-void nano::transport::tcp_channels::ongoing_merge (size_t channel_index)
+std::optional<nano::keepalive> nano::transport::tcp_channels::sample_keepalive ()
 {
-	nano::unique_lock<nano::mutex> lock{ mutex };
-	std::optional<nano::keepalive> keepalive;
-	size_t count = 0;
-	while (!keepalive && channels.size () > 0 && count++ < channels.size ())
+	nano::lock_guard<nano::mutex> lock{ mutex };
+
+	auto next_rand = [this] (std::size_t max) {
+		std::uniform_int_distribution<std::size_t> dist (0, max - 1);
+		return dist (rng);
+	};
+
+	size_t counter = 0;
+	while (counter++ < channels.size ())
 	{
-		++channel_index;
-		if (channels.size () <= channel_index)
-		{
-			channel_index = 0;
-		}
-		auto server = channels.get<random_access_tag> ()[channel_index].response_server;
-		if (server && server->last_keepalive)
+		auto index = next_rand (channels.size ());
+		if (auto server = channels.get<random_access_tag> ()[index].response_server)
 		{
-			keepalive = std::move (server->last_keepalive);
-			server->last_keepalive = std::nullopt;
-		}
-	}
-	lock.unlock ();
-	if (keepalive)
-	{
-		ongoing_merge (channel_index, *keepalive, 1);
-	}
-	else
-	{
-		std::weak_ptr<nano::node> node_w = node.shared ();
-		node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index] () {
-			if (auto node_l = node_w.lock ())
+			if (auto keepalive = server->pop_last_keepalive ())
 			{
-				if (!node_l->network.tcp_channels.stopped)
-				{
-					node_l->network.tcp_channels.ongoing_merge (channel_index);
-				}
+				return keepalive;
 			}
-		});
+		}
 	}
-}
 
-void nano::transport::tcp_channels::ongoing_merge (size_t channel_index, nano::keepalive keepalive, size_t peer_index)
-{
-	debug_assert (peer_index < keepalive.peers.size ());
-	node.network.merge_peer (keepalive.peers[peer_index++]);
-	if (peer_index < keepalive.peers.size ())
-	{
-		std::weak_ptr<nano::node> node_w = node.shared ();
-		node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index, keepalive, peer_index] () {
-			if (auto node_l = node_w.lock ())
-			{
-				if (!node_l->network.tcp_channels.stopped)
-				{
-					node_l->network.tcp_channels.ongoing_merge (channel_index, keepalive, peer_index);
-				}
-			}
-		});
-	}
-	else
-	{
-		std::weak_ptr<nano::node> node_w = node.shared ();
-		node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index] () {
-			if (auto node_l = node_w.lock ())
-			{
-				if (!node_l->network.tcp_channels.stopped)
-				{
-					node_l->network.tcp_channels.ongoing_merge (channel_index);
-				}
-			}
-		});
-	}
+	return std::nullopt;
 }
 
 void nano::transport::tcp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a, uint8_t minimum_version_a, bool include_temporary_channels_a)
diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp
index d49ef9d452..005ccb0429 100644
--- a/nano/node/transport/tcp.hpp
+++ b/nano/node/transport/tcp.hpp
@@ -11,6 +11,7 @@
 
 #include
 #include
+#include <random>
 #include
 #include
 
@@ -154,12 +155,11 @@ namespace transport
 		bool track_reachout (nano::endpoint const &);
 		std::unique_ptr<container_info_component> collect_container_info (std::string const &);
 		void purge (std::chrono::steady_clock::time_point const &);
-		void ongoing_keepalive ();
-		void ongoing_merge (size_t channel_index);
-		void ongoing_merge (size_t channel_index, nano::keepalive keepalive, size_t peer_index);
 		void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0, bool = true);
 		void modify (std::shared_ptr<nano::transport::channel_tcp> const &, std::function<void (std::shared_ptr<nano::transport::channel_tcp> const &)>);
 		void update (nano::tcp_endpoint const &);
+		std::optional<nano::keepalive> sample_keepalive ();
+
 		// Connection start
 		void start_tcp (nano::endpoint const &);
 		void start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp> const &, nano::endpoint const &, std::shared_ptr<std::vector<uint8_t>> const &);
@@ -287,6 +287,8 @@ namespace transport
 		nano::condition_variable condition;
 		mutable nano::mutex mutex;
 		std::thread keepalive_thread;
+
+		std::default_random_engine rng;
 	};
 } // namespace transport
 } // namespace nano
diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp
index edd7f9663a..bcad6d5b0b 100644
--- a/nano/node/transport/tcp_server.cpp
+++ b/nano/node/transport/tcp_server.cpp
@@ -796,6 +796,14 @@ void nano::transport::tcp_server::set_last_keepalive (nano::keepalive const & message)
 	}
 }
 
+std::optional<nano::keepalive> nano::transport::tcp_server::pop_last_keepalive ()
+{
+	std::lock_guard lock{ mutex };
+	auto result = last_keepalive;
+	last_keepalive = std::nullopt;
+	return result;
+}
+
 bool nano::transport::tcp_server::to_bootstrap_connection ()
 {
 	auto node = this->node.lock ();
diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp
index 5369258545..0760241164 100644
--- a/nano/node/transport/tcp_server.hpp
+++ b/nano/node/transport/tcp_server.hpp
@@ -63,6 +63,7 @@ class tcp_server final : public std::enable_shared_from_this<tcp_server>
 	void timeout ();
 
 	void set_last_keepalive (nano::keepalive const & message);
+	std::optional<nano::keepalive> pop_last_keepalive ();
 
 	std::shared_ptr<nano::transport::socket> const socket;
 	std::weak_ptr<nano::node> const node;
@@ -73,7 +74,6 @@ class tcp_server final : public std::enable_shared_from_this<tcp_server>
 	nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
 	nano::account remote_node_id{};
 	std::chrono::steady_clock::time_point last_telemetry_req{};
-	std::optional<nano::keepalive> last_keepalive;
 
 private:
 	void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2);
@@ -90,9 +90,10 @@ class tcp_server final : public std::enable_shared_from_this<tcp_server>
 	bool is_bootstrap_connection () const;
 	bool is_realtime_connection () const;
 
+private:
+	bool const allow_bootstrap;
 	std::shared_ptr<nano::transport::message_deserializer> message_deserializer;
-
-	bool allow_bootstrap;
+	std::optional<nano::keepalive> last_keepalive;
 
 private:
 	class handshake_message_visitor : public nano::message_visitor