Skip to content

Commit

Permalink
Rework channel purging
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 16, 2024
1 parent 653bee1 commit 06b2ad7
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 51 deletions.
2 changes: 1 addition & 1 deletion nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ TEST (network, cleanup_purge)
ASSERT_EQ (1, node1.network.size ());

node1.network.cleanup (std::chrono::steady_clock::now ());
ASSERT_EQ (0, node1.network.size ());
ASSERT_TIMELY_EQ (5s, 0, node1.network.size ());
}

TEST (network, loopback_channel)
Expand Down
1 change: 1 addition & 0 deletions nano/core_test/peer_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ TEST (peer_container, reachout)
ASSERT_TRUE (node1.network.track_reachout (outer_node2->network.endpoint ()));
// Make sure we purge old items
node1.network.cleanup (std::chrono::steady_clock::now () + std::chrono::seconds (10));
ASSERT_TIMELY (5s, node1.network.empty ());
ASSERT_FALSE (node1.network.track_reachout (outer_node2->network.endpoint ()));
}

Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,15 +420,15 @@ TEST (socket, drop_policy)
});

auto client = std::make_shared<nano::transport::socket> (*node);
nano::transport::channel_tcp channel{ *node, client };
auto channel = std::make_shared<nano::transport::channel_tcp> (*node, client);
nano::test::counted_completion write_completion (static_cast<unsigned> (total_message_count));

