Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Apr 20, 2024
1 parent debda86 commit 7fee045
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 38 deletions.
20 changes: 16 additions & 4 deletions nano/core_test/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ TEST (socket, max_connections)
std::mutex server_sockets_mutex;

// start a server socket that allows max 2 live connections
nano::transport::tcp_listener listener{ server_port, *node, 2 };
nano::transport::tcp_config tcp_config;
tcp_config.max_inbound_connections = 2;

nano::transport::tcp_listener listener{ server_port, tcp_config, *node };
listener.connection_accepted.add ([&] (auto const & socket, auto const & server) {
std::lock_guard guard{ server_sockets_mutex };
server_sockets.push_back (socket);
Expand Down Expand Up @@ -144,7 +147,10 @@ TEST (socket, max_connections_per_ip)
// successful incoming connections are stored in server_sockets to keep them alive (server side)
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;

nano::transport::tcp_listener listener{ server_port, *node, max_global_connections };
nano::transport::tcp_config tcp_config;
tcp_config.max_inbound_connections = max_global_connections;

nano::transport::tcp_listener listener{ server_port, tcp_config, *node };
listener.connection_accepted.add ([&server_sockets] (auto const & socket, auto const & server) {
server_sockets.push_back (socket);
});
Expand Down Expand Up @@ -267,7 +273,10 @@ TEST (socket, max_connections_per_subnetwork)
// successful incoming connections are stored in server_sockets to keep them alive (server side)
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;

nano::transport::tcp_listener listener{ server_port, *node, max_global_connections };
nano::transport::tcp_config tcp_config;
tcp_config.max_inbound_connections = max_global_connections;

nano::transport::tcp_listener listener{ server_port, tcp_config, *node };
listener.connection_accepted.add ([&server_sockets] (auto const & socket, auto const & server) {
server_sockets.push_back (socket);
});
Expand Down Expand Up @@ -330,7 +339,10 @@ TEST (socket, disabled_max_peers_per_ip)
// successful incoming connections are stored in server_sockets to keep them alive (server side)
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;

nano::transport::tcp_listener listener = { server_port, *node, max_global_connections };
nano::transport::tcp_config tcp_config;
tcp_config.max_inbound_connections = max_global_connections;

nano::transport::tcp_listener listener = { server_port, tcp_config, *node };
listener.connection_accepted.add ([&server_sockets] (auto const & socket, auto const & server) {
server_sockets.push_back (socket);
});
Expand Down
16 changes: 12 additions & 4 deletions nano/lib/async.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ class cancellation
{
public:
explicit cancellation (nano::async::strand & strand) :
strand{ strand }
strand{ strand },
signal{ std::make_unique<asio::cancellation_signal> () }
{
}

void emit (asio::cancellation_type type = asio::cancellation_type::all)
{
asio::dispatch (strand, asio::use_future ([this, type] () {
signal.emit (type);
signal->emit (type);
}))
.wait ();
}
Expand All @@ -48,13 +49,20 @@ class cancellation
{
// Ensure that the slot is only connected once
debug_assert (std::exchange (slotted, true) == false);
return signal.slot ();
return signal->slot ();
}

private:
nano::async::strand & strand;
asio::cancellation_signal signal;
std::unique_ptr<asio::cancellation_signal> signal; // Wrap the signal in a unique_ptr to enable moving

bool slotted{ false };
};

auto spawn (nano::async::strand & strand, auto && func)
{
nano::async::cancellation cancellation{ strand };
auto fut = asio::co_spawn (strand, std::forward<decltype (func)> (func), asio::bind_cancellation_slot (cancellation.slot (), asio::use_future));
return std::make_pair (std::move (fut), std::move (cancellation));
}
}
6 changes: 3 additions & 3 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,18 +241,18 @@ enum class detail : uint8_t
accept_success,
accept_error,
accept_failure,
accept_limits_exceeded,
attempts_limits_exceeded,
attempt_in_progress,
accept_rejected,
close_error,
max_per_ip,
max_per_subnetwork,
max_attempts,
max_attempts_per_ip,
excluded,
erase_dead,
connect_initiate,
connect_failure,
connect_error,
connect_rejected,
attempt_timeout,

// tcp_server
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
// Thus, be very careful if you change the order: if `bootstrap` gets constructed before `network`,
// the latter would inherit the port from the former (if TCP is active, otherwise `network` picks first)
//
tcp_listener_impl{ std::make_unique<nano::transport::tcp_listener> (network.port, {}, *this) },
tcp_listener_impl{ std::make_unique<nano::transport::tcp_listener> (network.port, nano::transport::tcp_config{}, *this) },
tcp_listener{ *tcp_listener_impl },
application_path (application_path_a),
port_mapping (*this),
Expand Down
150 changes: 128 additions & 22 deletions nano/node/transport/tcp_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,65 @@ void nano::transport::tcp_listener::timeout ()
}
}

