From 9883515a86cef2e3a809f27582606e18c6d132df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 1 Apr 2024 14:57:18 +0200 Subject: [PATCH] WIP --- nano/node/transport/tcp_listener.cpp | 49 +++++++++++++++++++++++----- nano/node/transport/tcp_listener.hpp | 26 +++++++++------ 2 files changed, 57 insertions(+), 18 deletions(-) diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp index 7a606ba788..02997eab40 100644 --- a/nano/node/transport/tcp_listener.cpp +++ b/nano/node/transport/tcp_listener.cpp @@ -21,6 +21,7 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_ node{ node_a }, stats{ node_a.stats }, logger{ node_a.logger }, + strand{ node.io_ctx.get_executor () }, port{ port_a }, max_inbound_connections{ max_inbound_connections }, max_connection_attempts{ max_inbound_connections / 2 }, @@ -121,18 +122,26 @@ void nano::transport::tcp_listener::stop () decltype (attempts) attempts_l; { nano::lock_guard lock{ mutex }; - connections_l.swap (connections); attempts_l.swap (attempts); } - // Cancel and await in-flight attempts + // Cancel all in-flight attempts + asio::post (strand, asio::use_future ([&attempts_l] () { + for (auto & attempt : attempts_l) + { + attempt.cancellation_signal.emit (asio::cancellation_type::terminal); + } + })) + .wait (); + + // Wait for all attempts to complete for (auto & attempt : attempts_l) { - // TODO: Cancel attempt attempt.future.wait (); } + // Gracefully close all connections for (auto & connection : connections_l) { if (auto socket = connection.socket.lock ()) @@ -144,6 +153,10 @@ void nano::transport::tcp_listener::stop () server->stop (); } } + + // No new connections should be accepted or initiated in the meantime + debug_assert (connection_count () == 0); + debug_assert (attempt_count () == 0); } void nano::transport::tcp_listener::run_cleanup () @@ -195,11 +208,14 @@ void nano::transport::tcp_listener::timeout () { if (attempt.start < cutoff) { - // TODO: Cancel attempt - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_timeout); logger.debug (nano::log::type::tcp_listener, "Connection attempt timed out: {} (started {}s ago)", fmt::streamed (attempt.endpoint), nano::log::seconds_delta (attempt.start)); + + asio::post (strand, asio::use_future ([&attempt] () { + attempt.cancellation_signal.emit (asio::cancellation_type::terminal); + })) + .wait (); } } } @@ -255,6 +271,11 @@ bool nano::transport::tcp_listener::connect (nano::ip_address ip, nano::ip_port { nano::unique_lock lock{ mutex }; + if (stopped) + { + return false; // Rejected + } + if (port == 0) { port = node.network_params.network.default_node_port; @@ -274,12 +295,13 @@ bool nano::transport::tcp_listener::connect (nano::ip_address ip, nano::ip_port logger.debug (nano::log::type::tcp_listener, "Initiating outgoing connection to: {}", fmt::streamed (endpoint)); auto future = asio::co_spawn (node.io_ctx, connect_impl (endpoint), asio::use_future); - attempts.emplace (attempt_entry{ endpoint, std::move (future) }); + attempts.emplace_back (endpoint, std::move (future)); return true; // Attempt started } auto nano::transport::tcp_listener::connect_impl (nano::tcp_endpoint endpoint) -> asio::awaitable { + debug_assert (strand.running_in_this_thread ()); try { asio::ip::tcp::socket raw_socket{ node.io_ctx }; @@ -317,6 +339,11 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket nano::unique_lock lock{ mutex }; + if (stopped) + { + return accept_result::invalid; + } + if (auto result = check_limits (remote_endpoint.address (), connection_type::inbound); result != accept_result::accepted) { stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, to_stat_dir (type)); @@ -425,7 +452,7 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, return accept_result::too_many_attempts; } - if (auto count = count_per_attempt (ip); count >= 1) + if (auto count = count_attempts (ip); count >= max_attempts_per_ip) { stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_in_progress, to_stat_dir (type)); logger.debug (nano::log::type::tcp_listener, "Connection attempt already in progress (ip: {}), unable to initiate new connection ({})", @@ -444,6 +471,12 @@ size_t nano::transport::tcp_listener::connection_count () const return connections.size (); } +size_t nano::transport::tcp_listener::attempt_count () const +{ + nano::lock_guard lock{ mutex }; + return attempts.size (); +} + size_t nano::transport::tcp_listener::realtime_count () const { nano::lock_guard lock{ mutex }; @@ -488,7 +521,7 @@ size_t nano::transport::tcp_listener::count_per_subnetwork (nano::ip_address con }); } -size_t nano::transport::tcp_listener::count_per_attempt (nano::ip_address const & ip) const +size_t nano::transport::tcp_listener::count_attempts (nano::ip_address const & ip) const { debug_assert (!mutex.try_lock ()); diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp index 1df7c52d4c..d5961bef28 100644 --- a/nano/node/transport/tcp_listener.hpp +++ b/nano/node/transport/tcp_listener.hpp @@ -10,6 +10,7 @@ #include #include +#include #include #include @@ -48,6 +49,7 @@ class tcp_listener final nano::tcp_endpoint endpoint () const; size_t connection_count () const; + size_t attempt_count () const; size_t realtime_count () const; size_t bootstrap_count () const; @@ -92,7 +94,7 @@ class tcp_listener final size_t count_per_ip (nano::ip_address const & ip) const; size_t count_per_subnetwork (nano::ip_address const & ip) const; - size_t count_per_attempt (nano::ip_address const & ip) const; + size_t count_attempts (nano::ip_address const & ip) const; static nano::stat::dir to_stat_dir (connection_type); static std::string_view to_string (connection_type); @@ -116,7 +118,13 @@ class tcp_listener final nano::tcp_endpoint endpoint; std::future future; - std::chrono::steady_clock::time_point start{ std::chrono::steady_clock::now () }; + attempt_entry (nano::tcp_endpoint const & endpoint, std::future && future) : + endpoint{ endpoint }, future{ std::move (future) } + { + } + + asio::cancellation_signal cancellation_signal{}; + std::chrono::steady_clock::time_point const start{ std::chrono::steady_clock::now () }; nano::ip_address address () const { @@ -128,6 +136,7 @@ class tcp_listener final uint16_t const port; std::size_t const max_inbound_connections; size_t const max_connection_attempts; + size_t const max_attempts_per_ip{ 1 }; // clang-format off class tag_address {}; @@ -137,17 +146,14 @@ class tcp_listener final mi::hashed_non_unique, mi::const_mem_fun> >>; - - using ordered_attempts = boost::multi_index_container, - mi::const_mem_fun> - >>; // clang-format on - ordered_connections connections; - ordered_attempts attempts; + std::list attempts; + + // All connection attempts are serialized through this strand + asio::strand strand; + // For simplicity, acceptor is not run on the strand but instead guarded by mutex asio::ip::tcp::acceptor acceptor; asio::ip::tcp::endpoint local;