client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), listener->endpoint ().port ()),
[&channel, total_message_count, node, &write_completion, &drop_policy, client] (boost::system::error_code const & ec_a) mutable {
for (int i = 0; i < total_message_count; i++)
{
std::vector<uint8_t> buff (1);
channel.send_buffer (
channel->send_buffer (
nano::shared_const_buffer (std::move (buff)), [&write_completion, client] (boost::system::error_code const & ec, size_t size_a) mutable {
client.reset ();
write_completion.increment ();
Expand Down
3 changes: 3 additions & 0 deletions nano/node/transport/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class channel
nano::transport::traffic_type = nano::transport::traffic_type::generic)
= 0;

virtual void close () = 0;

virtual std::string to_string () const = 0;
virtual nano::endpoint get_endpoint () const = 0;
virtual nano::tcp_endpoint get_tcp_endpoint () const = 0;
Expand All @@ -50,6 +52,7 @@ class channel
{
return false;
}

virtual bool alive () const
{
return true;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/fake.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace transport
return nano::transport::transport_type::fake;
}

void close ()
void close () override
{
closed = true;
}
Expand Down
5 changes: 5 additions & 0 deletions nano/node/transport/inproc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ namespace transport
return nano::transport::transport_type::loopback;
}

void close () override
{
// Can't be closed
}

private:
nano::node & destination;
nano::endpoint const endpoint;
Expand Down
79 changes: 48 additions & 31 deletions nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const
if (!socket_l->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket_l->full (traffic_type)))
{
socket_l->async_write (
buffer_a, [endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr<nano::node> (node.shared ()), callback_a] (boost::system::error_code const & ec, std::size_t size_a) {
buffer_a, [this_s = shared_from_this (), endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr<nano::node>{ node.shared () }, callback_a] (boost::system::error_code const & ec, std::size_t size_a) {
if (auto node_l = node.lock ())
{
if (!ec)
{
node_l->network.tcp_channels.update (endpoint_a);
this_s->set_last_packet_sent (std::chrono::steady_clock::now ());
}
if (ec == boost::system::errc::host_unreachable)
{
Expand Down Expand Up @@ -475,25 +475,52 @@ std::unique_ptr<nano::container_info_component> nano::transport::tcp_channels::c
return composite;
}

void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point const & cutoff_a)
void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point cutoff_deadline)
{
nano::lock_guard<nano::mutex> lock{ mutex };

// Remove channels with dead underlying sockets
erase_if (channels, [] (auto const & entry) {
return !entry.channel->alive ();
});
node.logger.debug (nano::log::type::tcp_channels, "Performing periodic channel cleanup, cutoff: {}s", nano::log::seconds_delta (cutoff_deadline));

auto should_close = [this, cutoff_deadline] (auto const & channel) {
// Remove channels that haven't successfully sent a message within the cutoff time
if (channel->get_last_packet_sent () < cutoff_deadline)
{
node.logger.debug (nano::log::type::tcp_channels, "Closing idle channel: {} (idle for {} seconds)",
channel->to_string (),
nano::log::seconds (std::chrono::steady_clock::now () - channel->get_last_packet_sent ()));

return true; // Close
}
// Check if any tcp channels belonging to old protocol versions which may still be alive due to async operations
if (channel->get_network_version () < node.network_params.network.protocol_version_min)
{
node.logger.debug (nano::log::type::tcp_channels, "Closing channel with old protocol version: {}", channel->to_string ());

return true; // Close
}
return false;
};

auto disconnect_cutoff (channels.get<last_packet_sent_tag> ().lower_bound (cutoff_a));
channels.get<last_packet_sent_tag> ().erase (channels.get<last_packet_sent_tag> ().begin (), disconnect_cutoff);
for (auto const & entry : channels)
{
if (should_close (entry.channel))
{
entry.channel->close ();
}
}

erase_if (channels, [this] (auto const & entry) {
if (!entry.channel->alive ())
{
node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", entry.channel->to_string ());
return true; // Erase
}
return false;
});

// Remove keepalive attempt tracking for attempts older than cutoff
auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_a));
auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_deadline));
attempts.get<last_attempt_tag> ().erase (attempts.get<last_attempt_tag> ().begin (), attempts_cutoff);

// Check if any tcp channels belonging to old protocol versions which may still be alive due to async operations
auto lower_bound = channels.get<version_tag> ().lower_bound (node.network_params.network.protocol_version_min);
channels.get<version_tag> ().erase (channels.get<version_tag> ().begin (), lower_bound);
}

void nano::transport::tcp_channels::keepalive ()
Expand All @@ -506,16 +533,18 @@ void nano::transport::tcp_channels::keepalive ()
auto const cutoff_time = std::chrono::steady_clock::now () - node.network_params.network.keepalive_period;

// Wake up channels
std::vector<std::shared_ptr<nano::transport::channel_tcp>> send_list;
auto keepalive_sent_cutoff (channels.get<last_packet_sent_tag> ().lower_bound (cutoff_time));
for (auto i (channels.get<last_packet_sent_tag> ().begin ()); i != keepalive_sent_cutoff; ++i)
std::vector<std::shared_ptr<nano::transport::channel_tcp>> to_wakeup;
for (auto const & entry : channels)
{
send_list.push_back (i->channel);
if (entry.channel->get_last_packet_sent () < cutoff_time)
{
to_wakeup.push_back (entry.channel);
}
}

lock.unlock ();

for (auto & channel : send_list)
for (auto & channel : to_wakeup)
{
channel->send (message);
}
Expand Down Expand Up @@ -563,18 +592,6 @@ void nano::transport::tcp_channels::modify (std::shared_ptr<nano::transport::cha
}
}

void nano::transport::tcp_channels::update (nano::tcp_endpoint const & endpoint_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto existing (channels.get<endpoint_tag> ().find (endpoint_a));
if (existing != channels.get<endpoint_tag> ().end ())
{
channels.get<endpoint_tag> ().modify (existing, [] (channel_entry & wrapper_a) {
wrapper_a.channel->set_last_packet_sent (std::chrono::steady_clock::now ());
});
}
}

