Skip to content

Commit

Permalink
Reorganize tcp_listener
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 19, 2024
1 parent 5e91706 commit 3dc352e
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 44 deletions.
15 changes: 3 additions & 12 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,10 +698,7 @@ TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty)
system.deadline_set (std::chrono::seconds (6));
while (!disconnected)
{
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
disconnected = node0->tcp_listener->connections.empty ();
}
disconnected = node0->tcp_listener->connection_count () == 0;
ASSERT_NO_ERROR (system.poll ());
}
}
Expand All @@ -723,18 +720,12 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
});
});
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake) != 0);
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
ASSERT_EQ (node0->tcp_listener->connections.size (), 1);
}
ASSERT_EQ (node0->tcp_listener->connection_count (), 1);
bool disconnected (false);
system.deadline_set (std::chrono::seconds (20));
while (!disconnected)
{
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
disconnected = node0->tcp_listener->connections.empty ();
}
disconnected = node0->tcp_listener->connection_count () == 0;
ASSERT_NO_ERROR (system.poll ());
}
}
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.ledger, "ledger"));
composite->add_component (collect_container_info (node.active, "active"));
composite->add_component (collect_container_info (node.bootstrap_initiator, "bootstrap_initiator"));
composite->add_component (collect_container_info (*node.tcp_listener, "tcp_listener"));
composite->add_component (node.tcp_listener->collect_container_info ("tcp_listener"));
composite->add_component (collect_container_info (node.network, "network"));
composite->add_component (node.telemetry.collect_container_info ("telemetry"));
composite->add_component (collect_container_info (node.workers, "workers"));
Expand Down
40 changes: 24 additions & 16 deletions nano/node/transport/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_
{
}

nano::transport::tcp_listener::~tcp_listener ()
{
}

void nano::transport::tcp_listener::start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
on = true;

acceptor.open (local.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
boost::system::error_code ec;
Expand All @@ -68,12 +72,14 @@ void nano::transport::tcp_listener::stop ()
decltype (connections) connections_l;
{
nano::lock_guard<nano::mutex> lock{ mutex };
on = false;
stopped = true;
connections_l.swap (connections);
}

nano::lock_guard<nano::mutex> lock{ mutex };
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this ()] () {
boost::asio::dispatch (strand, [this_l = shared_from_this ()] () {
this_l->acceptor.close ();

for (auto & address_connection_pair : this_l->connections_per_address)
{
if (auto connection_l = address_connection_pair.second.lock ())
Expand All @@ -82,15 +88,25 @@ void nano::transport::tcp_listener::stop ()
}
}
this_l->connections_per_address.clear ();
}));
});
}

std::size_t nano::transport::tcp_listener::connection_count ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
cleanup ();
return connections.size ();
}

void nano::transport::tcp_listener::cleanup ()
{
debug_assert (!mutex.try_lock ());

erase_if (connections, [] (auto const & connection) {
return connection.second.expired ();
});
}

bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection)
{
debug_assert (strand.running_in_this_thread ());
Expand Down Expand Up @@ -253,10 +269,10 @@ void nano::transport::tcp_listener::accept_action (boost::system::error_code con
}
}

boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint ()
boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (on)
if (!stopped)
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ());
}
Expand All @@ -266,11 +282,10 @@ boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint ()
}
}

std::unique_ptr<nano::container_info_component> nano::transport::collect_container_info (nano::transport::tcp_listener & bootstrap_listener, std::string const & name)
std::unique_ptr<nano::container_info_component> nano::transport::tcp_listener::collect_container_info (std::string const & name)
{
auto sizeof_element = sizeof (decltype (bootstrap_listener.connections)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "connections", bootstrap_listener.connection_count (), sizeof_element }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) }));
return composite;
}

Expand Down Expand Up @@ -321,9 +336,6 @@ nano::transport::tcp_server::~tcp_server ()
}

stop ();

nano::lock_guard<nano::mutex> lock{ node->tcp_listener->mutex };
node->tcp_listener->connections.erase (this);
}

void nano::transport::tcp_server::start ()
Expand Down Expand Up @@ -877,10 +889,6 @@ void nano::transport::tcp_server::timeout ()
{
node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", fmt::streamed (remote_endpoint));

{
nano::lock_guard<nano::mutex> lock{ node->tcp_listener->mutex };
node->tcp_listener->connections.erase (this);
}
socket->close ();
}
}
Expand Down
42 changes: 27 additions & 15 deletions nano/node/transport/tcp_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,47 @@ class tcp_server;
class tcp_listener final : public std::enable_shared_from_this<nano::transport::tcp_listener>
{
public:
tcp_listener (uint16_t, nano::node &, std::size_t);
void start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a);
tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections);
~tcp_listener ();

void start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback);
void stop ();

void accept_action (boost::system::error_code const &, std::shared_ptr<nano::transport::socket> const &);

std::size_t connection_count ();
nano::tcp_endpoint endpoint () const;

nano::mutex mutex;
std::unordered_map<nano::transport::tcp_server *, std::weak_ptr<nano::transport::tcp_server>> connections;
nano::tcp_endpoint endpoint ();
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);

private: // Dependencies
nano::node & node;
bool on{ false };
std::atomic<std::size_t> bootstrap_count{ 0 };
std::atomic<std::size_t> realtime_count{ 0 };

private:
boost::asio::strand<boost::asio::io_context::executor_type> strand;
nano::transport::address_socket_mmap connections_per_address;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::endpoint local;
std::size_t max_inbound_connections;
void on_connection (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a);
void evict_dead_connections ();
void on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const &)>);
/** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */
bool limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
};
void cleanup ();

std::unique_ptr<container_info_component> collect_container_info (tcp_listener & bootstrap_listener, std::string const & name);
public:
std::atomic<std::size_t> bootstrap_count{ 0 };
std::atomic<std::size_t> realtime_count{ 0 };

private:
std::unordered_map<nano::transport::tcp_server *, std::weak_ptr<nano::transport::tcp_server>> connections;
std::multimap<boost::asio::ip::address, std::weak_ptr<socket>> connections_per_address;

boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::endpoint local;
std::size_t const max_inbound_connections;

std::atomic<bool> stopped;
mutable nano::mutex mutex;
};

class tcp_server final : public std::enable_shared_from_this<tcp_server>
{
Expand Down

0 comments on commit 3dc352e

Please sign in to comment.