Skip to content

Commit

Permalink
PURGE REWORK
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 12, 2024
1 parent 2787046 commit c5679d8
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 66 deletions.
1 change: 1 addition & 0 deletions nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ enum class type
tcp,
tcp_server,
tcp_listener,
tcp_channels,
prunning,
conf_processor_bounded,
conf_processor_unbounded,
Expand Down
1 change: 1 addition & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ enum class type : uint8_t
http_callback,
ipc,
tcp,
tcp_channels,
channel,
socket,
confirmation_height,
Expand Down
1 change: 1 addition & 0 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ nano::endpoint nano::network::endpoint () const
void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff_a)
{
tcp_channels.purge (cutoff_a);

if (node.network.empty ())
{
disconnect_observer ();
Expand Down
76 changes: 42 additions & 34 deletions nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const
buffer_a, [endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr<nano::node> (node.shared ()), callback_a] (boost::system::error_code const & ec, std::size_t size_a) {
if (auto node_l = node.lock ())
{
if (!ec)
{
node_l->network.tcp_channels.update (endpoint_a);
}
if (ec == boost::system::errc::host_unreachable)
{
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out);
Expand Down Expand Up @@ -332,7 +328,7 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer ()
if (i->channel->get_network_version () >= node.network_params.network.protocol_version_min)
{
result = nano::transport::map_endpoint_to_tcp (i->channel->get_peering_endpoint ());
channels.get<last_bootstrap_attempt_tag> ().modify (i, [] (channel_tcp_wrapper & wrapper_a) {
channels.get<last_bootstrap_attempt_tag> ().modify (i, [] (channel_entry & wrapper_a) {
wrapper_a.channel->set_last_bootstrap_attempt (std::chrono::steady_clock::now ());
});
i = n;
Expand Down Expand Up @@ -425,44 +421,68 @@ std::unique_ptr<nano::container_info_component> nano::transport::tcp_channels::c
return composite;
}

void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point const & cutoff_a)
void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point const & cutoff_deadline)
{
nano::lock_guard<nano::mutex> lock{ mutex };

// Remove channels with dead underlying sockets
erase_if (channels, [] (auto const & entry) {
return !entry.channel->alive ();
});
node.logger.debug (nano::log::type::tcp_channels, "Performing periodic channel cleanup...");

auto disconnect_cutoff (channels.get<last_packet_sent_tag> ().lower_bound (cutoff_a));
channels.get<last_packet_sent_tag> ().erase (channels.get<last_packet_sent_tag> ().begin (), disconnect_cutoff);
erase_if (channels, [this, cutoff_deadline] (auto const & entry) {
// Remove channels with dead underlying sockets
if (!entry.channel->alive ())
{
node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", entry.channel->to_string ());
return true; // Erase
}
// Remove channels that haven't sent a message within the cutoff time
if (entry.channel->get_last_packet_sent () < cutoff_deadline)
{
node.logger.debug (nano::log::type::tcp_channels, "Removing idle channel: {} (idle for {} seconds)",
entry.channel->to_string (),
std::chrono::duration_cast<std::chrono::seconds> (std::chrono::steady_clock::now () - entry.channel->get_last_packet_sent ()).count ());
return true; // Erase
}
// Check if any tcp channels belonging to old protocol versions which may still be alive due to async operations
if (entry.channel->get_network_version () < node.network_params.network.protocol_version_min)
{
node.logger.debug (nano::log::type::tcp_channels, "Removing channel with old protocol version: {}", entry.channel->to_string ());
return true; // Erase
}
return false;
});

// Remove keepalive attempt tracking for attempts older than cutoff
auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_a));
auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_deadline));
attempts.get<last_attempt_tag> ().erase (attempts.get<last_attempt_tag> ().begin (), attempts_cutoff);

// Check if any tcp channels belonging to old protocol versions which may still be alive due to async operations
auto lower_bound = channels.get<version_tag> ().lower_bound (node.network_params.network.protocol_version_min);
channels.get<version_tag> ().erase (channels.get<version_tag> ().begin (), lower_bound);
}

