diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 9d1b6920ba..b718ea8568 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -240,11 +240,18 @@ enum class detail : uint8_t accept_error, accept_failure, accept_limits_exceeded, + attempts_limits_exceeded, + attempt_in_progress, close_error, max_per_ip, max_per_subnetwork, + max_attempts, excluded, erase_dead, + connect_initiate, + connect_failure, + connect_error, + attempt_timeout, // tcp_server handshake, diff --git a/nano/node/common.hpp b/nano/node/common.hpp index 9614826827..15a1cc9a44 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -14,6 +14,8 @@ namespace nano { using endpoint = boost::asio::ip::udp::endpoint; using tcp_endpoint = boost::asio::ip::tcp::endpoint; +using ip_address = boost::asio::ip::address; +using ip_port = uint16_t; bool parse_port (std::string const &, uint16_t &); bool parse_address (std::string const &, boost::asio::ip::address &); diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index eac61e5794..8181ccb901 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -464,8 +464,6 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point { nano::lock_guard lock{ mutex }; - node.logger.debug (nano::log::type::tcp_channels, "Performing periodic channel cleanup, cutoff: {}s", nano::log::seconds_delta (cutoff_deadline)); - auto should_close = [this, cutoff_deadline] (auto const & channel) { // Remove channels that haven't successfully sent a message within the cutoff time if (auto last = channel->get_last_packet_sent (); last < cutoff_deadline) diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp index 1f42278e66..7a606ba788 100644 --- a/nano/node/transport/tcp_listener.cpp +++ b/nano/node/transport/tcp_listener.cpp @@ -9,6 +9,8 @@ #include +#include + using namespace std::chrono_literals; /* @@ -21,6 +23,7 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_ logger{ node_a.logger }, port{ port_a }, max_inbound_connections{ max_inbound_connections }, + max_connection_attempts{ max_inbound_connections / 2 }, acceptor{ node_a.io_ctx } { connection_accepted.add ([this] (auto const & socket, auto const & server) { @@ -30,13 +33,14 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_ nano::transport::tcp_listener::~tcp_listener () { - // Thread should be stopped before destruction - debug_assert (!thread.joinable ()); + // Threads should be stopped before destruction + debug_assert (!listening_thread.joinable ()); + debug_assert (!cleanup_thread.joinable ()); } void nano::transport::tcp_listener::start () { - debug_assert (!thread.joinable ()); + debug_assert (!listening_thread.joinable ()); debug_assert (!cleanup_thread.joinable ()); try @@ -62,12 +66,12 @@ void nano::transport::tcp_listener::start () throw std::runtime_error (ex.code ().message ()); } - thread = std::thread ([this] { + listening_thread = std::thread ([this] { nano::thread_role::set (nano::thread_role::name::tcp_listener); try { logger.debug (nano::log::type::tcp_listener, "Starting acceptor thread"); - run (); + run_listening (); logger.debug (nano::log::type::tcp_listener, "Stopped acceptor thread"); } catch (std::exception const & ex) @@ -104,9 +108,9 @@ void nano::transport::tcp_listener::stop () } condition.notify_all (); - if (thread.joinable ()) + if (listening_thread.joinable ()) { - thread.join (); + listening_thread.join (); } if (cleanup_thread.joinable ()) { @@ -114,9 +118,19 @@ void nano::transport::tcp_listener::stop () } decltype (connections) connections_l; + decltype (attempts) attempts_l; { nano::lock_guard lock{ mutex }; + connections_l.swap (connections); + attempts_l.swap (attempts); + } + + // Cancel and await in-flight attempts + for (auto & attempt : attempts_l) + { + // TODO: Cancel attempt + attempt.future.wait (); } for (auto & connection : connections_l) @@ -139,6 +153,7 @@ void nano::transport::tcp_listener::run_cleanup () { stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::cleanup); cleanup (); + timeout (); condition.wait_for (lock, 1s, [this] () { return stopped.load (); }); } } @@ -159,9 +174,38 @@ void nano::transport::tcp_listener::cleanup () return false; } }); + + // Erase completed attempts + erase_if (attempts, [this] (auto const & attempt) { + return attempt.future.wait_for (0s) == std::future_status::ready; + }); } -void nano::transport::tcp_listener::run () +void nano::transport::tcp_listener::timeout () +{ + debug_assert (!mutex.try_lock ()); + + std::chrono::seconds const attempt_timeout = 15s; + auto cutoff = std::chrono::steady_clock::now () - attempt_timeout; + + // Cancel timed out attempts + for (auto & attempt : attempts) + { + if (auto status = attempt.future.wait_for (0s); status != std::future_status::ready) + { + if (attempt.start < cutoff) + { + // TODO: Cancel attempt + + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_timeout); + logger.debug (nano::log::type::tcp_listener, "Connection attempt timed out: {} (started {}s ago)", + fmt::streamed (attempt.endpoint), nano::log::seconds_delta (attempt.start)); + } + } + } +} + +void nano::transport::tcp_listener::run_listening () { nano::unique_lock lock{ mutex }; while (!stopped && acceptor.is_open ()) @@ -178,7 +222,8 @@ void nano::transport::tcp_listener::run () bool cooldown = false; try { - auto result = accept_one (); + auto raw_socket = accept_socket (); + auto result = accept_one (std::move (raw_socket), connection_type::inbound); if (result != accept_result::accepted) { stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in); @@ -206,6 +251,54 @@ void nano::transport::tcp_listener::run () } } +bool nano::transport::tcp_listener::connect (nano::ip_address ip, nano::ip_port port) +{ + nano::unique_lock lock{ mutex }; + + if (port == 0) + { + port = node.network_params.network.default_node_port; + } + + if (auto result = check_limits (ip, connection_type::outbound); result != accept_result::accepted) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempts_limits_exceeded, nano::stat::dir::out); + // Refusal reason should be logged earlier + + return false; // Rejected + } + + nano::tcp_endpoint endpoint{ ip, port }; + + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_initiate, nano::stat::dir::out); + logger.debug (nano::log::type::tcp_listener, "Initiating outgoing connection to: {}", fmt::streamed (endpoint)); + + auto future = asio::co_spawn (node.io_ctx, connect_impl (endpoint), asio::use_future); + attempts.emplace (attempt_entry{ endpoint, std::move (future) }); + return true; // Attempt started +} + +auto nano::transport::tcp_listener::connect_impl (nano::tcp_endpoint endpoint) -> asio::awaitable +{ + try + { + asio::ip::tcp::socket raw_socket{ node.io_ctx }; + co_await raw_socket.async_connect (endpoint, asio::use_awaitable); + + auto result = accept_one (std::move (raw_socket), connection_type::outbound); + if (result != accept_result::accepted) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_failure, nano::stat::dir::out); + // Refusal reason should be logged earlier + } + } + catch (boost::system::system_error const & ex) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_error, nano::stat::dir::out); + logger.log (nano::log::level::debug, nano::log::type::tcp_listener, "Error connecting to: {} ({})", fmt::streamed (endpoint), ex.what ()); + } +} + asio::ip::tcp::socket nano::transport::tcp_listener::accept_socket () { std::future future; @@ -217,15 +310,16 @@ asio::ip::tcp::socket nano::transport::tcp_listener::accept_socket () return future.get (); } -auto nano::transport::tcp_listener::accept_one () -> accept_result +auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket, connection_type type) -> accept_result { - auto raw_socket = accept_socket (); auto const remote_endpoint = raw_socket.remote_endpoint (); auto const local_endpoint = raw_socket.local_endpoint (); - if (auto result = check_limits (remote_endpoint.address ()); result != accept_result::accepted) + nano::unique_lock lock{ mutex }; + + if (auto result = check_limits (remote_endpoint.address (), connection_type::inbound); result != accept_result::accepted) { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, nano::stat::dir::in); + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, to_stat_dir (type)); // Refusal reason should be logged earlier try @@ -236,23 +330,22 @@ auto nano::transport::tcp_listener::accept_one () -> accept_result } catch (boost::system::system_error const & ex) { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::close_error, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Error while closing socket after refusing connection: {}", ex.what ()); + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::close_error, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Error while closing socket after refusing connection: {} ({})", ex.what (), to_string (type)); } return result; } - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Accepted incoming connection from: {}", fmt::streamed (remote_endpoint)); + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Accepted connection to: {} ({})", fmt::streamed (remote_endpoint), to_string (type)); - auto socket = std::make_shared (node, std::move (raw_socket), remote_endpoint, local_endpoint, socket_endpoint::server); + auto socket = std::make_shared (node, std::move (raw_socket), remote_endpoint, local_endpoint, to_socket_type (type)); auto server = std::make_shared (socket, node.shared (), true); - { - nano::lock_guard lock{ mutex }; - connections.emplace (entry{ remote_endpoint, socket, server }); - } + connections.emplace (entry{ remote_endpoint, socket, server }); + + lock.unlock (); socket->set_timeout (node.network_params.network.idle_timeout); socket->start (); @@ -278,29 +371,31 @@ void nano::transport::tcp_listener::wait_available_slots () } } -auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip) -> accept_result +auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) const -> accept_result { - nano::lock_guard lock{ mutex }; - - cleanup (); + debug_assert (!mutex.try_lock ()); + // TODO: FIX debug_assert (connections.size () <= max_inbound_connections); // Should be checked earlier (wait_available_slots) - if (node.network.excluded_peers.check (ip)) // true => error { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::excluded, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Rejected connection from excluded peer: {}", ip.to_string ()); + if (node.network.excluded_peers.check (ip)) // true => error + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::excluded, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Rejected connection from excluded peer: {} ({})", + ip.to_string (), to_string (type)); - return accept_result::excluded; + return accept_result::excluded; + } } if (!node.flags.disable_max_peers_per_ip) { if (auto count = count_per_ip (ip); count >= node.network_params.network.max_peers_per_ip) { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_ip, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached (ip: {}, count: {}), unable to open new connection", - ip.to_string (), count); + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_ip, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached (ip: {}, count: {}), unable to open new connection ({})", + ip.to_string (), count, to_string (type)); return accept_result::too_many_per_ip; } @@ -311,14 +406,35 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip) { if (auto count = count_per_subnetwork (ip); count >= node.network_params.network.max_peers_per_subnetwork) { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_subnetwork, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (ip: {}, count: {}), unable to open new connection", - ip.to_string (), count); + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_subnetwork, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (ip: {}, count: {}), unable to open new connection ({})", + ip.to_string (), count, to_string (type)); return accept_result::too_many_per_subnetwork; } } + if (type == connection_type::outbound) + { + if (attempts.size () > max_connection_attempts) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_attempts, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Max total connection attempts reached (ip: {}, count: {}), unable to initiate new connection ({})", + ip.to_string (), attempts.size (), to_string (type)); + + return accept_result::too_many_attempts; + } + + if (auto count = count_per_attempt (ip); count >= 1) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_in_progress, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Connection attempt already in progress (ip: {}), unable to initiate new connection ({})", + ip.to_string (), to_string (type)); + + return accept_result::too_many_attempts; + } + } + return accept_result::accepted; } @@ -354,7 +470,7 @@ size_t nano::transport::tcp_listener::bootstrap_count () const }); } -size_t nano::transport::tcp_listener::count_per_ip (asio::ip::address const & ip) const +size_t nano::transport::tcp_listener::count_per_ip (nano::ip_address const & ip) const { debug_assert (!mutex.try_lock ()); @@ -363,7 +479,7 @@ size_t nano::transport::tcp_listener::count_per_ip (asio::ip::address const & ip }); } -size_t nano::transport::tcp_listener::count_per_subnetwork (asio::ip::address const & ip) const +size_t nano::transport::tcp_listener::count_per_subnetwork (nano::ip_address const & ip) const { debug_assert (!mutex.try_lock ()); @@ -372,6 +488,15 @@ size_t nano::transport::tcp_listener::count_per_subnetwork (asio::ip::address co }); } +size_t nano::transport::tcp_listener::count_per_attempt (nano::ip_address const & ip) const +{ + debug_assert (!mutex.try_lock ()); + + return std::count_if (attempts.begin (), attempts.end (), [&ip] (auto const & attempt) { + return nano::transport::is_same_ip (attempt.address (), ip); + }); +} + asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const { if (!stopped) @@ -390,4 +515,35 @@ std::unique_ptr nano::transport::tcp_listener::c auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) })); return composite; -} \ No newline at end of file +} + +nano::stat::dir nano::transport::tcp_listener::to_stat_dir (connection_type type) +{ + switch (type) + { + case connection_type::inbound: + return nano::stat::dir::in; + case connection_type::outbound: + return nano::stat::dir::out; + } + debug_assert (false); + return {}; +} + +std::string_view nano::transport::tcp_listener::to_string (connection_type type) +{ + return magic_enum::enum_name (type); +} + +nano::transport::socket_endpoint nano::transport::tcp_listener::to_socket_type (connection_type type) +{ + switch (type) + { + case connection_type::inbound: + return socket_endpoint::server; + case connection_type::outbound: + return socket_endpoint::client; + } + debug_assert (false); + return {}; +} diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp index 5bad7a7dd4..1df7c52d4c 100644 --- a/nano/node/transport/tcp_listener.hpp +++ b/nano/node/transport/tcp_listener.hpp @@ -1,12 +1,15 @@ #pragma once #include +#include +#include #include #include #include #include +#include #include #include @@ -37,6 +40,12 @@ class tcp_listener final void start (); void stop (); + /** + * @param port is optional, if 0 then default peering port is used + * @return true if connection attempt was initiated + */ + bool connect (nano::ip_address ip, nano::ip_port port = 0); + nano::tcp_endpoint endpoint () const; size_t connection_count () const; size_t realtime_count () const; @@ -54,9 +63,10 @@ class tcp_listener final nano::logger & logger; private: - void run (); + void run_listening (); void run_cleanup (); void cleanup (); + void timeout (); void wait_available_slots (); enum class accept_result @@ -66,23 +76,49 @@ class tcp_listener final too_many_per_ip, too_many_per_subnetwork, excluded, + too_many_attempts, }; - accept_result accept_one (); - accept_result check_limits (asio::ip::address const & ip); + enum class connection_type + { + inbound, + outbound, + }; + + accept_result accept_one (asio::ip::tcp::socket, connection_type); + accept_result check_limits (nano::ip_address const & ip, connection_type) const; asio::ip::tcp::socket accept_socket (); + asio::awaitable connect_impl (nano::tcp_endpoint); + + size_t count_per_ip (nano::ip_address const & ip) const; + size_t count_per_subnetwork (nano::ip_address const & ip) const; + size_t count_per_attempt (nano::ip_address const & ip) const; - size_t count_per_ip (asio::ip::address const & ip) const; - size_t count_per_subnetwork (asio::ip::address const & ip) const; + static nano::stat::dir to_stat_dir (connection_type); + static std::string_view to_string (connection_type); + static nano::transport::socket_endpoint to_socket_type (connection_type); private: struct entry { - asio::ip::tcp::endpoint endpoint; + nano::tcp_endpoint endpoint; std::weak_ptr socket; std::weak_ptr server; - asio::ip::address address () const + nano::ip_address address () const + { + return endpoint.address (); + } + }; + + struct attempt_entry + { + nano::tcp_endpoint endpoint; + std::future future; + + std::chrono::steady_clock::time_point start{ std::chrono::steady_clock::now () }; + + nano::ip_address address () const { return endpoint.address (); } @@ -91,6 +127,7 @@ class tcp_listener final private: uint16_t const port; std::size_t const max_inbound_connections; + size_t const max_connection_attempts; // clang-format off class tag_address {}; @@ -98,10 +135,18 @@ class tcp_listener final using ordered_connections = boost::multi_index_container, - mi::const_mem_fun> + mi::const_mem_fun> + >>; + + using ordered_attempts = boost::multi_index_container, + mi::const_mem_fun> >>; // clang-format on + ordered_connections connections; + ordered_attempts attempts; asio::ip::tcp::acceptor acceptor; asio::ip::tcp::endpoint local; @@ -109,7 +154,7 @@ class tcp_listener final std::atomic stopped; nano::condition_variable condition; mutable nano::mutex mutex; - std::thread thread; + std::thread listening_thread; std::thread cleanup_thread; }; } \ No newline at end of file