Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Apr 20, 2024
1 parent 719588e commit 93c6337
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 26 deletions.
7 changes: 7 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,18 @@ enum class detail : uint8_t
accept_error,
accept_failure,
accept_limits_exceeded,
attempts_limits_exceeded,
attempt_in_progress,
close_error,
max_per_ip,
max_per_subnetwork,
max_attempts,
excluded,
erase_dead,
connect_initiate,
connect_failure,
connect_error,
attempt_timeout,

// tcp_server
handshake,
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
// Thus, be very careful if you change the order: if `bootstrap` gets constructed before `network`,
// the latter would inherit the port from the former (if TCP is active, otherwise `network` picks first)
//
tcp_listener_impl{ std::make_unique<nano::transport::tcp_listener> (network.port, *this, config.tcp_incoming_connections_max) },
tcp_listener_impl{ std::make_unique<nano::transport::tcp_listener> (network.port, {}, *this) },
tcp_listener{ *tcp_listener_impl },
application_path (application_path_a),
port_mapping (*this),
Expand Down
121 changes: 100 additions & 21 deletions nano/node/transport/tcp_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@

#include <memory>

#include <magic_enum.hpp>

using namespace std::chrono_literals;

/*
* tcp_listener
*/

nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a, std::size_t max_inbound_connections) :
nano::transport::tcp_listener::tcp_listener (uint16_t port_a, tcp_config const & config_a, nano::node & node_a) :
config{ config_a },
node{ node_a },
stats{ node_a.stats },
logger{ node_a.logger },
port{ port_a },
max_inbound_connections{ max_inbound_connections },
strand{ node_a.io_ctx.get_executor () },
cancellation{ strand },
acceptor{ strand }
Expand All @@ -32,9 +34,10 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_

nano::transport::tcp_listener::~tcp_listener ()
{
// Thread should be stopped before destruction
debug_assert (!cleanup_thread.joinable ());
debug_assert (!future.valid () || future.wait_for (0s) == std::future_status::ready);
debug_assert (connection_count () == 0);
debug_assert (attempt_count () == 0);
}

void nano::transport::tcp_listener::start ()
Expand Down Expand Up @@ -124,9 +127,17 @@ void nano::transport::tcp_listener::stop ()
}

decltype (connections) connections_l;
decltype (attempts) attempts_l;
{
nano::lock_guard<nano::mutex> lock{ mutex };
connections_l.swap (connections);
attempts_l.swap (attempts);
}

for (auto & attempt : attempts_l)
{
attempt.cancellation.emit ();
attempt.future.wait ();
}

for (auto & connection : connections_l)
Expand All @@ -149,6 +160,7 @@ void nano::transport::tcp_listener::run_cleanup ()
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::cleanup);
cleanup ();
timeout ();
condition.wait_for (lock, 1s, [this] () { return stopped.load (); });
}
}
Expand All @@ -157,6 +169,7 @@ void nano::transport::tcp_listener::cleanup ()
{
debug_assert (!mutex.try_lock ());

// Erase dead connections
erase_if (connections, [this] (auto const & connection) {
if (connection.socket.expired () && connection.server.expired ())
{
Expand All @@ -169,6 +182,34 @@ void nano::transport::tcp_listener::cleanup ()
return false;
}
});

// Erase completed attempts
erase_if (attempts, [this] (auto const & attempt) {
return attempt.future.wait_for (0s) == std::future_status::ready;
});
}

void nano::transport::tcp_listener::timeout ()
{
debug_assert (!mutex.try_lock ());

auto const cutoff = std::chrono::steady_clock::now () - config.connect_timeout;

// Cancel timed out attempts
for (auto & attempt : attempts)
{
if (auto status = attempt.future.wait_for (0s); status != std::future_status::ready)
{
if (attempt.start < cutoff)
{
attempt.cancellation.emit ();

stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_timeout);
logger.debug (nano::log::type::tcp_listener, "Connection attempt timed out: {} (started {}s ago)",
fmt::streamed (attempt.endpoint), nano::log::seconds_delta (attempt.start));
}
}
}
}

asio::awaitable<void> nano::transport::tcp_listener::run ()
Expand All @@ -182,7 +223,7 @@ asio::awaitable<void> nano::transport::tcp_listener::run ()
try
{
auto socket = co_await accept_socket ();
auto result = accept_one (std::move (socket));
auto result = accept_one (std::move (socket), connection_type::inbound);
if (result != accept_result::accepted)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in);
Expand Down Expand Up @@ -212,14 +253,16 @@ asio::awaitable<asio::ip::tcp::socket> nano::transport::tcp_listener::accept_soc
co_return co_await acceptor.async_accept (asio::use_awaitable);
}

auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket) -> accept_result
auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket, connection_type type) -> accept_result
{
auto const remote_endpoint = raw_socket.remote_endpoint ();
auto const local_endpoint = raw_socket.local_endpoint ();

if (auto result = check_limits (remote_endpoint.address ()); result != accept_result::accepted)
nano::unique_lock<nano::mutex> lock{ mutex };

if (auto result = check_limits (remote_endpoint.address (), type); result != accept_result::accepted)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, nano::stat::dir::in);
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, to_stat_dir (type));
// Refusal reason should be logged earlier

try
Expand All @@ -230,23 +273,22 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket
}
catch (boost::system::system_error const & ex)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::close_error, nano::stat::dir::in);
logger.debug (nano::log::type::tcp_listener, "Error while closing socket after refusing connection: {}", ex.what ());
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::close_error, to_stat_dir (type));
logger.debug (nano::log::type::tcp_listener, "Error while closing socket after refusing connection: {} ({})", ex.what (), to_string (type));
}

return result;
}

stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in);
logger.debug (nano::log::type::tcp_listener, "Accepted incoming connection from: {}", fmt::streamed (remote_endpoint));
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, to_stat_dir (type));
logger.debug (nano::log::type::tcp_listener, "Accepted connection to: {} ({})", fmt::streamed (remote_endpoint), to_string (type));

auto socket = std::make_shared<nano::transport::socket> (node, std::move (raw_socket), remote_endpoint, local_endpoint, socket_endpoint::server);
auto socket = std::make_shared<nano::transport::socket> (node, std::move (raw_socket), remote_endpoint, local_endpoint, to_socket_type (type));
auto server = std::make_shared<nano::transport::tcp_server> (socket, node.shared (), true);

{
nano::lock_guard<nano::mutex> lock{ mutex };
connections.emplace (entry{ remote_endpoint, socket, server });
}
connections.emplace (entry{ remote_endpoint, socket, server });

lock.unlock ();

socket->set_timeout (node.network_params.network.idle_timeout);
socket->start ();
Expand All @@ -260,25 +302,25 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket
asio::awaitable<void> nano::transport::tcp_listener::wait_available_slots () const
{
nano::interval log_interval;
while (connection_count () >= max_inbound_connections && !stopped)
while (connection_count () >= config.max_inbound_connections && !stopped)
{
if (log_interval.elapsed (node.network_params.network.is_dev_network () ? 1s : 15s))
{
logger.warn (nano::log::type::tcp_listener, "Waiting for available slots to accept new connections (current: {} / max: {})",
connection_count (), max_inbound_connections);
connection_count (), config.max_inbound_connections);
}

co_await nano::async::sleep_for (100ms);
}
}

auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip) -> accept_result
auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) -> accept_result
{
nano::lock_guard<nano::mutex> lock{ mutex };
debug_assert (!mutex.try_lock ());

cleanup ();

debug_assert (connections.size () <= max_inbound_connections); // Should be checked earlier (wait_available_slots)
debug_assert (connections.size () <= config.max_inbound_connections); // Should be checked earlier (wait_available_slots)

if (node.network.excluded_peers.check (ip)) // true => error
{
Expand Down Expand Up @@ -322,6 +364,12 @@ size_t nano::transport::tcp_listener::connection_count () const
return connections.size ();
}

size_t nano::transport::tcp_listener::attempt_count () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return attempts.size ();
}

size_t nano::transport::tcp_listener::realtime_count () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
Expand Down Expand Up @@ -377,4 +425,35 @@ std::unique_ptr<nano::container_info_component> nano::transport::tcp_listener::c
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) }));
return composite;
}

nano::stat::dir nano::transport::tcp_listener::to_stat_dir (connection_type type)
{
switch (type)
{
case connection_type::inbound:
return nano::stat::dir::in;
case connection_type::outbound:
return nano::stat::dir::out;
}
debug_assert (false);
return {};
}

std::string_view nano::transport::tcp_listener::to_string (connection_type type)
{
return magic_enum::enum_name (type);
}

nano::transport::socket_endpoint nano::transport::tcp_listener::to_socket_type (connection_type type)
{
switch (type)
{
case connection_type::inbound:
return socket_endpoint::server;
case connection_type::outbound:
return socket_endpoint::client;
}
debug_assert (false);
return {};
}
Loading

0 comments on commit 93c6337

Please sign in to comment.