void nano::transport::tcp_channels::ongoing_keepalive ()
{
nano::keepalive message{ node.network_params.network };
node.network.random_fill (message.peers);

nano::unique_lock<nano::mutex> lock{ mutex };
// Wake up channels

auto const keepalive_sent_cutoff = std::chrono::steady_clock::now () - node.network_params.network.keepalive_period;

std::vector<std::shared_ptr<nano::transport::channel_tcp>> send_list;
auto keepalive_sent_cutoff (channels.get<last_packet_sent_tag> ().lower_bound (std::chrono::steady_clock::now () - node.network_params.network.keepalive_period));
for (auto i (channels.get<last_packet_sent_tag> ().begin ()); i != keepalive_sent_cutoff; ++i)
for (auto & entry : channels)
{
send_list.push_back (i->channel);
if (entry.last_keepalive_sent < keepalive_sent_cutoff)
{
entry.last_keepalive_sent = std::chrono::steady_clock::now ();
send_list.push_back (entry.channel);
}
}

lock.unlock ();

for (auto & channel : send_list)
{
node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::keepalive, nano::stat::dir::out);
channel->send (message);
}

std::weak_ptr<nano::node> node_w (node.shared ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.keepalive_period, [node_w] () {
if (auto node_l = node_w.lock ())
Expand Down Expand Up @@ -562,24 +582,12 @@ void nano::transport::tcp_channels::modify (std::shared_ptr<nano::transport::cha
auto existing (channels.get<endpoint_tag> ().find (channel_a->get_tcp_endpoint ()));
if (existing != channels.get<endpoint_tag> ().end ())
{
channels.get<endpoint_tag> ().modify (existing, [modify_callback = std::move (modify_callback_a)] (channel_tcp_wrapper & wrapper_a) {
channels.get<endpoint_tag> ().modify (existing, [modify_callback = std::move (modify_callback_a)] (channel_entry & wrapper_a) {
modify_callback (wrapper_a.channel);
});
}
}

void nano::transport::tcp_channels::update (nano::tcp_endpoint const & endpoint_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto existing (channels.get<endpoint_tag> ().find (endpoint_a));
if (existing != channels.get<endpoint_tag> ().end ())
{
channels.get<endpoint_tag> ().modify (existing, [] (channel_tcp_wrapper & wrapper_a) {
wrapper_a.channel->set_last_packet_sent (std::chrono::steady_clock::now ());
});
}
}

void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a)
{
auto socket = std::make_shared<nano::transport::socket> (node);
Expand Down
52 changes: 20 additions & 32 deletions nano/node/transport/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ namespace transport
void ongoing_merge (size_t channel_index, nano::keepalive keepalive, size_t peer_index);
void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0, bool = true);
void modify (std::shared_ptr<nano::transport::channel_tcp> const &, std::function<void (std::shared_ptr<nano::transport::channel_tcp> const &)>);
void update (nano::tcp_endpoint const &);

// Connection start
void start_tcp (nano::endpoint const &);
Expand All @@ -147,32 +146,30 @@ namespace transport
class ip_address_tag {};
class subnetwork_tag {};
class random_access_tag {};
class last_packet_sent_tag {};
class last_bootstrap_attempt_tag {};
class last_attempt_tag {};
class node_id_tag {};
class version_tag {};
// clang-format on

class channel_tcp_wrapper final
class channel_entry final
{
public:
std::shared_ptr<nano::transport::channel_tcp> channel;
std::shared_ptr<nano::transport::socket> socket;
std::shared_ptr<nano::transport::tcp_server> response_server;

channel_tcp_wrapper (std::shared_ptr<nano::transport::channel_tcp> channel_a, std::shared_ptr<nano::transport::socket> socket_a, std::shared_ptr<nano::transport::tcp_server> server_a) :
// Field not used for indexing
mutable std::chrono::steady_clock::time_point last_keepalive_sent{ std::chrono::steady_clock::time_point () };

channel_entry (std::shared_ptr<nano::transport::channel_tcp> channel_a, std::shared_ptr<nano::transport::socket> socket_a, std::shared_ptr<nano::transport::tcp_server> server_a) :
channel (std::move (channel_a)), socket (std::move (socket_a)), response_server (std::move (server_a))
{
}

nano::tcp_endpoint endpoint () const
{
return channel->get_tcp_endpoint ();
}
std::chrono::steady_clock::time_point last_packet_sent () const
{
return channel->get_last_packet_sent ();
}
std::chrono::steady_clock::time_point last_bootstrap_attempt () const
{
return channel->get_last_bootstrap_attempt ();
Expand All @@ -187,24 +184,19 @@ namespace transport
}
nano::account node_id () const
{
auto node_id (channel->get_node_id ());
return node_id;
}
uint8_t network_version () const
{
return channel->get_network_version ();
return channel->get_node_id ();
}
};

class tcp_endpoint_attempt final
class attempt_entry final
{
public:
nano::tcp_endpoint endpoint;
boost::asio::ip::address address;
boost::asio::ip::address subnetwork;
std::chrono::steady_clock::time_point last_attempt{ std::chrono::steady_clock::now () };

explicit tcp_endpoint_attempt (nano::tcp_endpoint const & endpoint_a) :
explicit attempt_entry (nano::tcp_endpoint const & endpoint_a) :
endpoint (endpoint_a),
address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ())),
subnetwork (nano::transport::map_address_to_subnetwork (endpoint_a.address ()))
Expand All @@ -215,35 +207,31 @@ namespace transport
mutable nano::mutex mutex;

// clang-format off
boost::multi_index_container<channel_tcp_wrapper,
boost::multi_index_container<channel_entry,
mi::indexed_by<
mi::random_access<mi::tag<random_access_tag>>, // TODO: Can this be replaced with sequential access?
mi::ordered_non_unique<mi::tag<last_bootstrap_attempt_tag>,
mi::const_mem_fun<channel_tcp_wrapper, std::chrono::steady_clock::time_point, &channel_tcp_wrapper::last_bootstrap_attempt>>,
mi::const_mem_fun<channel_entry, std::chrono::steady_clock::time_point, &channel_entry::last_bootstrap_attempt>>,
mi::hashed_unique<mi::tag<endpoint_tag>,
mi::const_mem_fun<channel_tcp_wrapper, nano::tcp_endpoint, &channel_tcp_wrapper::endpoint>>,
mi::const_mem_fun<channel_entry, nano::tcp_endpoint, &channel_entry::endpoint>>,
mi::hashed_non_unique<mi::tag<node_id_tag>,
mi::const_mem_fun<channel_tcp_wrapper, nano::account, &channel_tcp_wrapper::node_id>>,
mi::ordered_non_unique<mi::tag<last_packet_sent_tag>,
mi::const_mem_fun<channel_tcp_wrapper, std::chrono::steady_clock::time_point, &channel_tcp_wrapper::last_packet_sent>>,
mi::ordered_non_unique<mi::tag<version_tag>,
mi::const_mem_fun<channel_tcp_wrapper, uint8_t, &channel_tcp_wrapper::network_version>>,
mi::const_mem_fun<channel_entry, nano::account, &channel_entry::node_id>>,
mi::hashed_non_unique<mi::tag<ip_address_tag>,
mi::const_mem_fun<channel_tcp_wrapper, boost::asio::ip::address, &channel_tcp_wrapper::ip_address>>,
mi::const_mem_fun<channel_entry, boost::asio::ip::address, &channel_entry::ip_address>>,
mi::hashed_non_unique<mi::tag<subnetwork_tag>,
mi::const_mem_fun<channel_tcp_wrapper, boost::asio::ip::address, &channel_tcp_wrapper::subnetwork>>>>
mi::const_mem_fun<channel_entry, boost::asio::ip::address, &channel_entry::subnetwork>>>>
channels;

boost::multi_index_container<tcp_endpoint_attempt,
boost::multi_index_container<attempt_entry,
mi::indexed_by<
mi::hashed_unique<mi::tag<endpoint_tag>,
mi::member<tcp_endpoint_attempt, nano::tcp_endpoint, &tcp_endpoint_attempt::endpoint>>,
mi::member<attempt_entry, nano::tcp_endpoint, &attempt_entry::endpoint>>,
mi::hashed_non_unique<mi::tag<ip_address_tag>,
mi::member<tcp_endpoint_attempt, boost::asio::ip::address, &tcp_endpoint_attempt::address>>,
mi::member<attempt_entry, boost::asio::ip::address, &attempt_entry::address>>,
mi::hashed_non_unique<mi::tag<subnetwork_tag>,
mi::member<tcp_endpoint_attempt, boost::asio::ip::address, &tcp_endpoint_attempt::subnetwork>>,
mi::member<attempt_entry, boost::asio::ip::address, &attempt_entry::subnetwork>>,
mi::ordered_non_unique<mi::tag<last_attempt_tag>,
mi::member<tcp_endpoint_attempt, std::chrono::steady_clock::time_point, &tcp_endpoint_attempt::last_attempt>>>>
mi::member<attempt_entry, std::chrono::steady_clock::time_point, &attempt_entry::last_attempt>>>>
attempts;
// clang-format on

Expand Down

0 comments on commit c5679d8

Please sign in to comment.