bool nano::transport::tcp_listener::connect (asio::ip::address ip, uint16_t port)
{
nano::unique_lock<nano::mutex> lock{ mutex };

if (port == 0)
{
port = node.network_params.network.default_node_port;
}

if (auto result = check_limits (ip, connection_type::outbound); result != accept_result::accepted)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_rejected, nano::stat::dir::out);
// Refusal reason should be logged earlier

return false; // Rejected
}

nano::tcp_endpoint endpoint{ ip, port };

stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_initiate, nano::stat::dir::out);
logger.debug (nano::log::type::tcp_listener, "Initiating outgoing connection to: {}", fmt::streamed (endpoint));

// auto future = asio::co_spawn (node.io_ctx, connect_impl (endpoint), asio::use_future);
auto [future, cancellation] = nano::async::spawn (strand, connect_impl (endpoint));

attempt att{ endpoint, std::move (future), std::move (cancellation) };

// attempts.emplace ();

return true; // Attempt started
}

auto nano::transport::tcp_listener::connect_impl (asio::ip::tcp::endpoint endpoint) -> asio::awaitable<accept_result>
{
try
{
auto raw_socket = co_await connect_socket (endpoint);
debug_assert (strand.running_in_this_thread ());

auto result = accept_one (std::move (raw_socket), connection_type::outbound);
if (result != accept_result::accepted)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_failure, nano::stat::dir::out);
// Refusal reason should be logged earlier

co_return result;
}
}
catch (boost::system::system_error const & ex)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_error, nano::stat::dir::out);
logger.log (nano::log::level::debug, nano::log::type::tcp_listener, "Error connecting to: {} ({})", fmt::streamed (endpoint), ex.what ());

co_return accept_result::error;
}

co_return accept_result::accepted;
}

asio::awaitable<void> nano::transport::tcp_listener::run ()
{
debug_assert (strand.running_in_this_thread ());
Expand All @@ -223,6 +282,8 @@ asio::awaitable<void> nano::transport::tcp_listener::run ()
try
{
auto socket = co_await accept_socket ();
debug_assert (strand.running_in_this_thread ());

auto result = accept_one (std::move (socket), connection_type::inbound);
if (result != accept_result::accepted)
{
Expand Down Expand Up @@ -253,6 +314,31 @@ asio::awaitable<asio::ip::tcp::socket> nano::transport::tcp_listener::accept_soc
co_return co_await acceptor.async_accept (asio::use_awaitable);
}

asio::awaitable<asio::ip::tcp::socket> nano::transport::tcp_listener::connect_socket (asio::ip::tcp::endpoint endpoint)
{
debug_assert (strand.running_in_this_thread ());

asio::ip::tcp::socket raw_socket{ strand };
co_await raw_socket.async_connect (endpoint, asio::use_awaitable);

co_return raw_socket;
}

asio::awaitable<void> nano::transport::tcp_listener::wait_available_slots () const
{
nano::interval log_interval;
while (connection_count () >= config.max_inbound_connections && !stopped)
{
if (log_interval.elapsed (node.network_params.network.is_dev_network () ? 1s : 15s))
{
logger.warn (nano::log::type::tcp_listener, "Waiting for available slots to accept new connections (current: {} / max: {})",
connection_count (), config.max_inbound_connections);
}

co_await nano::async::sleep_for (100ms);
}
}

auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket, connection_type type) -> accept_result
{
auto const remote_endpoint = raw_socket.remote_endpoint ();
Expand All @@ -262,7 +348,7 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket

if (auto result = check_limits (remote_endpoint.address (), type); result != accept_result::accepted)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, to_stat_dir (type));
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_rejected, to_stat_dir (type));
// Refusal reason should be logged earlier

try
Expand Down Expand Up @@ -299,46 +385,36 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket
return accept_result::accepted;
}

