Skip to content

Commit

Permalink
Rework tcp_server handshake visitor
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 19, 2024
1 parent 2f69c60 commit 18798f9
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 57 deletions.
9 changes: 9 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ enum class detail : uint8_t
tcp_read_error,
tcp_write_error,

// tcp_server
handshake,
handshake_abort,
handshake_error,
handshake_network_error,
handshake_initiate,
handshake_response,
handshake_response_invalid,

// ipc
invocations,

Expand Down
194 changes: 140 additions & 54 deletions nano/node/transport/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ nano::transport::tcp_server::~tcp_server ()
return;
}

node->logger.debug (nano::log::type::tcp_server, "Exiting TCP server ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Exiting TCP server ({})", fmt::streamed (remote_endpoint));

if (socket->type () == nano::transport::socket::type_t::bootstrap)
{
Expand Down Expand Up @@ -341,7 +341,7 @@ void nano::transport::tcp_server::start ()
return;
}

node->logger.debug (nano::log::type::tcp_server, "Starting TCP server ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Starting TCP server ({})", fmt::streamed (remote_endpoint));

receive_message ();
}
Expand Down Expand Up @@ -374,7 +374,7 @@ void nano::transport::tcp_server::receive_message ()
node->logger.debug (nano::log::type::tcp_server, "Error reading message: {}, status: {} ({})",
ec.message (),
to_string (this_l->message_deserializer->status),
nano::util::to_str (this_l->remote_endpoint));
fmt::streamed (this_l->remote_endpoint));

this_l->stop ();
}
Expand Down Expand Up @@ -412,14 +412,18 @@ void nano::transport::tcp_server::received_message (std::unique_ptr<nano::messag
// Avoid too much noise about `duplicate_publish_message` errors
node->logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})",
to_string (message_deserializer->status),
nano::util::to_str (remote_endpoint));
fmt::streamed (remote_endpoint));
}
}

if (should_continue)
{
receive_message ();
}
else
{
stop ();
}
}

bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message> message)
Expand All @@ -429,6 +433,7 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
{
return false;
}

node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->header.type), nano::stat::dir::in);

debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ());
Expand All @@ -448,27 +453,39 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
{
handshake_message_visitor handshake_visitor{ shared_from_this () };
message->visit (handshake_visitor);
if (handshake_visitor.process)
{
queue_realtime (std::move (message));
return true;
}
else if (handshake_visitor.bootstrap)

switch (handshake_visitor.result)
{
if (!to_bootstrap_connection ())
case handshake_message_visitor::status::abort:
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort);
node->logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint));

return false; // Stop receiving new messages
}
case handshake_message_visitor::status::progress:
{
stop ();
return false;
return true; // Continue handshake
}
}
else
{
// Neither handshake nor bootstrap received when in handshake mode
node->logger.debug (nano::log::type::tcp_server, "Neither handshake nor bootstrap received when in handshake mode: {} ({})",
nano::to_string (message->header.type),
nano::util::to_str (remote_endpoint));
case handshake_message_visitor::status::realtime:
{
queue_realtime (std::move (message));
return true; // Continue receiving new messages
}
case handshake_message_visitor::status::bootstrap:
{
if (!to_bootstrap_connection ())
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Error switching to bootstrap mode: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint));

return true;
return false; // Switch failed, stop receiving new messages
}
else
{
// Fall through to process the bootstrap message
}
}
}
}
else if (is_realtime_connection ())
Expand All @@ -481,7 +498,7 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
}
return true;
}
// the server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if`
// The server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if`
if (is_bootstrap_connection ())
{
bootstrap_message_visitor bootstrap_visitor{ shared_from_this () };
Expand Down Expand Up @@ -518,47 +535,107 @@ void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (
{
return;
}

if (node->flags.disable_tcp_realtime)
{
node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", nano::util::to_str (server->remote_endpoint));
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", fmt::streamed (server->remote_endpoint));

// Stop invalid handshake
server->stop ();
return;
result = status::abort;
return; // Abort
}
if (!message.query && !message.response)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", fmt::streamed (server->remote_endpoint));

if (message.query && server->handshake_query_received)
result = status::abort;
return; // Abort
}
if (message.query && server->handshake_received) // Second handshake message should be a response only
{
node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", nano::util::to_str (server->remote_endpoint));
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", fmt::streamed (server->remote_endpoint));

// Stop invalid handshake
server->stop ();
return;
result = status::abort;
return; // Abort
}

server->handshake_query_received = true;
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in);

