Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Apr 2, 2024
1 parent 30e430d commit 9883515
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 18 deletions.
49 changes: 41 additions & 8 deletions nano/node/transport/tcp_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_
node{ node_a },
stats{ node_a.stats },
logger{ node_a.logger },
strand{ node.io_ctx.get_executor () },
port{ port_a },
max_inbound_connections{ max_inbound_connections },
max_connection_attempts{ max_inbound_connections / 2 },
Expand Down Expand Up @@ -121,18 +122,26 @@ void nano::transport::tcp_listener::stop ()
decltype (attempts) attempts_l;
{
nano::lock_guard<nano::mutex> lock{ mutex };

connections_l.swap (connections);
attempts_l.swap (attempts);
}

// Cancel and await in-flight attempts
// Cancel all in-flight attempts
asio::post (strand, asio::use_future ([&attempts_l] () {
for (auto & attempt : attempts_l)
{
attempt.cancellation_signal.emit (asio::cancellation_type::terminal);
}
}))
.wait ();

// Wait for all attempts to complete
for (auto & attempt : attempts_l)
{
// TODO: Cancel attempt
attempt.future.wait ();
}

// Gracefully close all connections
for (auto & connection : connections_l)
{
if (auto socket = connection.socket.lock ())
Expand All @@ -144,6 +153,10 @@ void nano::transport::tcp_listener::stop ()
server->stop ();
}
}

// No new connections should be accepted or initiated in the meantime
debug_assert (connection_count () == 0);
debug_assert (attempt_count () == 0);
}

void nano::transport::tcp_listener::run_cleanup ()
Expand Down Expand Up @@ -195,11 +208,14 @@ void nano::transport::tcp_listener::timeout ()
{
if (attempt.start < cutoff)
{
// TODO: Cancel attempt

stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_timeout);
logger.debug (nano::log::type::tcp_listener, "Connection attempt timed out: {} (started {}s ago)",
fmt::streamed (attempt.endpoint), nano::log::seconds_delta (attempt.start));

asio::post (strand, asio::use_future ([&attempt] () {
attempt.cancellation_signal.emit (asio::cancellation_type::terminal);
}))
.wait ();
}
}
}
Expand Down Expand Up @@ -255,6 +271,11 @@ bool nano::transport::tcp_listener::connect (nano::ip_address ip, nano::ip_port
{
nano::unique_lock<nano::mutex> lock{ mutex };

if (stopped)
{
return false; // Rejected
}

if (port == 0)
{
port = node.network_params.network.default_node_port;
Expand All @@ -274,12 +295,13 @@ bool nano::transport::tcp_listener::connect (nano::ip_address ip, nano::ip_port
logger.debug (nano::log::type::tcp_listener, "Initiating outgoing connection to: {}", fmt::streamed (endpoint));

auto future = asio::co_spawn (node.io_ctx, connect_impl (endpoint), asio::use_future);
attempts.emplace (attempt_entry{ endpoint, std::move (future) });
attempts.emplace_back (endpoint, std::move (future));
return true; // Attempt started
}

auto nano::transport::tcp_listener::connect_impl (nano::tcp_endpoint endpoint) -> asio::awaitable<void>
{
debug_assert (strand.running_in_this_thread ());
try
{
asio::ip::tcp::socket raw_socket{ node.io_ctx };
Expand Down Expand Up @@ -317,6 +339,11 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket

nano::unique_lock<nano::mutex> lock{ mutex };

if (stopped)
{
return accept_result::invalid;
}

if (auto result = check_limits (remote_endpoint.address (), connection_type::inbound); result != accept_result::accepted)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, to_stat_dir (type));
Expand Down Expand Up @@ -425,7 +452,7 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip,
return accept_result::too_many_attempts;
}

if (auto count = count_per_attempt (ip); count >= 1)
if (auto count = count_attempts (ip); count >= max_attempts_per_ip)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_in_progress, to_stat_dir (type));
logger.debug (nano::log::type::tcp_listener, "Connection attempt already in progress (ip: {}), unable to initiate new connection ({})",
Expand All @@ -444,6 +471,12 @@ size_t nano::transport::tcp_listener::connection_count () const
return connections.size ();
}

size_t nano::transport::tcp_listener::attempt_count () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return attempts.size ();
}

size_t nano::transport::tcp_listener::realtime_count () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
Expand Down Expand Up @@ -488,7 +521,7 @@ size_t nano::transport::tcp_listener::count_per_subnetwork (nano::ip_address con
});
}

size_t nano::transport::tcp_listener::count_per_attempt (nano::ip_address const & ip) const
size_t nano::transport::tcp_listener::count_attempts (nano::ip_address const & ip) const
{
debug_assert (!mutex.try_lock ());

Expand Down
26 changes: 16 additions & 10 deletions nano/node/transport/tcp_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <atomic>
#include <chrono>
#include <list>
#include <string_view>
#include <thread>

Expand Down Expand Up @@ -48,6 +49,7 @@ class tcp_listener final

nano::tcp_endpoint endpoint () const;
size_t connection_count () const;
size_t attempt_count () const;
size_t realtime_count () const;
size_t bootstrap_count () const;

Expand Down Expand Up @@ -92,7 +94,7 @@ class tcp_listener final

size_t count_per_ip (nano::ip_address const & ip) const;
size_t count_per_subnetwork (nano::ip_address const & ip) const;
size_t count_per_attempt (nano::ip_address const & ip) const;
size_t count_attempts (nano::ip_address const & ip) const;

static nano::stat::dir to_stat_dir (connection_type);
static std::string_view to_string (connection_type);
Expand All @@ -116,7 +118,13 @@ class tcp_listener final
nano::tcp_endpoint endpoint;
std::future<void> future;

std::chrono::steady_clock::time_point start{ std::chrono::steady_clock::now () };
attempt_entry (nano::tcp_endpoint const & endpoint, std::future<void> && future) :
endpoint{ endpoint }, future{ std::move (future) }
{
}

asio::cancellation_signal cancellation_signal{};
std::chrono::steady_clock::time_point const start{ std::chrono::steady_clock::now () };

nano::ip_address address () const
{
Expand All @@ -128,6 +136,7 @@ class tcp_listener final
uint16_t const port;
std::size_t const max_inbound_connections;
size_t const max_connection_attempts;
size_t const max_attempts_per_ip{ 1 };

// clang-format off
class tag_address {};
Expand All @@ -137,17 +146,14 @@ class tcp_listener final
mi::hashed_non_unique<mi::tag<tag_address>,
mi::const_mem_fun<entry, nano::ip_address, &entry::address>>
>>;

using ordered_attempts = boost::multi_index_container<attempt_entry,
mi::indexed_by<
mi::hashed_non_unique<mi::tag<tag_address>,
mi::const_mem_fun<attempt_entry, nano::ip_address, &attempt_entry::address>>
>>;
// clang-format on

ordered_connections connections;
ordered_attempts attempts;

std::list<attempt_entry> attempts;

// All connection attempts are serialized through this strand
asio::strand<asio::io_context::executor_type> strand;
// For simplicity, acceptor is not run on the strand but instead guarded by mutex
asio::ip::tcp::acceptor acceptor;
asio::ip::tcp::endpoint local;

Expand Down

0 comments on commit 9883515

Please sign in to comment.