From 030ed21a45ab73d0ebdf0e697d34992dc5671f9c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com>
Date: Tue, 19 Mar 2024 11:00:11 +0100
Subject: [PATCH] Reorganize `tcp_listener`

---
 nano/core_test/network.cpp         | 15 +++--------
 nano/node/node.cpp                 |  2 +-
 nano/node/transport/tcp_server.cpp | 40 +++++++++++++++++------------
 nano/node/transport/tcp_server.hpp | 41 +++++++++++++++++++-----------
 4 files changed, 54 insertions(+), 44 deletions(-)

diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp
index fe8bcfb6fb..229a24cc35 100644
--- a/nano/core_test/network.cpp
+++ b/nano/core_test/network.cpp
@@ -698,10 +698,7 @@ TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty)
 	system.deadline_set (std::chrono::seconds (6));
 	while (!disconnected)
 	{
-		{
-			nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
-			disconnected = node0->tcp_listener->connections.empty ();
-		}
+		disconnected = node0->tcp_listener->connection_count () == 0;
 		ASSERT_NO_ERROR (system.poll ());
 	}
 }
@@ -723,18 +720,12 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
 	});
 	});
 	ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake) != 0);
-	{
-		nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
-		ASSERT_EQ (node0->tcp_listener->connections.size (), 1);
-	}
+	ASSERT_EQ (node0->tcp_listener->connection_count (), 1);
 	bool disconnected (false);
 	system.deadline_set (std::chrono::seconds (20));
 	while (!disconnected)
 	{
-		{
-			nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
-			disconnected = node0->tcp_listener->connections.empty ();
-		}
+		disconnected = node0->tcp_listener->connection_count () == 0;
 		ASSERT_NO_ERROR (system.poll ());
 	}
 }
diff --git a/nano/node/node.cpp b/nano/node/node.cpp
index 054e3669f7..a1bf1533b9 100644
--- a/nano/node/node.cpp
+++ b/nano/node/node.cpp
@@ -519,7 +519,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
 	composite->add_component (collect_container_info (node.ledger, "ledger"));
 	composite->add_component (collect_container_info (node.active, "active"));
 	composite->add_component (collect_container_info (node.bootstrap_initiator, "bootstrap_initiator"));
-	composite->add_component (collect_container_info (*node.tcp_listener, "tcp_listener"));
+	composite->add_component (node.tcp_listener->collect_container_info ("tcp_listener"));
 	composite->add_component (collect_container_info (node.network, "network"));
 	composite->add_component (node.telemetry.collect_container_info ("telemetry"));
 	composite->add_component (collect_container_info (node.workers, "workers"));
diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp
index a5fd6751d0..f0e753814f 100644
--- a/nano/node/transport/tcp_server.cpp
+++ b/nano/node/transport/tcp_server.cpp
@@ -42,10 +42,14 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_
 {
 }
 
+nano::transport::tcp_listener::~tcp_listener ()
+{
+}
+
 void nano::transport::tcp_listener::start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
 {
 	nano::lock_guard<nano::mutex> lock{ mutex };
-	on = true;
+
 	acceptor.open (local.protocol ());
 	acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
 	boost::system::error_code ec;
@@ -68,12 +72,14 @@ void nano::transport::tcp_listener::stop ()
 	decltype (connections) connections_l;
 	{
 		nano::lock_guard<nano::mutex> lock{ mutex };
-		on = false;
+		stopped = true;
 		connections_l.swap (connections);
 	}
+
 	nano::lock_guard<nano::mutex> lock{ mutex };
-	boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this ()] () {
+	boost::asio::dispatch (strand, [this_l = shared_from_this ()] () {
 		this_l->acceptor.close ();
+
 		for (auto & address_connection_pair : this_l->connections_per_address)
 		{
 			if (auto connection_l = address_connection_pair.second.lock ())
 			{
 				connection_l->close ();
 			}
 		}
 		this_l->connections_per_address.clear ();
-	}));
+	});
 }
 
 std::size_t nano::transport::tcp_listener::connection_count ()
 {
 	nano::lock_guard<nano::mutex> lock{ mutex };
+	cleanup ();
 	return connections.size ();
 }
 
+void nano::transport::tcp_listener::cleanup ()
+{
+	debug_assert (!mutex.try_lock ());
+
+	erase_if (connections, [] (auto const & connection) {
+		return connection.second.expired ();
+	});
+}
+
 bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection)
 {
 	debug_assert (strand.running_in_this_thread ());
@@ -253,10 +269,10 @@ void nano::transport::tcp_listener::accept_action (boost::system::error_code con
 	}
 }
 
-boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint ()
+boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const
 {
 	nano::lock_guard<nano::mutex> lock{ mutex };
-	if (on)
+	if (!stopped)
 	{
 		return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ());
 	}
@@ -266,11 +282,10 @@ boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint ()
 	}
 }
 
-std::unique_ptr<nano::container_info_component> nano::transport::collect_container_info (nano::transport::tcp_listener & bootstrap_listener, std::string const & name)
+std::unique_ptr<nano::container_info_component> nano::transport::tcp_listener::collect_container_info (std::string const & name)
 {
-	auto sizeof_element = sizeof (decltype (bootstrap_listener.connections)::value_type);
 	auto composite = std::make_unique<container_info_composite> (name);
-	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "connections", bootstrap_listener.connection_count (), sizeof_element }));
+	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) }));
 	return composite;
 }
 
@@ -321,9 +336,6 @@ nano::transport::tcp_server::~tcp_server ()
 	}
 
 	stop ();
-
-	nano::lock_guard<nano::mutex> lock{ node->tcp_listener->mutex };
-	node->tcp_listener->connections.erase (this);
 }
 
 void nano::transport::tcp_server::start ()
@@ -877,10 +889,6 @@ void nano::transport::tcp_server::timeout ()
 	{
 		node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", fmt::streamed (remote_endpoint));
 
-		{
-			nano::lock_guard<nano::mutex> lock{ node->tcp_listener->mutex };
-			node->tcp_listener->connections.erase (this);
-		}
 		socket->close ();
 	}
 }
diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp
index 67abf9745e..64e2bc781f 100644
--- a/nano/node/transport/tcp_server.hpp
+++ b/nano/node/transport/tcp_server.hpp
@@ -22,35 +22,46 @@ class tcp_server;
 class tcp_listener final : public std::enable_shared_from_this<tcp_listener>
 {
 public:
-	tcp_listener (uint16_t, nano::node &, std::size_t);
-	void start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a);
+	tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections);
+	~tcp_listener ();
+
+	void start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback);
 	void stop ();
+
 	void accept_action (boost::system::error_code const &, std::shared_ptr<nano::transport::socket> const &);
+
 	std::size_t connection_count ();
 
+	nano::tcp_endpoint endpoint () const;
-	nano::mutex mutex;
-	std::unordered_map<tcp_server *, std::weak_ptr<tcp_server>> connections;
-	nano::tcp_endpoint endpoint ();
+	std::unique_ptr<container_info_component> collect_container_info (std::string const & name);
+
+private: // Dependencies
 	nano::node & node;
-	bool on{ false };
-	std::atomic<std::size_t> bootstrap_count{ 0 };
-	std::atomic<std::size_t> realtime_count{ 0 };
 
 private:
-	boost::asio::strand<boost::asio::io_context::executor_type> strand;
-	nano::transport::address_socket_mmap connections_per_address;
-	boost::asio::ip::tcp::acceptor acceptor;
-	boost::asio::ip::tcp::endpoint local;
-	std::size_t max_inbound_connections;
 	void on_connection (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a);
 	void evict_dead_connections ();
 	void on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const &)>);
 	/** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */
 	bool limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
 	bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
-};
+	void cleanup ();
 
-std::unique_ptr<container_info_component> collect_container_info (tcp_listener & bootstrap_listener, std::string const & name);
+public:
+	std::atomic<std::size_t> bootstrap_count{ 0 };
+	std::atomic<std::size_t> realtime_count{ 0 };
+
+private:
+	std::unordered_map<tcp_server *, std::weak_ptr<tcp_server>> connections;
+	boost::asio::strand<boost::asio::io_context::executor_type> strand;
+	nano::transport::address_socket_mmap connections_per_address;
+	boost::asio::ip::tcp::acceptor acceptor;
+	boost::asio::ip::tcp::endpoint local;
+	std::size_t const max_inbound_connections;
+
+	std::atomic<bool> stopped;
+	mutable nano::mutex mutex;
+};
 
 class tcp_server final : public std::enable_shared_from_this<tcp_server>
 {
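
Note on the lifetime handling this patch moves to (not part of the patch itself): `tcp_server` no longer erases itself from the listener's `connections` map under the listener's mutex on destruction or timeout; instead the map keeps `weak_ptr` entries and `connection_count ()` calls `cleanup ()` to drop expired ones before reporting the size. Below is a minimal standalone sketch of that pattern under simplified assumptions: the `listener`/`connection` names are illustrative only, `std::mutex` stands in for `nano::mutex`, and C++20 `std::erase_if` stands in for the unqualified `erase_if` helper the patch calls.

// Standalone illustration: weak_ptr registry with lazy cleanup (compile with -std=c++20).
#include <cstddef>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_map>

struct connection
{
	// A real connection would own a socket, buffers, etc.
};

class listener
{
public:
	void track (std::shared_ptr<connection> const & conn)
	{
		std::lock_guard<std::mutex> lock{ mutex };
		connections[conn.get ()] = conn; // store only a weak reference
	}

	std::size_t connection_count ()
	{
		std::lock_guard<std::mutex> lock{ mutex };
		cleanup (); // prune entries whose connection has already been destroyed
		return connections.size ();
	}

private:
	// Requires `mutex` to be held by the caller.
	void cleanup ()
	{
		std::erase_if (connections, [] (auto const & entry) {
			return entry.second.expired ();
		});
	}

	std::mutex mutex;
	std::unordered_map<connection *, std::weak_ptr<connection>> connections;
};

int main ()
{
	listener l;
	auto a = std::make_shared<connection> ();
	auto b = std::make_shared<connection> ();
	l.track (a);
	l.track (b);
	std::cout << l.connection_count () << '\n'; // prints 2
	b.reset (); // the connection goes away without notifying the listener
	std::cout << l.connection_count () << '\n'; // prints 1; the expired entry is pruned lazily
}

The trade-off of this approach is that expired entries linger until the next query, but the server side no longer has to reach back into the listener's mutex from its destructor or timeout path, which is exactly what the blocks removed from tcp_server.cpp were doing.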