Skip to content

Commit

Permalink
Dedicated network reachout thread
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 15, 2024
1 parent c60a768 commit ce31080
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 77 deletions.
6 changes: 5 additions & 1 deletion nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ enum class detail : uint8_t
ok,
loop,
loop_cleanup,
loop_keepalive,
total,
process,
processed,
Expand Down Expand Up @@ -217,6 +216,11 @@ enum class detail : uint8_t
message_size_too_big,
outdated_version,

// network
loop_keepalive,
loop_reachout,
merge_peer,

// tcp
tcp_accept_success,
tcp_accept_failure,
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::network_keepalive:
thread_role_name_string = "Net keepalive";
break;
case nano::thread_role::name::network_reachout:
thread_role_name_string = "Net reachout";
break;
case nano::thread_role::name::tcp_keepalive:
thread_role_name_string = "Tcp keepalive";
break;
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ enum class name
rep_tiers,
network_cleanup,
network_keepalive,
network_reachout,
tcp_keepalive,
};

Expand Down
55 changes: 49 additions & 6 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ void nano::network::start ()
run_keepalive ();
});

reachout_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::network_reachout);
run_reachout ();
});

if (!node.flags.disable_tcp_realtime)
{
tcp_channels.start ();
Expand Down Expand Up @@ -87,6 +92,10 @@ void nano::network::stop ()
{
cleanup_thread.join ();
}
if (reachout_thread.joinable ())
{
reachout_thread.join ();
}

port = 0;
}
Expand Down Expand Up @@ -126,12 +135,11 @@ void nano::network::run_cleanup ()
while (!stopped)
{
condition.wait_for (lock, node.network_params.network.is_dev_network () ? 1s : 5s);
lock.unlock ();

if (stopped)
{
return;
}
lock.unlock ();

node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_cleanup);

Expand All @@ -154,12 +162,11 @@ void nano::network::run_keepalive ()
while (!stopped)
{
condition.wait_for (lock, node.network_params.network.keepalive_period);
lock.unlock ();

if (stopped)
{
return;
}
lock.unlock ();

node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_keepalive);

Expand All @@ -170,6 +177,41 @@ void nano::network::run_keepalive ()
}
}

void nano::network::run_reachout ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait_for (lock, node.network_params.network.merge_period);
if (stopped)
{
return;
}
lock.unlock ();

node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_reachout);

auto keepalive = tcp_channels.sample_keepalive ();
if (keepalive)
{
for (auto const & peer : keepalive->peers)
{
if (stopped)
{
return;
}

merge_peer (peer);

// Throttle reachout attempts
std::this_thread::sleep_for (node.network_params.network.merge_period);
}
}

lock.lock ();
}
}

void nano::network::send_keepalive (std::shared_ptr<nano::transport::channel> const & channel_a)
{
nano::keepalive message{ node.network_params.network };
Expand Down Expand Up @@ -413,8 +455,9 @@ void nano::network::merge_peer (nano::endpoint const & peer_a)
{
if (!track_reachout (peer_a))
{
std::weak_ptr<nano::node> node_w (node.shared ());
node.network.tcp_channels.start_tcp (peer_a);
node.stats.inc (nano::stat::type::network, nano::stat::detail::merge_peer);

tcp_channels.start_tcp (peer_a);
}
}

Expand Down
2 changes: 2 additions & 0 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class network final
void run_processing ();
void run_cleanup ();
void run_keepalive ();
void run_reachout ();
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);

private: // Dependencies
Expand All @@ -137,6 +138,7 @@ class network final
std::vector<boost::thread> processing_threads; // Using boost::thread to enable increased stack size
std::thread cleanup_thread;
std::thread keepalive_thread;
std::thread reachout_thread;

public:
static unsigned const broadcast_interval_ms = 10;
Expand Down
80 changes: 16 additions & 64 deletions nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ nano::transport::tcp_channels::~tcp_channels ()

void nano::transport::tcp_channels::start ()
{
ongoing_merge (0);

keepalive_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::tcp_keepalive);
run_keepalive ();
Expand Down Expand Up @@ -548,75 +546,29 @@ void nano::transport::tcp_channels::keepalive ()
}
}

void nano::transport::tcp_channels::ongoing_merge (size_t channel_index)
std::optional<nano::keepalive> nano::transport::tcp_channels::sample_keepalive ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
std::optional<nano::keepalive> keepalive;
size_t count = 0;
while (!keepalive && channels.size () > 0 && count++ < channels.size ())
nano::lock_guard<nano::mutex> lock{ mutex };

auto next_rand = [this] (std::size_t max) {
std::uniform_int_distribution<std::size_t> dist (0, max - 1);
return dist (rng);
};