void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a)
{
auto socket = std::make_shared<nano::transport::socket> (node);
Expand Down
35 changes: 19 additions & 16 deletions nano/node/transport/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace transport
class tcp_server;
class tcp_channels;

class channel_tcp : public nano::transport::channel
class channel_tcp : public nano::transport::channel, public std::enable_shared_from_this<channel_tcp>
{
friend class nano::transport::tcp_channels;

Expand All @@ -74,10 +74,6 @@ namespace transport
{
return &node == &other_a.node && socket.lock () == other_a.socket.lock ();
}
std::weak_ptr<nano::transport::socket> socket;
/* Mark for temporary channels. Usually remote ports of these channels are ephemeral and received from incoming connections to server.
If remote part has open listening port, temporary channel will be replaced with direct connection to listening port soon. But if other side is behing NAT or firewall this connection can be pemanent. */
std::atomic<bool> temporary{ false };

void set_endpoint ();

Expand All @@ -97,7 +93,7 @@ namespace transport
return nano::transport::transport_type::tcp;
}

virtual bool max (nano::transport::traffic_type traffic_type) override
bool max (nano::transport::traffic_type traffic_type) override
{
bool result = true;
if (auto socket_l = socket.lock ())
Expand All @@ -107,7 +103,7 @@ namespace transport
return result;
}

virtual bool alive () const override
bool alive () const override
{
if (auto socket_l = socket.lock ())
{
Expand All @@ -116,6 +112,21 @@ namespace transport
return false;
}

void close () override
{
if (auto socket_l = socket.lock ())
{
socket_l->close ();
}
}

public:
std::weak_ptr<nano::transport::socket> socket;

/* Mark for temporary channels. Usually remote ports of these channels are ephemeral and received from incoming connections to server.
If remote part has open listening port, temporary channel will be replaced with direct connection to listening port soon. But if other side is behing NAT or firewall this connection can be pemanent. */
std::atomic<bool> temporary{ false };

private:
nano::tcp_endpoint endpoint{ boost::asio::ip::address_v6::any (), 0 };

Expand Down Expand Up @@ -154,10 +165,9 @@ namespace transport
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
bool track_reachout (nano::endpoint const &);
std::unique_ptr<container_info_component> collect_container_info (std::string const &);
void purge (std::chrono::steady_clock::time_point const &);
void purge (std::chrono::steady_clock::time_point cutoff_deadline);
void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0, bool = true);
void modify (std::shared_ptr<nano::transport::channel_tcp> const &, std::function<void (std::shared_ptr<nano::transport::channel_tcp> const &)>);
void update (nano::tcp_endpoint const &);
void keepalive ();
std::optional<nano::keepalive> sample_keepalive ();

Expand Down Expand Up @@ -191,10 +201,6 @@ namespace transport
{
return channel->get_tcp_endpoint ();
}
std::chrono::steady_clock::time_point last_packet_sent () const
{
return channel->get_last_packet_sent ();
}
std::chrono::steady_clock::time_point last_bootstrap_attempt () const
{
return channel->get_last_bootstrap_attempt ();
Expand Down Expand Up @@ -240,7 +246,6 @@ namespace transport
class ip_address_tag {};
class subnetwork_tag {};
class random_access_tag {};
class last_packet_sent_tag {};
class last_bootstrap_attempt_tag {};
class last_attempt_tag {};
class node_id_tag {};
Expand All @@ -257,8 +262,6 @@ namespace transport
mi::const_mem_fun<channel_entry, nano::tcp_endpoint, &channel_entry::endpoint>>,
mi::hashed_non_unique<mi::tag<node_id_tag>,
mi::const_mem_fun<channel_entry, nano::account, &channel_entry::node_id>>,
mi::ordered_non_unique<mi::tag<last_packet_sent_tag>,
mi::const_mem_fun<channel_entry, std::chrono::steady_clock::time_point, &channel_entry::last_packet_sent>>,
mi::ordered_non_unique<mi::tag<version_tag>,
mi::const_mem_fun<channel_entry, uint8_t, &channel_entry::network_version>>,
mi::hashed_non_unique<mi::tag<ip_address_tag>,
Expand Down

0 comments on commit 06b2ad7

Please sign in to comment.