diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 9d1b6920ba..b718ea8568 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -240,11 +240,18 @@ enum class detail : uint8_t accept_error, accept_failure, accept_limits_exceeded, + attempts_limits_exceeded, + attempt_in_progress, close_error, max_per_ip, max_per_subnetwork, + max_attempts, excluded, erase_dead, + connect_initiate, + connect_failure, + connect_error, + attempt_timeout, // tcp_server handshake, diff --git a/nano/node/common.hpp b/nano/node/common.hpp index 9614826827..15a1cc9a44 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -14,6 +14,8 @@ namespace nano { using endpoint = boost::asio::ip::udp::endpoint; using tcp_endpoint = boost::asio::ip::tcp::endpoint; +using ip_address = boost::asio::ip::address; +using ip_port = uint16_t; bool parse_port (std::string const &, uint16_t &); bool parse_address (std::string const &, boost::asio::ip::address &); diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp index a4358ac51a..db5038d371 100644 --- a/nano/node/transport/tcp_listener.cpp +++ b/nano/node/transport/tcp_listener.cpp @@ -9,6 +9,8 @@ #include +#include + using namespace std::chrono_literals; /* @@ -21,6 +23,7 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_ logger{ node_a.logger }, port{ port_a }, max_inbound_connections{ max_inbound_connections }, + max_connection_attempts{ max_inbound_connections / 2 }, acceptor{ node_a.io_ctx }, local{ boost::asio::ip::tcp::endpoint{ boost::asio::ip::address_v6::any (), port_a } } { @@ -31,13 +34,14 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_ nano::transport::tcp_listener::~tcp_listener () { - // Thread should be stopped before destruction - debug_assert (!thread.joinable ()); + // Threads should be stopped before destruction + debug_assert (!listening_thread.joinable ()); + debug_assert (!cleanup_thread.joinable ()); } void nano::transport::tcp_listener::start (std::function const &, boost::system::error_code const &)> callback_a) { - debug_assert (!thread.joinable ()); + debug_assert (!listening_thread.joinable ()); debug_assert (!cleanup_thread.joinable ()); try @@ -56,12 +60,12 @@ void nano::transport::tcp_listener::start (std::function lock{ mutex }; + connections_l.swap (connections); + attempts_l.swap (attempts); + } + + // Cancel and await in-flight attempts + for (auto & attempt : attempts_l) + { + // TODO: Cancel attempt + attempt.future.wait (); } for (auto & connection : connections_l) @@ -133,6 +147,7 @@ void nano::transport::tcp_listener::run_cleanup () { stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::cleanup); cleanup (); + timeout (); condition.wait_for (lock, 1s, [this] () { return stopped.load (); }); } } @@ -153,9 +168,35 @@ void nano::transport::tcp_listener::cleanup () return false; } }); + + // Erase completed attempts + erase_if (attempts, [this] (auto const & attempt) { + return attempt.future.wait_for (0s) == std::future_status::ready; + }); } -void nano::transport::tcp_listener::run () +void nano::transport::tcp_listener::timeout () +{ + debug_assert (!mutex.try_lock ()); + + std::chrono::seconds const attempt_timeout = 15s; + auto cutoff = std::chrono::steady_clock::now () - attempt_timeout; + + erase_if (attempts, [this, cutoff] (auto const & attempt) { + if (auto status = attempt.future.wait_for (0s); status != std::future_status::ready) + { + if (attempt.start < cutoff) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_timeout); + logger.debug (nano::log::type::tcp_listener, "Connection attempt timed out: {} ", fmt::streamed (attempt.endpoint)); + return true; + } + } + return false; + }); +} + +void nano::transport::tcp_listener::run_listening () { nano::unique_lock lock{ mutex }; while (!stopped && acceptor.is_open ()) @@ -172,7 +213,8 @@ void nano::transport::tcp_listener::run () bool cooldown = false; try { - auto result = accept_one (); + auto raw_socket = accept_socket (); + auto result = accept_one (std::move (raw_socket), connection_type::inbound); if (result != accept_result::accepted) { stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in); @@ -200,6 +242,59 @@ void nano::transport::tcp_listener::run () } } +bool nano::transport::tcp_listener::connect (nano::ip_address ip, nano::ip_port port) +{ + nano::unique_lock lock{ mutex }; + + if (port == 0) + { + port = node.network_params.network.default_node_port; + } + + if (auto result = check_limits (ip, connection_type::outbound); result != accept_result::accepted) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempts_limits_exceeded, nano::stat::dir::out); + // Refusal reason should be logged earlier + + return false; // Rejected + } + + nano::tcp_endpoint endpoint{ ip, port }; + + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_initiate, nano::stat::dir::out); + logger.debug (nano::log::type::tcp_listener, "Initiating outgoing connection to: {}", fmt::streamed (endpoint)); + + auto future = boost::asio::co_spawn ( + node.io_ctx, + [this, endpoint] () -> boost::asio::awaitable { co_await connect_impl (endpoint); }, + boost::asio::use_future); + + attempts.emplace (attempt_entry{ endpoint, std::move (future) }); + + return true; // Attempt started +} + +auto nano::transport::tcp_listener::connect_impl (nano::tcp_endpoint endpoint) -> boost::asio::awaitable +{ + try + { + boost::asio::ip::tcp::socket raw_socket{ node.io_ctx }; + co_await raw_socket.async_connect (endpoint, boost::asio::use_awaitable); + + auto result = accept_one (std::move (raw_socket), connection_type::outbound); + if (result != accept_result::accepted) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_failure, nano::stat::dir::out); + // Refusal reason should be logged earlier + } + } + catch (boost::system::system_error const & ex) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_error, nano::stat::dir::out); + logger.log (nano::log::level::debug, nano::log::type::tcp_listener, "Error connecting to: {} ({})", fmt::streamed (endpoint), ex.what ()); + } +} + boost::asio::ip::tcp::socket nano::transport::tcp_listener::accept_socket () { std::future future; @@ -211,15 +306,17 @@ boost::asio::ip::tcp::socket nano::transport::tcp_listener::accept_socket () return future.get (); } -auto nano::transport::tcp_listener::accept_one () -> accept_result +// Closes raw socket if connection is refused +auto nano::transport::tcp_listener::accept_one (boost::asio::ip::tcp::socket raw_socket, connection_type type) -> accept_result { - auto raw_socket = accept_socket (); auto const remote_endpoint = raw_socket.remote_endpoint (); auto const local_endpoint = raw_socket.local_endpoint (); - if (auto result = check_limits (remote_endpoint.address ()); result != accept_result::accepted) + nano::unique_lock lock{ mutex }; + + if (auto result = check_limits (remote_endpoint.address (), connection_type::inbound); result != accept_result::accepted) { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, nano::stat::dir::in); + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, to_stat_dir (type)); // Refusal reason should be logged earlier try @@ -230,23 +327,22 @@ auto nano::transport::tcp_listener::accept_one () -> accept_result } catch (boost::system::system_error const & ex) { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::close_error, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Error while closing socket after refusing connection: {}", ex.what ()); + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::close_error, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Error while closing socket after refusing connection: {} ({})", ex.what (), to_string (type)); } return result; } - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Accepted incoming connection from: {}", fmt::streamed (remote_endpoint)); + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Accepted connection to: {} ({})", fmt::streamed (remote_endpoint), to_string (type)); - auto socket = std::make_shared (std::move (raw_socket), remote_endpoint, local_endpoint, node, socket_endpoint::server); + auto socket = std::make_shared (std::move (raw_socket), remote_endpoint, local_endpoint, node, to_socket_type (type)); auto server = std::make_shared (socket, node.shared (), true); - { - nano::lock_guard lock{ mutex }; - connections.emplace (entry{ remote_endpoint, socket, server }); - } + connections.emplace (entry{ remote_endpoint, socket, server }); + + lock.unlock (); socket->set_timeout (node.network_params.network.idle_timeout); socket->start (); @@ -272,29 +368,31 @@ void nano::transport::tcp_listener::wait_available_slots () } } -auto nano::transport::tcp_listener::check_limits (boost::asio::ip::address const & ip) -> accept_result +auto nano::transport::tcp_listener::check_limits (boost::asio::ip::address const & ip, connection_type type) const -> accept_result { - nano::lock_guard lock{ mutex }; - - cleanup (); + debug_assert (!mutex.try_lock ()); + // TODO: FIX debug_assert (connections.size () <= max_inbound_connections); // Should be checked earlier (wait_available_slots) - if (node.network.excluded_peers.check (ip)) // true => error { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::excluded, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Rejected connection from excluded peer: {}", ip.to_string ()); + if (node.network.excluded_peers.check (ip)) // true => error + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::excluded, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Rejected connection from excluded peer: {} ({})", + ip.to_string (), to_string (type)); - return accept_result::excluded; + return accept_result::excluded; + } } if (!node.flags.disable_max_peers_per_ip) { if (auto count = count_per_ip (ip); count >= node.network_params.network.max_peers_per_ip) { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_ip, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached (ip: {}, count: {}), unable to open new connection", - ip.to_string (), count); + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_ip, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached (ip: {}, count: {}), unable to open new connection ({})", + ip.to_string (), count, to_string (type)); return accept_result::too_many_per_ip; } @@ -305,14 +403,35 @@ auto nano::transport::tcp_listener::check_limits (boost::asio::ip::address const { if (auto count = count_per_subnetwork (ip); count >= node.network_params.network.max_peers_per_subnetwork) { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_subnetwork, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (ip: {}, count: {}), unable to open new connection", - ip.to_string (), count); + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_subnetwork, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (ip: {}, count: {}), unable to open new connection ({})", + ip.to_string (), count, to_string (type)); return accept_result::too_many_per_subnetwork; } } + if (type == connection_type::outbound) + { + if (attempts.size () > max_connection_attempts) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_attempts, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Max total connection attempts reached (ip: {}, count: {}), unable to initiate new connection ({})", + ip.to_string (), attempts.size (), to_string (type)); + + return accept_result::too_many_attempts; + } + + if (auto count = count_per_attempt (ip); count >= 1) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_in_progress, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Connection attempt already in progress (ip: {}), unable to initiate new connection ({})", + ip.to_string (), to_string (type)); + + return accept_result::too_many_attempts; + } + } + return accept_result::accepted; } @@ -366,6 +485,15 @@ size_t nano::transport::tcp_listener::count_per_subnetwork (boost::asio::ip::add }); } +size_t nano::transport::tcp_listener::count_per_attempt (boost::asio::ip::address const & ip) const +{ + debug_assert (!mutex.try_lock ()); + + return std::count_if (attempts.begin (), attempts.end (), [&ip] (auto const & attempt) { + return nano::transport::is_same_ip (attempt.address (), ip); + }); +} + boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const { if (!stopped) @@ -383,4 +511,35 @@ std::unique_ptr nano::transport::tcp_listener::c auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) })); return composite; -} \ No newline at end of file +} + +nano::stat::dir nano::transport::tcp_listener::to_stat_dir (connection_type type) +{ + switch (type) + { + case connection_type::inbound: + return nano::stat::dir::in; + case connection_type::outbound: + return nano::stat::dir::out; + } + debug_assert (false); + return {}; +} + +std::string_view nano::transport::tcp_listener::to_string (connection_type type) +{ + return magic_enum::enum_name (type); +} + +nano::transport::socket_endpoint nano::transport::tcp_listener::to_socket_type (connection_type type) +{ + switch (type) + { + case connection_type::inbound: + return socket_endpoint::server; + case connection_type::outbound: + return socket_endpoint::client; + } + debug_assert (false); + return {}; +} diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp index 4111992842..6a02ffdd48 100644 --- a/nano/node/transport/tcp_listener.hpp +++ b/nano/node/transport/tcp_listener.hpp @@ -1,12 +1,15 @@ #pragma once #include +#include +#include #include #include #include #include +#include #include #include @@ -36,6 +39,9 @@ class tcp_listener final void start (std::function const &, boost::system::error_code const &)> callback = {}); void stop (); + /// @param port is optional, if 0 then default peering port is used + bool connect (nano::ip_address ip, nano::ip_port port = 0); + nano::tcp_endpoint endpoint () const; size_t connection_count () const; size_t realtime_count () const; @@ -53,9 +59,10 @@ class tcp_listener final nano::logger & logger; private: - void run (); + void run_listening (); void run_cleanup (); void cleanup (); + void timeout (); void wait_available_slots (); enum class accept_result @@ -65,23 +72,49 @@ class tcp_listener final too_many_per_ip, too_many_per_subnetwork, excluded, + too_many_attempts, }; - accept_result accept_one (); - accept_result check_limits (boost::asio::ip::address const & ip); + enum class connection_type + { + inbound, + outbound, + }; + + accept_result accept_one (boost::asio::ip::tcp::socket, connection_type); + accept_result check_limits (nano::ip_address const & ip, connection_type) const; boost::asio::ip::tcp::socket accept_socket (); + boost::asio::awaitable connect_impl (nano::tcp_endpoint); + + size_t count_per_ip (nano::ip_address const & ip) const; + size_t count_per_subnetwork (nano::ip_address const & ip) const; + size_t count_per_attempt (nano::ip_address const & ip) const; - size_t count_per_ip (boost::asio::ip::address const & ip) const; - size_t count_per_subnetwork (boost::asio::ip::address const & ip) const; + static nano::stat::dir to_stat_dir (connection_type); + static std::string_view to_string (connection_type); + static nano::transport::socket_endpoint to_socket_type (connection_type); private: struct entry { - boost::asio::ip::tcp::endpoint endpoint; + nano::tcp_endpoint endpoint; std::weak_ptr socket; std::weak_ptr server; - boost::asio::ip::address address () const + nano::ip_address address () const + { + return endpoint.address (); + } + }; + + struct attempt_entry + { + nano::tcp_endpoint endpoint; + std::future future; + + std::chrono::steady_clock::time_point start{ std::chrono::steady_clock::now () }; + + nano::ip_address address () const { return endpoint.address (); } @@ -90,6 +123,7 @@ class tcp_listener final private: uint16_t const port; std::size_t const max_inbound_connections; + size_t const max_connection_attempts; // clang-format off class tag_address {}; @@ -97,10 +131,18 @@ class tcp_listener final using ordered_connections = boost::multi_index_container, - mi::const_mem_fun> + mi::const_mem_fun> + >>; + + using ordered_attempts = boost::multi_index_container, + mi::const_mem_fun> >>; // clang-format on + ordered_connections connections; + ordered_attempts attempts; boost::asio::ip::tcp::acceptor acceptor; boost::asio::ip::tcp::endpoint local; @@ -108,7 +150,7 @@ class tcp_listener final std::atomic stopped; nano::condition_variable condition; mutable nano::mutex mutex; - std::thread thread; + std::thread listening_thread; std::thread cleanup_thread; }; } \ No newline at end of file