Skip to content

Commit

Permalink
Simplify message visitor
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 19, 2024
1 parent 18798f9 commit 3760ca0
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 89 deletions.
2 changes: 1 addition & 1 deletion nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
ASSERT_FALSE (ec);
});
});
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake) != 0);
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake) != 0);
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
ASSERT_EQ (node0->tcp_listener->connections.size (), 1);
Expand Down
157 changes: 85 additions & 72 deletions nano/node/transport/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,49 +392,61 @@ void nano::transport::tcp_server::received_message (std::unique_ptr<nano::messag
{
return;
}
bool should_continue = true;

process_result result = process_result::progress;
if (message)
{
should_continue = process_message (std::move (message));
result = process_message (std::move (message));
}
else
{
// Error while deserializing message
debug_assert (message_deserializer->status != transport::parse_status::success);

node->stats.inc (nano::stat::type::error, to_stat_detail (message_deserializer->status));

// Avoid too much noise about `duplicate_publish_message` errors
if (message_deserializer->status == transport::parse_status::duplicate_publish_message)
{
node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message);
}
else
{
// Avoid too much noise about `duplicate_publish_message` errors
node->logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})",
to_string (message_deserializer->status),
fmt::streamed (remote_endpoint));
}
}

if (should_continue)
{
receive_message ();
}
else
switch (result)
{
stop ();
case process_result::progress:
{
receive_message ();
}
break;
case process_result::abort:
{
stop ();
}
break;
case process_result::pause:
{
// Do nothing
}
break;
}
}

bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message> message)
auto nano::transport::tcp_server::process_message (std::unique_ptr<nano::message> message) -> process_result
{
auto node = this->node.lock ();
if (!node)
{
return false;
return process_result::abort;
}

node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->header.type), nano::stat::dir::in);
node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->type ()), nano::stat::dir::in);

debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ());

Expand All @@ -451,35 +463,35 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
*/
if (is_undefined_connection ())
{
handshake_message_visitor handshake_visitor{ shared_from_this () };
handshake_message_visitor handshake_visitor{ *this };
message->visit (handshake_visitor);

switch (handshake_visitor.result)
{
case handshake_message_visitor::status::abort:
case handshake_status::abort:
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort);
node->logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint));

return false; // Stop receiving new messages
return process_result::abort;
}
case handshake_message_visitor::status::progress:
case handshake_status::progress:
{
return true; // Continue handshake
return process_result::progress; // Continue handshake
}
case handshake_message_visitor::status::realtime:
case handshake_status::realtime:
{
queue_realtime (std::move (message));
return true; // Continue receiving new messages
return process_result::progress; // Continue receiving new messages
}
case handshake_message_visitor::status::bootstrap:
case handshake_status::bootstrap:
{
if (!to_bootstrap_connection ())
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Error switching to bootstrap mode: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint));

return false; // Switch failed, stop receiving new messages
return process_result::abort; // Switch failed, abort
}
else
{
Expand All @@ -492,21 +504,26 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
{
realtime_message_visitor realtime_visitor{ *this };
message->visit (realtime_visitor);

if (realtime_visitor.process)
{
queue_realtime (std::move (message));
}
return true;

return process_result::progress;
}
// The server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if`
if (is_bootstrap_connection ())
{
bootstrap_message_visitor bootstrap_visitor{ shared_from_this () };
message->visit (bootstrap_visitor);
return !bootstrap_visitor.processed; // Stop receiving new messages if bootstrap serving started

// Pause receiving new messages if bootstrap serving started
return bootstrap_visitor.processed ? process_result::pause : process_result::progress;
}

debug_assert (false);
return true; // Continue receiving new messages
return process_result::abort;
}

void nano::transport::tcp_server::queue_realtime (std::unique_ptr<nano::message> message)
Expand All @@ -519,87 +536,75 @@ void nano::transport::tcp_server::queue_realtime (std::unique_ptr<nano::message>
node->network.tcp_channels.message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket });
}

/*
* Handshake
*/

nano::transport::tcp_server::handshake_message_visitor::handshake_message_visitor (std::shared_ptr<tcp_server> server) :
server{ std::move (server) }
auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> handshake_status
{
}

void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (nano::node_id_handshake const & message)
{
auto node = server->node.lock ();
auto node = this->node.lock ();
if (!node)
{
return;
return handshake_status::abort;
}

if (node->flags.disable_tcp_realtime)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", fmt::streamed (server->remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", fmt::streamed (remote_endpoint));