server->handshake_received = true;

node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", nano::util::to_str (server->remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", fmt::streamed (server->remote_endpoint));

if (message.query)
{
// Sends response + our own query
server->send_handshake_response (*message.query, message.is_v2 ());
result = status::progress; // Continue handshake
}
if (message.response)
{
if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (server->remote_endpoint)))
{
server->to_realtime_connection (message.response->node_id);
bool success = server->to_realtime_connection (message.response->node_id);
if (success)
{
result = status::realtime; // Switched to realtime
}
else
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", fmt::streamed (server->remote_endpoint));

result = status::abort;
return;
}
}
else
{
// Stop invalid handshake
server->stop ();
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid);
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", fmt::streamed (server->remote_endpoint));

result = status::abort;
return;
}
}
}

void nano::transport::tcp_server::initiate_handshake ()
{
auto node = this->node.lock ();
if (!node)
{
return;
}

process = true;
auto query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint));
nano::node_id_handshake message{ node->network_params.network, query };

node->logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", nano::util::to_str (remote_endpoint));

auto shared_const_buffer = message.to_shared_const_buffer ();
socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) {
auto node = this_l->node.lock ();
if (!node)
{
return;
}
if (ec)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error);
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake query: {} ({})", ec.message (), fmt::streamed (this_l->remote_endpoint));

// Stop invalid handshake
this_l->stop ();
}
else
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out);
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_initiate, nano::stat::dir::out);
}
});
}

void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2)
Expand All @@ -568,11 +645,13 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha
{
return;
}

auto response = node->network.prepare_handshake_response (query, v2);
auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint));
nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response };

// TODO: Use channel
node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", nano::util::to_str (remote_endpoint));

auto shared_const_buffer = handshake_response.to_shared_const_buffer ();
socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) {
auto node = this_l->node.lock ();
Expand All @@ -582,36 +661,38 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha
}
if (ec)
{
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), nano::util::to_str (this_l->remote_endpoint));
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error);
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), fmt::streamed (this_l->remote_endpoint));

// Stop invalid handshake
this_l->stop ();
}
else
{
node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out);
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out);
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response, nano::stat::dir::out);
}
});
}

void nano::transport::tcp_server::handshake_message_visitor::bulk_pull (const nano::bulk_pull & message)
{
bootstrap = true;
result = status::bootstrap;
}

void nano::transport::tcp_server::handshake_message_visitor::bulk_pull_account (const nano::bulk_pull_account & message)
{
bootstrap = true;
result = status::bootstrap;
}

void nano::transport::tcp_server::handshake_message_visitor::bulk_push (const nano::bulk_push & message)
{
bootstrap = true;
result = status::bootstrap;
}

void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const nano::frontier_req & message)
{
bootstrap = true;
result = status::bootstrap;
}

/*
Expand Down Expand Up @@ -780,7 +861,7 @@ void nano::transport::tcp_server::timeout ()
}
if (socket->has_timed_out ())
{
node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", fmt::streamed (remote_endpoint));

{
nano::lock_guard<nano::mutex> lock{ node->tcp_listener->mutex };
Expand Down Expand Up @@ -834,7 +915,7 @@ bool nano::transport::tcp_server::to_bootstrap_connection ()
++node->tcp_listener->bootstrap_count;
socket->type_set (nano::transport::socket::type_t::bootstrap);

node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", fmt::streamed (remote_endpoint));

return true;
}
Expand All @@ -846,17 +927,22 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const &
{
return false;
}
if (socket->type () == nano::transport::socket::type_t::undefined && !node->flags.disable_tcp_realtime)
if (node->flags.disable_tcp_realtime)
{
remote_node_id = node_id;
++node->tcp_listener->realtime_count;
socket->type_set (nano::transport::socket::type_t::realtime);
return false;
}
if (socket->type () != nano::transport::socket::type_t::undefined)
{
return false;
}

node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", nano::util::to_str (remote_endpoint));
remote_node_id = node_id;
++node->tcp_listener->realtime_count;
socket->type_set (nano::transport::socket::type_t::realtime);

return true;
}
return false;
node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", fmt::streamed (remote_endpoint));

return true;
}

bool nano::transport::tcp_server::is_undefined_connection () const
Expand Down
Loading

0 comments on commit 18798f9

Please sign in to comment.