Skip to content

Commit

Permalink
Process connection attempts in a round-robin fashion in order to load…
Browse files Browse the repository at this point in the history
…-balance connection attempts.

# Conflicts:
#	nano/node/network.cpp
  • Loading branch information
dsiganos committed Mar 4, 2024
1 parent 522f05c commit 0ce830f
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 3 deletions.
4 changes: 4 additions & 0 deletions nano/lib/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class network_constants
default_websocket_port (47000),
aec_loop_interval_ms (300), // Update AEC ~3 times per second
cleanup_period (default_cleanup_period),
merge_period (std::chrono::milliseconds (250)),
keepalive_period (std::chrono::seconds (15)),
idle_timeout (default_cleanup_period * 2),
silent_connection_tolerance_time (std::chrono::seconds (120)),
Expand Down Expand Up @@ -239,6 +240,7 @@ class network_constants
{
aec_loop_interval_ms = 20;
cleanup_period = std::chrono::seconds (1);
merge_period = std::chrono::milliseconds (10);
keepalive_period = std::chrono::seconds (1);
idle_timeout = cleanup_period * 15;
max_peers_per_ip = 20;
Expand Down Expand Up @@ -277,6 +279,8 @@ class network_constants
{
return cleanup_period * 5;
}
/** How often to connect to other peers */
std::chrono::milliseconds merge_period;
/** How often to send keepalive messages */
std::chrono::seconds keepalive_period;
/** Default maximum idle time for a socket before it's automatically closed */
Expand Down
2 changes: 0 additions & 2 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,6 @@ class network_message_visitor : public nano::message_visitor

void keepalive (nano::keepalive const & message_a) override
{
node.network.merge_peers (message_a.peers);

// Check for special node port data
auto peer0 (message_a.peers[0]);
if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0)
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,4 @@ struct hash<std::reference_wrapper<::nano::transport::channel const>>
return hash (channel_a.get ());
}
};
}
}
72 changes: 72 additions & 0 deletions nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
void nano::transport::tcp_channels::start ()
{
ongoing_keepalive ();
ongoing_merge (0);
}

void nano::transport::tcp_channels::stop ()
Expand Down Expand Up @@ -509,6 +510,77 @@ void nano::transport::tcp_channels::ongoing_keepalive ()
});
}

void nano::transport::tcp_channels::ongoing_merge (size_t channel_index)
{
nano::unique_lock<nano::mutex> lock{ mutex };
std::optional<nano::keepalive> keepalive;
size_t count = 0;
while (!keepalive && channels.size () > 0 && count++ < channels.size ())
{
++channel_index;
if (channels.size () <= channel_index)
{
channel_index = 0;
}
auto server = channels.get<random_access_tag> ()[channel_index].response_server;
if (server && server->last_keepalive)
{
keepalive = std::move (server->last_keepalive);
server->last_keepalive = std::nullopt;
}
}
lock.unlock ();
if (keepalive)
{
ongoing_merge (channel_index, *keepalive, 1);
}
else
{
std::weak_ptr<nano::node> node_w = node.shared ();
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index] () {
if (auto node_l = node_w.lock ())
{
if (!node_l->network.tcp_channels.stopped)
{
node_l->network.tcp_channels.ongoing_merge (channel_index);
}
}
});
}
}

void nano::transport::tcp_channels::ongoing_merge (size_t channel_index, nano::keepalive keepalive, size_t peer_index)
{
debug_assert (peer_index < keepalive.peers.size ());
node.network.merge_peer (keepalive.peers[peer_index++]);
if (peer_index < keepalive.peers.size ())
{
std::weak_ptr<nano::node> node_w = node.shared ();
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index, keepalive, peer_index] () {
if (auto node_l = node_w.lock ())
{
if (!node_l->network.tcp_channels.stopped)
{
node_l->network.tcp_channels.ongoing_merge (channel_index, keepalive, peer_index);
}
}
});
}
else
{
std::weak_ptr<nano::node> node_w = node.shared ();
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index] () {
if (auto node_l = node_w.lock ())
{
if (!node_l->network.tcp_channels.stopped)
{
node_l->network.tcp_channels.ongoing_merge (channel_index);
}
}
});
}
}

void nano::transport::tcp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a, uint8_t minimum_version_a, bool include_temporary_channels_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
Expand Down
2 changes: 2 additions & 0 deletions nano/node/transport/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ namespace transport
std::unique_ptr<container_info_component> collect_container_info (std::string const &);
void purge (std::chrono::steady_clock::time_point const &);
void ongoing_keepalive ();
void ongoing_merge (size_t channel_index);
void ongoing_merge (size_t channel_index, nano::keepalive keepalive, size_t peer_index);
void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0, bool = true);
void modify (std::shared_ptr<nano::transport::channel_tcp> const &, std::function<void (std::shared_ptr<nano::transport::channel_tcp> const &)>);
void update (nano::tcp_endpoint const &);
Expand Down
10 changes: 10 additions & 0 deletions nano/node/transport/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ nano::transport::tcp_server::realtime_message_visitor::realtime_message_visitor
void nano::transport::tcp_server::realtime_message_visitor::keepalive (const nano::keepalive & message)
{
process = true;
server.set_last_keepalive (message);
}

void nano::transport::tcp_server::realtime_message_visitor::publish (const nano::publish & message)
Expand Down Expand Up @@ -786,6 +787,15 @@ void nano::transport::tcp_server::timeout ()
}
}

void nano::transport::tcp_server::set_last_keepalive (nano::keepalive const & message)
{
std::lock_guard<nano::mutex> lock{ mutex };
if (!last_keepalive)
{
last_keepalive = message;
}
}

bool nano::transport::tcp_server::to_bootstrap_connection ()
{
auto node = this->node.lock ();
Expand Down
2 changes: 2 additions & 0 deletions nano/node/transport/tcp_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class tcp_server final : public std::enable_shared_from_this<tcp_server>
void stop ();

void timeout ();
void set_last_keepalive (nano::keepalive const & message);

std::shared_ptr<nano::transport::socket> const socket;
std::weak_ptr<nano::node> const node;
Expand All @@ -72,6 +73,7 @@ class tcp_server final : public std::enable_shared_from_this<tcp_server>
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
nano::account remote_node_id{};
std::chrono::steady_clock::time_point last_telemetry_req{};
std::optional<nano::keepalive> last_keepalive;

private:
void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2);
Expand Down

0 comments on commit 0ce830f

Please sign in to comment.