result = status::abort;
return; // Abort
return handshake_status::abort;
}
if (!message.query && !message.response)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", fmt::streamed (server->remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", fmt::streamed (remote_endpoint));

result = status::abort;
return; // Abort
return handshake_status::abort;
}
if (message.query && server->handshake_received) // Second handshake message should be a response only
if (message.query && handshake_received) // Second handshake message should be a response only
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", fmt::streamed (server->remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", fmt::streamed (remote_endpoint));

result = status::abort;
return; // Abort
return handshake_status::abort;
}

node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in);

server->handshake_received = true;
handshake_received = true;

node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", fmt::streamed (server->remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", fmt::streamed (remote_endpoint));

if (message.query)
{
// Sends response + our own query
server->send_handshake_response (*message.query, message.is_v2 ());
result = status::progress; // Continue handshake
send_handshake_response (*message.query, message.is_v2 ());
// Fall through and continue handshake
}
if (message.response)
{
if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (server->remote_endpoint)))
if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (remote_endpoint)))
{
bool success = server->to_realtime_connection (message.response->node_id);
bool success = to_realtime_connection (message.response->node_id);
if (success)
{
result = status::realtime; // Switched to realtime
return handshake_status::realtime; // Switched to realtime
}
else
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", fmt::streamed (server->remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", fmt::streamed (remote_endpoint));

result = status::abort;
return;
return handshake_status::abort;
}
}
else
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid);
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", fmt::streamed (server->remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", fmt::streamed (remote_endpoint));

result = status::abort;
return;
return handshake_status::abort;
}
}

return handshake_status::progress;
}

void nano::transport::tcp_server::initiate_handshake ()
Expand All @@ -613,7 +618,7 @@ void nano::transport::tcp_server::initiate_handshake ()
auto query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint));
nano::node_id_handshake message{ node->network_params.network, query };

node->logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", fmt::streamed (remote_endpoint));

auto shared_const_buffer = message.to_shared_const_buffer ();
socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) {
Expand Down Expand Up @@ -650,7 +655,7 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha
auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint));
nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response };

node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", fmt::streamed (remote_endpoint));

auto shared_const_buffer = handshake_response.to_shared_const_buffer ();
socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) {
Expand All @@ -675,35 +680,39 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha
});
}

/*
* handshake_message_visitor
*/

void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (const nano::node_id_handshake & message)
{
result = server.process_handshake (message);
}

void nano::transport::tcp_server::handshake_message_visitor::bulk_pull (const nano::bulk_pull & message)
{
result = status::bootstrap;
result = handshake_status::bootstrap;
}

void nano::transport::tcp_server::handshake_message_visitor::bulk_pull_account (const nano::bulk_pull_account & message)
{
result = status::bootstrap;
result = handshake_status::bootstrap;
}

void nano::transport::tcp_server::handshake_message_visitor::bulk_push (const nano::bulk_push & message)
{
result = status::bootstrap;
result = handshake_status::bootstrap;
}

void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const nano::frontier_req & message)
{
result = status::bootstrap;
result = handshake_status::bootstrap;
}

/*
* Realtime
* realtime_message_visitor
*/

nano::transport::tcp_server::realtime_message_visitor::realtime_message_visitor (nano::transport::tcp_server & server_a) :
server{ server_a }
{
}

void nano::transport::tcp_server::realtime_message_visitor::keepalive (const nano::keepalive & message)
{
process = true;
Expand Down Expand Up @@ -765,7 +774,7 @@ void nano::transport::tcp_server::realtime_message_visitor::asc_pull_ack (const
}

/*
* Bootstrap
* bootstrap_message_visitor
*/

nano::transport::tcp_server::bootstrap_message_visitor::bootstrap_message_visitor (std::shared_ptr<tcp_server> server) :
Expand Down Expand Up @@ -850,6 +859,10 @@ void nano::transport::tcp_server::bootstrap_message_visitor::frontier_req (const
processed = true;
}

/*
*
*/

// TODO: We could periodically call this (from a dedicated timeout thread for eg.) but socket already handles timeouts,
// and since we only ever store tcp_server as weak_ptr, socket timeout will automatically trigger tcp_server cleanup
void nano::transport::tcp_server::timeout ()
Expand Down
Loading

0 comments on commit 3760ca0

Please sign in to comment.