Skip to content

Commit

Permalink
TCP SERVER
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 12, 2024
1 parent f7a80f4 commit 9b8d986
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 25 deletions.
66 changes: 41 additions & 25 deletions nano/node/transport/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,13 @@ void nano::transport::tcp_server::received_message (std::unique_ptr<nano::messag
debug_assert (message_deserializer->status != transport::parse_status::success);

node->stats.inc (nano::stat::type::error, to_stat_detail (message_deserializer->status));
// Avoid too much noise about `duplicate_publish_message` errors
if (message_deserializer->status == transport::parse_status::duplicate_publish_message)
{
node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message);
}
else
{
// Avoid too much noise about `duplicate_publish_message` errors
node->logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})",
to_string (message_deserializer->status),
nano::util::to_str (remote_endpoint));
Expand All @@ -416,14 +416,18 @@ void nano::transport::tcp_server::received_message (std::unique_ptr<nano::messag
{
receive_message ();
}
else
{
stop ();
}
}

bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message> message)
{
auto node = this->node.lock ();
if (!node)
{
return false;
return false; // Stop receiving new messages
}

debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ());
Expand Down Expand Up @@ -453,18 +457,26 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
to_string (message->type ()),
nano::util::to_str (remote_endpoint));

stop ();
return false;
return false; // Abort
}
else
{
// Fall through to process the bootstrap message
}
}
else if (handshake_visitor.realtime)
{
// Handshake in progress or successfully switched to realtime mode
return true; // Continue receiving new messages
}
else
{
// Neither handshake nor bootstrap received when in handshake mode
node->logger.debug (nano::log::type::tcp_server, "Neither handshake nor bootstrap received when in handshake mode: {} ({})",
node->logger.debug (nano::log::type::tcp_server, "Neither realtime nor bootstrap could be completed when in handshake mode: {} ({})",
to_string (message->type ()),
nano::util::to_str (remote_endpoint));

return true;
return false; // Abort
}
}
else if (is_realtime_connection ())
Expand All @@ -475,7 +487,7 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
{
queue_realtime (std::move (message));
}
return true;
return true; // Continue receiving new messages
}
// The server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if`
if (is_bootstrap_connection ())
Expand All @@ -484,7 +496,7 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
message->visit (bootstrap_visitor);
return !bootstrap_visitor.processed; // Stop receiving new messages if bootstrap serving started
}
debug_assert (false);
debug_assert (false); // Should never reach here
return true; // Continue receiving new messages
}

Expand All @@ -507,6 +519,8 @@ nano::transport::tcp_server::handshake_message_visitor::handshake_message_visito

void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (nano::node_id_handshake const & message)
{
// Connection will be aborted if this returns without setting `realtime` to true

auto node = server->node.lock ();
if (!node)
{
Expand All @@ -520,19 +534,15 @@ void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", nano::util::to_str (server->remote_endpoint));

// Stop invalid handshake
server->stop ();
return;
return; // Abort
}

if (message.query && server->handshake_query_received)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", nano::util::to_str (server->remote_endpoint));

// Stop invalid handshake
server->stop ();
return;
return; // Abort
}

server->handshake_query_received = true;
Expand All @@ -541,30 +551,33 @@ void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (

if (message.query)
{
// Sends response + our own query
server->send_handshake_response (*message.query, message.is_v2 ());
realtime = true; // Continue processing
}
if (message.response)
{
if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (server->remote_endpoint)))
{
bool success = server->to_realtime_connection (message.response->node_id);
if (!success)
if (success)
{
realtime = true; // Continue processing
}
else
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Error occurred when attempting to upgrade to realtime connection ({})", nano::util::to_str (server->remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", nano::util::to_str (server->remote_endpoint));

server->stop ();
return;
realtime = false; // Abort
}
}
else
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake_response_invalid);
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", nano::util::to_str (server->remote_endpoint));

// Stop invalid handshake
server->stop ();
return;
realtime = false; // Abort
}
}
}
Expand All @@ -580,7 +593,7 @@ void nano::transport::tcp_server::send_handshake_query ()
auto query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint));
nano::node_id_handshake message{ node->network_params.network, query };

node->logger.debug (nano::log::type::tcp_server, "Initiating handshake query to {}", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", nano::util::to_str (remote_endpoint));

// TODO: Use channel
auto shared_const_buffer = message.to_shared_const_buffer ();
Expand All @@ -593,7 +606,9 @@ void nano::transport::tcp_server::send_handshake_query ()
if (ec)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake_network_error);
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake query: {} ({})", ec.message (), nano::util::to_str (this_l->remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake query: {} ({})",
ec.message (),
nano::util::to_str (this_l->remote_endpoint));

// Stop invalid handshake
this_l->stop ();
Expand All @@ -618,7 +633,7 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha
auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint));
nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response };

node->logger.debug (nano::log::type::tcp_server, "Responding to handshake to {}", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", nano::util::to_str (remote_endpoint));

// TODO: Use channel
auto shared_const_buffer = handshake_response.to_shared_const_buffer ();
Expand All @@ -631,7 +646,8 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha
if (ec)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake_network_error);
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), nano::util::to_str (this_l->remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})",
ec.message (), n ano::util::to_str (this_l->remote_endpoint));

// Stop invalid handshake
this_l->stop ();
Expand Down
1 change: 1 addition & 0 deletions nano/node/transport/tcp_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class tcp_server final : public std::enable_shared_from_this<tcp_server>
{
public:
bool bootstrap{ false };
bool realtime{ false };

explicit handshake_message_visitor (std::shared_ptr<tcp_server>);

Expand Down

0 comments on commit 9b8d986

Please sign in to comment.