diff --git a/nano/lib/coroutines.hpp b/nano/lib/coroutines.hpp new file mode 100644 index 0000000000..6f4ab23261 --- /dev/null +++ b/nano/lib/coroutines.hpp @@ -0,0 +1,50 @@ +#pragma once + +#include + +#include + +namespace asio = boost::asio; + +namespace nano::this_coro +{ +asio::awaitable sleep_for (auto duration) +{ + asio::steady_timer timer{ co_await asio::this_coro::executor }; + timer.expires_after (duration); + co_await timer.async_wait (asio::use_awaitable); +} +} + +namespace nano +{ +template +class async_condition +{ +public: + explicit async_condition (Executor & executor) : + executor{ executor }, + timer{ executor } + { + } + + void notify () + { + debug_assert (executor.running_in_this_thread ()); + + timer.cancel (); + } + + asio::awaitable wait_for_async (auto duration) + { + debug_assert (executor.running_in_this_thread ()); + + timer.expires_after (duration); + co_await timer.async_wait (asio::use_awaitable); + } + +private: + Executor & executor; + asio::steady_timer timer; +}; +} \ No newline at end of file diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp index 7cb676fb73..aff9413a93 100644 --- a/nano/node/transport/tcp_listener.cpp +++ b/nano/node/transport/tcp_listener.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -21,6 +22,7 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_ logger{ node_a.logger }, port{ port_a }, max_inbound_connections{ max_inbound_connections }, + strand{ node_a.io_ctx.get_executor () }, acceptor{ node_a.io_ctx }, local{ asio::ip::tcp::endpoint{ asio::ip::address_v6::any (), port_a } } { @@ -32,13 +34,14 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_ nano::transport::tcp_listener::~tcp_listener () { // Thread should be stopped before destruction - debug_assert (!thread.joinable ()); + debug_assert (!cleanup_thread.joinable ()); + debug_assert (!future.valid () || future.wait_for (0s) == std::future_status::ready); } void nano::transport::tcp_listener::start () { - debug_assert (!thread.joinable ()); debug_assert (!cleanup_thread.joinable ()); + debug_assert (!future.valid ()); try { @@ -56,13 +59,15 @@ void nano::transport::tcp_listener::start () throw std::runtime_error (ex.code ().message ()); } - thread = std::thread ([this] { - nano::thread_role::set (nano::thread_role::name::tcp_listener); + future = asio::co_spawn (strand, [this] () -> asio::awaitable { try { - logger.debug (nano::log::type::tcp_listener, "Starting acceptor thread"); - run (); - logger.debug (nano::log::type::tcp_listener, "Stopped acceptor thread"); + logger.debug (nano::log::type::tcp_listener, "Starting acceptor"); + + co_await run (); + debug_assert (strand.running_in_this_thread ()); + + logger.debug (nano::log::type::tcp_listener, "Stopped acceptor"); } catch (std::exception const & ex) { @@ -73,8 +78,7 @@ void nano::transport::tcp_listener::start () { logger.critical (nano::log::type::tcp_listener, "Unknown error"); release_assert (false); // Should be handled earlier - } - }); + } }, asio::use_future); cleanup_thread = std::thread ([this] { nano::thread_role::set (nano::thread_role::name::tcp_listener); @@ -87,22 +91,27 @@ void nano::transport::tcp_listener::stop () debug_assert (!stopped); logger.info (nano::log::type::tcp_listener, "Stopping listening for incoming connections and closing all sockets..."); - { - nano::lock_guard lock{ mutex }; - stopped = true; + // This will run synchronously + asio::post (strand, asio::use_future ([this] { boost::system::error_code ec; acceptor.close (ec); // Best effort to close the acceptor, ignore errors if (ec) { logger.error (nano::log::type::tcp_listener, "Error while closing acceptor: {}", ec.message ()); } + })) + .wait (); + + { + nano::lock_guard lock{ mutex }; + stopped = true; } condition.notify_all (); - if (thread.joinable ()) + if (future.valid ()) { - thread.join (); + future.wait (); } if (cleanup_thread.joinable ()) { @@ -157,23 +166,23 @@ void nano::transport::tcp_listener::cleanup () }); } -void nano::transport::tcp_listener::run () +asio::awaitable nano::transport::tcp_listener::run () { - nano::unique_lock lock{ mutex }; + debug_assert (strand.running_in_this_thread ()); + while (!stopped && acceptor.is_open ()) { - lock.unlock (); - - wait_available_slots (); + co_await wait_available_slots (); if (stopped) { - return; + co_return; } try { - auto result = accept_one (); + auto socket = co_await accept_socket (); + auto result = accept_one (std::move (socket)); if (result != accept_result::accepted) { stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in); @@ -186,10 +195,10 @@ void nano::transport::tcp_listener::run () logger.log (nano::log::level::debug, nano::log::type::tcp_listener, "Error accepting incoming connection: {}", ex.what ()); } - lock.lock (); - // Sleep for a while to prevent busy loop - condition.wait_for (lock, 10ms, [this] () { return stopped.load (); }); + // condition.wait_for (lock, 10ms, [this] () { return stopped.load (); }); + + co_await nano::this_coro::sleep_for (1s); } if (!stopped) { @@ -198,20 +207,15 @@ void nano::transport::tcp_listener::run () } } -asio::ip::tcp::socket nano::transport::tcp_listener::accept_socket () +asio::awaitable nano::transport::tcp_listener::accept_socket () { - std::future future; - { - nano::unique_lock lock{ mutex }; - future = acceptor.async_accept (asio::use_future); - } - future.wait (); - return future.get (); + debug_assert (strand.running_in_this_thread ()); + + co_return co_await acceptor.async_accept (asio::use_awaitable); } -auto nano::transport::tcp_listener::accept_one () -> accept_result +auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket) -> accept_result { - auto raw_socket = accept_socket (); auto const remote_endpoint = raw_socket.remote_endpoint (); auto const local_endpoint = raw_socket.local_endpoint (); @@ -255,7 +259,7 @@ auto nano::transport::tcp_listener::accept_one () -> accept_result return accept_result::accepted; } -void nano::transport::tcp_listener::wait_available_slots () +asio::awaitable nano::transport::tcp_listener::wait_available_slots () const { nano::interval log_interval; while (connection_count () >= max_inbound_connections && !stopped) @@ -266,7 +270,7 @@ void nano::transport::tcp_listener::wait_available_slots () connection_count (), max_inbound_connections); } - std::this_thread::sleep_for (100ms); + co_await nano::this_coro::sleep_for (100ms); } } diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp index 5bad7a7dd4..b6fb33ee21 100644 --- a/nano/node/transport/tcp_listener.hpp +++ b/nano/node/transport/tcp_listener.hpp @@ -2,11 +2,13 @@ #include +#include #include #include #include #include +#include #include #include @@ -54,10 +56,11 @@ class tcp_listener final nano::logger & logger; private: - void run (); + asio::awaitable run (); + asio::awaitable wait_available_slots () const; + void run_cleanup (); void cleanup (); - void wait_available_slots (); enum class accept_result { @@ -68,9 +71,9 @@ class tcp_listener final excluded, }; - accept_result accept_one (); + accept_result accept_one (asio::ip::tcp::socket); accept_result check_limits (asio::ip::address const & ip); - asio::ip::tcp::socket accept_socket (); + asio::awaitable accept_socket (); size_t count_per_ip (asio::ip::address const & ip) const; size_t count_per_subnetwork (asio::ip::address const & ip) const; @@ -103,13 +106,15 @@ class tcp_listener final // clang-format on ordered_connections connections; + asio::strand strand; + asio::ip::tcp::acceptor acceptor; asio::ip::tcp::endpoint local; std::atomic stopped; nano::condition_variable condition; mutable nano::mutex mutex; - std::thread thread; + std::future future; std::thread cleanup_thread; }; } \ No newline at end of file