asio::awaitable<void> nano::transport::tcp_listener::wait_available_slots () const
{
nano::interval log_interval;
while (connection_count () >= config.max_inbound_connections && !stopped)
{
if (log_interval.elapsed (node.network_params.network.is_dev_network () ? 1s : 15s))
{
logger.warn (nano::log::type::tcp_listener, "Waiting for available slots to accept new connections (current: {} / max: {})",
connection_count (), config.max_inbound_connections);
}

co_await nano::async::sleep_for (100ms);
}
}

auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) -> accept_result
{
debug_assert (!mutex.try_lock ());

if (stopped)
{
return accept_result::rejected;
}

cleanup ();

debug_assert (connections.size () <= config.max_inbound_connections); // Should be checked earlier (wait_available_slots)

if (node.network.excluded_peers.check (ip)) // true => error
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::excluded, nano::stat::dir::in);
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::excluded, to_stat_dir (type));
logger.debug (nano::log::type::tcp_listener, "Rejected connection from excluded peer: {}", ip.to_string ());

return accept_result::excluded;
return accept_result::rejected;
}

if (!node.flags.disable_max_peers_per_ip)
{
if (auto count = count_per_ip (ip); count >= node.network_params.network.max_peers_per_ip)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_ip, nano::stat::dir::in);
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_ip, to_stat_dir (type));
logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached (ip: {}, count: {}), unable to open new connection",
ip.to_string (), count);

return accept_result::too_many_per_ip;
return accept_result::rejected;
}
}

Expand All @@ -347,11 +423,32 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip,
{
if (auto count = count_per_subnetwork (ip); count >= node.network_params.network.max_peers_per_subnetwork)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_subnetwork, nano::stat::dir::in);
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_subnetwork, to_stat_dir (type));
logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (ip: {}, count: {}), unable to open new connection",
ip.to_string (), count);

return accept_result::too_many_per_subnetwork;
return accept_result::rejected;
}
}

if (type == connection_type::outbound)
{
if (attempts.size () > config.max_attempts)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_attempts, to_stat_dir (type));
logger.debug (nano::log::type::tcp_listener, "Max total connection attempts reached (ip: {}, count: {}), unable to initiate new connection ({})",
ip.to_string (), attempts.size (), to_string (type));

return accept_result::rejected;
}

if (auto count = count_attempts (ip); count >= config.max_attempts_per_ip)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_attempts_per_ip, to_stat_dir (type));
logger.debug (nano::log::type::tcp_listener, "Connection attempt already in progress (ip: {}), unable to initiate new connection ({})",
ip.to_string (), to_string (type));

return accept_result::rejected;
}
}

Expand Down Expand Up @@ -414,6 +511,15 @@ size_t nano::transport::tcp_listener::count_per_subnetwork (asio::ip::address co
});
}

size_t nano::transport::tcp_listener::count_attempts (asio::ip::address const & ip) const
{
debug_assert (!mutex.try_lock ());

return std::count_if (attempts.begin (), attempts.end (), [&ip] (auto const & attempt) {
return nano::transport::is_same_ip (attempt.address (), ip);
});
}

asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
Expand Down
11 changes: 7 additions & 4 deletions nano/node/transport/tcp_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <nano/lib/async.hpp>
#include <nano/node/common.hpp>
#include <nano/node/transport/common.hpp>

#include <boost/asio.hpp>
#include <boost/multi_index/hashed_index.hpp>
Expand Down Expand Up @@ -89,9 +90,8 @@ class tcp_listener final
{
invalid,
accepted,
too_many_per_ip,
too_many_per_subnetwork,
excluded,
rejected,
error,
};

enum class connection_type
Expand All @@ -100,6 +100,9 @@ class tcp_listener final
outbound,
};

asio::awaitable<accept_result> connect_impl (asio::ip::tcp::endpoint);
asio::awaitable<asio::ip::tcp::socket> connect_socket (asio::ip::tcp::endpoint);

accept_result accept_one (asio::ip::tcp::socket, connection_type);
accept_result check_limits (asio::ip::address const & ip, connection_type);
asio::awaitable<asio::ip::tcp::socket> accept_socket ();
Expand All @@ -124,7 +127,7 @@ class tcp_listener final
struct attempt
{
asio::ip::tcp::endpoint endpoint;
std::future<void> future;
std::future<accept_result> future;
nano::async::cancellation cancellation;

std::chrono::steady_clock::time_point const start{ std::chrono::steady_clock::now () };
Expand Down

0 comments on commit 7fee045

Please sign in to comment.