Skip to content

Commit

Permalink
USE COROUTINES
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Apr 18, 2024
1 parent ef2524b commit 828c29f
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 41 deletions.
50 changes: 50 additions & 0 deletions nano/lib/coroutines.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#pragma once

#include <nano/lib/utility.hpp>

#include <boost/asio.hpp>

namespace asio = boost::asio;

namespace nano::this_coro
{
asio::awaitable<void> sleep_for (auto duration)
{
asio::steady_timer timer{ co_await asio::this_coro::executor };
timer.expires_after (duration);
co_await timer.async_wait (asio::use_awaitable);
}
}

namespace nano
{
template <typename Executor>
class async_condition
{
public:
explicit async_condition (Executor & executor) :
executor{ executor },
timer{ executor }
{
}

void notify ()
{
debug_assert (executor.running_in_this_thread ());

timer.cancel ();
}

asio::awaitable<void> wait_for_async (auto duration)
{
debug_assert (executor.running_in_this_thread ());

timer.expires_after (duration);
co_await timer.async_wait (asio::use_awaitable);
}

private:
Executor & executor;
asio::steady_timer timer;
};
}
76 changes: 40 additions & 36 deletions nano/node/transport/tcp_listener.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <nano/lib/coroutines.hpp>
#include <nano/lib/interval.hpp>
#include <nano/node/messages.hpp>
#include <nano/node/node.hpp>
Expand All @@ -21,6 +22,7 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_
logger{ node_a.logger },
port{ port_a },
max_inbound_connections{ max_inbound_connections },
strand{ node_a.io_ctx.get_executor () },
acceptor{ node_a.io_ctx },
local{ asio::ip::tcp::endpoint{ asio::ip::address_v6::any (), port_a } }
{
Expand All @@ -32,13 +34,14 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_
nano::transport::tcp_listener::~tcp_listener ()
{
// Thread should be stopped before destruction
debug_assert (!thread.joinable ());
debug_assert (!cleanup_thread.joinable ());
debug_assert (!future.valid () || future.wait_for (0s) == std::future_status::ready);
}

void nano::transport::tcp_listener::start ()
{
debug_assert (!thread.joinable ());
debug_assert (!cleanup_thread.joinable ());
debug_assert (!future.valid ());

try
{
Expand All @@ -56,13 +59,15 @@ void nano::transport::tcp_listener::start ()
throw std::runtime_error (ex.code ().message ());
}

thread = std::thread ([this] {
nano::thread_role::set (nano::thread_role::name::tcp_listener);
future = asio::co_spawn (strand, [this] () -> asio::awaitable<void> {
try
{
logger.debug (nano::log::type::tcp_listener, "Starting acceptor thread");
run ();
logger.debug (nano::log::type::tcp_listener, "Stopped acceptor thread");
logger.debug (nano::log::type::tcp_listener, "Starting acceptor");

co_await run ();
debug_assert (strand.running_in_this_thread ());

logger.debug (nano::log::type::tcp_listener, "Stopped acceptor");
}
catch (std::exception const & ex)
{
Expand All @@ -73,8 +78,7 @@ void nano::transport::tcp_listener::start ()
{
logger.critical (nano::log::type::tcp_listener, "Unknown error");
release_assert (false); // Should be handled earlier
}
});
} }, asio::use_future);

cleanup_thread = std::thread ([this] {
nano::thread_role::set (nano::thread_role::name::tcp_listener);
Expand All @@ -87,22 +91,27 @@ void nano::transport::tcp_listener::stop ()
debug_assert (!stopped);

logger.info (nano::log::type::tcp_listener, "Stopping listening for incoming connections and closing all sockets...");
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;

// This will run synchronously
asio::post (strand, asio::use_future ([this] {
boost::system::error_code ec;
acceptor.close (ec); // Best effort to close the acceptor, ignore errors
if (ec)
{
logger.error (nano::log::type::tcp_listener, "Error while closing acceptor: {}", ec.message ());
}
}))
.wait ();

{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();

if (thread.joinable ())
if (future.valid ())
{
thread.join ();
future.wait ();
}
if (cleanup_thread.joinable ())
{
Expand Down Expand Up @@ -157,23 +166,23 @@ void nano::transport::tcp_listener::cleanup ()
});
}

void nano::transport::tcp_listener::run ()
asio::awaitable<void> nano::transport::tcp_listener::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
debug_assert (strand.running_in_this_thread ());

while (!stopped && acceptor.is_open ())
{
lock.unlock ();

wait_available_slots ();
co_await wait_available_slots ();

if (stopped)
{
return;
co_return;
}

try
{
auto result = accept_one ();
auto socket = co_await accept_socket ();
auto result = accept_one (std::move (socket));
if (result != accept_result::accepted)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in);
Expand All @@ -186,10 +195,10 @@ void nano::transport::tcp_listener::run ()
logger.log (nano::log::level::debug, nano::log::type::tcp_listener, "Error accepting incoming connection: {}", ex.what ());
}

lock.lock ();

// Sleep for a while to prevent busy loop
condition.wait_for (lock, 10ms, [this] () { return stopped.load (); });
// condition.wait_for (lock, 10ms, [this] () { return stopped.load (); });

co_await nano::this_coro::sleep_for (1s);
}
if (!stopped)
{
Expand All @@ -198,20 +207,15 @@ void nano::transport::tcp_listener::run ()
}
}

asio::ip::tcp::socket nano::transport::tcp_listener::accept_socket ()
asio::awaitable<asio::ip::tcp::socket> nano::transport::tcp_listener::accept_socket ()
{
std::future<asio::ip::tcp::socket> future;
{
nano::unique_lock<nano::mutex> lock{ mutex };
future = acceptor.async_accept (asio::use_future);
}
future.wait ();
return future.get ();
debug_assert (strand.running_in_this_thread ());

co_return co_await acceptor.async_accept (asio::use_awaitable);
}

auto nano::transport::tcp_listener::accept_one () -> accept_result
auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket) -> accept_result
{
auto raw_socket = accept_socket ();
auto const remote_endpoint = raw_socket.remote_endpoint ();
auto const local_endpoint = raw_socket.local_endpoint ();

Expand Down Expand Up @@ -255,7 +259,7 @@ auto nano::transport::tcp_listener::accept_one () -> accept_result
return accept_result::accepted;
}

