Skip to content

Commit

Permalink
Dedicated thread for tcp keepalives
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 14, 2024
1 parent 6ec4ed5 commit 6e990fb
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 17 deletions.
1 change: 1 addition & 0 deletions nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ enum class type
tcp,
tcp_server,
tcp_listener,
tcp_channels,
prunning,
conf_processor_bounded,
conf_processor_unbounded,
Expand Down
1 change: 1 addition & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ enum class type : uint8_t
http_callback,
ipc,
tcp,
tcp_channels,
channel,
socket,
confirmation_height,
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::network_keepalive:
thread_role_name_string = "Net keepalive";
break;
case nano::thread_role::name::tcp_keepalive:
thread_role_name_string = "Tcp keepalive";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ enum class name
rep_tiers,
network_cleanup,
network_keepalive,
tcp_keepalive,
};

/*
Expand Down
64 changes: 49 additions & 15 deletions nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,34 @@ nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function<vo
{
}

nano::transport::tcp_channels::~tcp_channels ()
{
// All threads must be stopped before destruction
debug_assert (!keepalive_thread.joinable ());
}

void nano::transport::tcp_channels::start ()
{
ongoing_keepalive ();
ongoing_merge (0);

keepalive_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::tcp_keepalive);
run_keepalive ();
});
}

void nano::transport::tcp_channels::stop ()
{
stopped = true;
nano::unique_lock<nano::mutex> lock{ mutex };
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();

if (keepalive_thread.joinable ())
{
keepalive_thread.join ();
}

message_manager.stop ();

Expand All @@ -160,6 +178,26 @@ void nano::transport::tcp_channels::stop ()
channels.clear ();
}

// TODO: Merge with keepalive in network class
void nano::transport::tcp_channels::run_keepalive ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait_for (lock, node.network_params.network.keepalive_period);
if (stopped)
{
return;
}
lock.unlock ();

node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::loop_keepalive);
keepalive ();

lock.lock ();
}
}

bool nano::transport::tcp_channels::insert (std::shared_ptr<nano::transport::channel_tcp> const & channel_a, std::shared_ptr<nano::transport::socket> const & socket_a, std::shared_ptr<nano::transport::tcp_server> const & server_a)
{
auto endpoint (channel_a->get_tcp_endpoint ());
Expand Down Expand Up @@ -485,33 +523,29 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point
channels.get<version_tag> ().erase (channels.get<version_tag> ().begin (), lower_bound);
}

void nano::transport::tcp_channels::ongoing_keepalive ()
void nano::transport::tcp_channels::keepalive ()
{
nano::keepalive message{ node.network_params.network };
node.network.random_fill (message.peers);

nano::unique_lock<nano::mutex> lock{ mutex };

auto const cutoff_time = std::chrono::steady_clock::now () - node.network_params.network.keepalive_period;

// Wake up channels
std::vector<std::shared_ptr<nano::transport::channel_tcp>> send_list;
auto keepalive_sent_cutoff (channels.get<last_packet_sent_tag> ().lower_bound (std::chrono::steady_clock::now () - node.network_params.network.keepalive_period));
auto keepalive_sent_cutoff (channels.get<last_packet_sent_tag> ().lower_bound (cutoff_time));
for (auto i (channels.get<last_packet_sent_tag> ().begin ()); i != keepalive_sent_cutoff; ++i)
{
send_list.push_back (i->channel);
}

lock.unlock ();

for (auto & channel : send_list)
{
channel->send (message);
}
std::weak_ptr<nano::node> node_w (node.shared ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.keepalive_period, [node_w] () {
if (auto node_l = node_w.lock ())
{
if (!node_l->network.tcp_channels.stopped)
{
node_l->network.tcp_channels.ongoing_keepalive ();
}
}
});
}

void nano::transport::tcp_channels::ongoing_merge (size_t channel_index)
Expand Down
11 changes: 9 additions & 2 deletions nano/node/transport/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index_container.hpp>

#include <thread>
#include <unordered_set>

namespace mi = boost::multi_index;
Expand Down Expand Up @@ -124,9 +125,11 @@ namespace transport
{
friend class nano::transport::channel_tcp;
friend class telemetry_simultaneous_requests_Test;
friend class network_peer_max_tcp_attempts_subnetwork_Test;

public:
explicit tcp_channels (nano::node &, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink = nullptr);
~tcp_channels ();

void start ();
void stop ();
Expand Down Expand Up @@ -164,6 +167,10 @@ namespace transport
private: // Dependencies
nano::node & node;

private:
void run_keepalive ();
void keepalive ();

public:
nano::tcp_message_manager message_manager;

Expand Down Expand Up @@ -277,9 +284,9 @@ namespace transport
std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink;

std::atomic<bool> stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex;

friend class network_peer_max_tcp_attempts_subnetwork_Test;
std::thread keepalive_thread;
};
} // namespace transport
} // namespace nano

0 comments on commit 6e990fb

Please sign in to comment.