size_t counter = 0;
while (counter++ < channels.size ())
{
++channel_index;
if (channels.size () <= channel_index)
{
channel_index = 0;
}
auto server = channels.get<random_access_tag> ()[channel_index].response_server;
if (server && server->last_keepalive)
auto index = next_rand (channels.size ());
if (auto server = channels.get<random_access_tag> ()[index].response_server)
{
keepalive = std::move (server->last_keepalive);
server->last_keepalive = std::nullopt;
}
}
lock.unlock ();
if (keepalive)
{
ongoing_merge (channel_index, *keepalive, 1);
}
else
{
std::weak_ptr<nano::node> node_w = node.shared ();
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index] () {
if (auto node_l = node_w.lock ())
if (auto keepalive = server->pop_last_keepalive ())
{
if (!node_l->network.tcp_channels.stopped)
{
node_l->network.tcp_channels.ongoing_merge (channel_index);
}
return keepalive;
}
});
}
}
}

void nano::transport::tcp_channels::ongoing_merge (size_t channel_index, nano::keepalive keepalive, size_t peer_index)
{
debug_assert (peer_index < keepalive.peers.size ());
node.network.merge_peer (keepalive.peers[peer_index++]);
if (peer_index < keepalive.peers.size ())
{
std::weak_ptr<nano::node> node_w = node.shared ();
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index, keepalive, peer_index] () {
if (auto node_l = node_w.lock ())
{
if (!node_l->network.tcp_channels.stopped)
{
node_l->network.tcp_channels.ongoing_merge (channel_index, keepalive, peer_index);
}
}
});
}
else
{
std::weak_ptr<nano::node> node_w = node.shared ();
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index] () {
if (auto node_l = node_w.lock ())
{
if (!node_l->network.tcp_channels.stopped)
{
node_l->network.tcp_channels.ongoing_merge (channel_index);
}
}
});
}
return std::nullopt;
}

void nano::transport::tcp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a, uint8_t minimum_version_a, bool include_temporary_channels_a)
Expand Down
8 changes: 5 additions & 3 deletions nano/node/transport/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index_container.hpp>

#include <random>
#include <thread>
#include <unordered_set>

Expand Down Expand Up @@ -154,12 +155,11 @@ namespace transport
bool track_reachout (nano::endpoint const &);
std::unique_ptr<container_info_component> collect_container_info (std::string const &);
void purge (std::chrono::steady_clock::time_point const &);
void ongoing_keepalive ();
void ongoing_merge (size_t channel_index);
void ongoing_merge (size_t channel_index, nano::keepalive keepalive, size_t peer_index);
void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0, bool = true);
void modify (std::shared_ptr<nano::transport::channel_tcp> const &, std::function<void (std::shared_ptr<nano::transport::channel_tcp> const &)>);
void update (nano::tcp_endpoint const &);
std::optional<nano::keepalive> sample_keepalive ();

// Connection start
void start_tcp (nano::endpoint const &);
void start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp> const &, nano::endpoint const &, std::shared_ptr<std::vector<uint8_t>> const &);
Expand Down Expand Up @@ -287,6 +287,8 @@ namespace transport
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread keepalive_thread;

std::default_random_engine rng;
};
} // namespace transport
} // namespace nano
8 changes: 8 additions & 0 deletions nano/node/transport/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,14 @@ void nano::transport::tcp_server::set_last_keepalive (nano::keepalive const & me
}
}

std::optional<nano::keepalive> nano::transport::tcp_server::pop_last_keepalive ()
{
std::lock_guard<nano::mutex> lock{ mutex };
auto result = last_keepalive;
last_keepalive = std::nullopt;
return result;
}

bool nano::transport::tcp_server::to_bootstrap_connection ()
{
auto node = this->node.lock ();
Expand Down
7 changes: 4 additions & 3 deletions nano/node/transport/tcp_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class tcp_server final : public std::enable_shared_from_this<tcp_server>

void timeout ();
void set_last_keepalive (nano::keepalive const & message);
std::optional<nano::keepalive> pop_last_keepalive ();

std::shared_ptr<nano::transport::socket> const socket;
std::weak_ptr<nano::node> const node;
Expand All @@ -73,7 +74,6 @@ class tcp_server final : public std::enable_shared_from_this<tcp_server>
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
nano::account remote_node_id{};
std::chrono::steady_clock::time_point last_telemetry_req{};
std::optional<nano::keepalive> last_keepalive;

private:
void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2);
Expand All @@ -90,9 +90,10 @@ class tcp_server final : public std::enable_shared_from_this<tcp_server>
bool is_bootstrap_connection () const;
bool is_realtime_connection () const;

private:
bool const allow_bootstrap;
std::shared_ptr<nano::transport::message_deserializer> message_deserializer;

bool allow_bootstrap;
std::optional<nano::keepalive> last_keepalive;

private:
class handshake_message_visitor : public nano::message_visitor
Expand Down

0 comments on commit ce31080

Please sign in to comment.