void nano::transport::tcp_listener::wait_available_slots ()
asio::awaitable<void> nano::transport::tcp_listener::wait_available_slots () const
{
nano::interval log_interval;
while (connection_count () >= max_inbound_connections && !stopped)
Expand All @@ -266,7 +270,7 @@ void nano::transport::tcp_listener::wait_available_slots ()
connection_count (), max_inbound_connections);
}

std::this_thread::sleep_for (100ms);
co_await nano::this_coro::sleep_for (100ms);
}
}

Expand Down
15 changes: 10 additions & 5 deletions nano/node/transport/tcp_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

#include <nano/node/common.hpp>

#include <boost/asio.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index_container.hpp>

#include <atomic>
#include <future>
#include <string_view>
#include <thread>

Expand Down Expand Up @@ -54,10 +56,11 @@ class tcp_listener final
nano::logger & logger;

private:
void run ();
asio::awaitable<void> run ();
asio::awaitable<void> wait_available_slots () const;

void run_cleanup ();
void cleanup ();
void wait_available_slots ();

enum class accept_result
{
Expand All @@ -68,9 +71,9 @@ class tcp_listener final
excluded,
};

accept_result accept_one ();
accept_result accept_one (asio::ip::tcp::socket);
accept_result check_limits (asio::ip::address const & ip);
asio::ip::tcp::socket accept_socket ();
asio::awaitable<asio::ip::tcp::socket> accept_socket ();

size_t count_per_ip (asio::ip::address const & ip) const;
size_t count_per_subnetwork (asio::ip::address const & ip) const;
Expand Down Expand Up @@ -103,13 +106,15 @@ class tcp_listener final
// clang-format on
ordered_connections connections;

asio::strand<asio::io_context::executor_type> strand;

asio::ip::tcp::acceptor acceptor;
asio::ip::tcp::endpoint local;

std::atomic<bool> stopped;
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;
std::future<void> future;
std::thread cleanup_thread;
};
}

0 comments on commit 828c29f

Please sign in to comment.