From 3760ca04863c6a358a11619e5dcbdbf020205dd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 18 Mar 2024 22:44:06 +0100 Subject: [PATCH] Simplify message visitor --- nano/core_test/network.cpp | 2 +- nano/node/transport/tcp_server.cpp | 157 ++++++++++++++++------------- nano/node/transport/tcp_server.hpp | 43 +++++--- 3 files changed, 113 insertions(+), 89 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index c7152c6641..fe8bcfb6fb 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -722,7 +722,7 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake) ASSERT_FALSE (ec); }); }); - ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake) != 0); + ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake) != 0); { nano::lock_guard guard (node0->tcp_listener->mutex); ASSERT_EQ (node0->tcp_listener->connections.size (), 1); diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index 87d45d28d3..3dbc5673d3 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -392,10 +392,11 @@ void nano::transport::tcp_server::received_message (std::unique_ptrstatus != transport::parse_status::success); node->stats.inc (nano::stat::type::error, to_stat_detail (message_deserializer->status)); + + // Avoid too much noise about `duplicate_publish_message` errors if (message_deserializer->status == transport::parse_status::duplicate_publish_message) { node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message); } else { - // Avoid too much noise about `duplicate_publish_message` errors node->logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})", to_string (message_deserializer->status), fmt::streamed (remote_endpoint)); } } - if (should_continue) - { - receive_message (); - } - else + switch (result) { - stop (); + case process_result::progress: + { + receive_message (); + } + break; + case process_result::abort: + { + stop (); + } + break; + case process_result::pause: + { + // Do nothing + } + break; } } -bool nano::transport::tcp_server::process_message (std::unique_ptr message) +auto nano::transport::tcp_server::process_message (std::unique_ptr message) -> process_result { auto node = this->node.lock (); if (!node) { - return false; + return process_result::abort; } - node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->header.type), nano::stat::dir::in); + node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->type ()), nano::stat::dir::in); debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ()); @@ -451,35 +463,35 @@ bool nano::transport::tcp_server::process_message (std::unique_ptrvisit (handshake_visitor); switch (handshake_visitor.result) { - case handshake_message_visitor::status::abort: + case handshake_status::abort: { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort); node->logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint)); - return false; // Stop receiving new messages + return process_result::abort; } - case handshake_message_visitor::status::progress: + case handshake_status::progress: { - return true; // Continue handshake + return process_result::progress; // Continue handshake } - case handshake_message_visitor::status::realtime: + case handshake_status::realtime: { queue_realtime (std::move (message)); - return true; // Continue receiving new messages + return process_result::progress; // Continue receiving new messages } - case handshake_message_visitor::status::bootstrap: + case handshake_status::bootstrap: { if (!to_bootstrap_connection ()) { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); node->logger.debug (nano::log::type::tcp_server, "Error switching to bootstrap mode: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint)); - return false; // Switch failed, stop receiving new messages + return process_result::abort; // Switch failed, abort } else { @@ -492,21 +504,26 @@ bool nano::transport::tcp_server::process_message (std::unique_ptrvisit (realtime_visitor); + if (realtime_visitor.process) { queue_realtime (std::move (message)); } - return true; + + return process_result::progress; } // The server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if` if (is_bootstrap_connection ()) { bootstrap_message_visitor bootstrap_visitor{ shared_from_this () }; message->visit (bootstrap_visitor); - return !bootstrap_visitor.processed; // Stop receiving new messages if bootstrap serving started + + // Pause receiving new messages if bootstrap serving started + return bootstrap_visitor.processed ? process_result::pause : process_result::progress; } + debug_assert (false); - return true; // Continue receiving new messages + return process_result::abort; } void nano::transport::tcp_server::queue_realtime (std::unique_ptr message) @@ -519,87 +536,75 @@ void nano::transport::tcp_server::queue_realtime (std::unique_ptr node->network.tcp_channels.message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket }); } -/* - * Handshake - */ - -nano::transport::tcp_server::handshake_message_visitor::handshake_message_visitor (std::shared_ptr server) : - server{ std::move (server) } +auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> handshake_status { -} - -void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (nano::node_id_handshake const & message) -{ - auto node = server->node.lock (); + auto node = this->node.lock (); if (!node) { - return; + return handshake_status::abort; } if (node->flags.disable_tcp_realtime) { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", fmt::streamed (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", fmt::streamed (remote_endpoint)); - result = status::abort; - return; // Abort + return handshake_status::abort; } if (!message.query && !message.response) { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", fmt::streamed (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", fmt::streamed (remote_endpoint)); - result = status::abort; - return; // Abort + return handshake_status::abort; } - if (message.query && server->handshake_received) // Second handshake message should be a response only + if (message.query && handshake_received) // Second handshake message should be a response only { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", fmt::streamed (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", fmt::streamed (remote_endpoint)); - result = status::abort; - return; // Abort + return handshake_status::abort; } node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in); - server->handshake_received = true; + handshake_received = true; - node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", fmt::streamed (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", fmt::streamed (remote_endpoint)); if (message.query) { // Sends response + our own query - server->send_handshake_response (*message.query, message.is_v2 ()); - result = status::progress; // Continue handshake + send_handshake_response (*message.query, message.is_v2 ()); + // Fall through and continue handshake } if (message.response) { - if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (server->remote_endpoint))) + if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (remote_endpoint))) { - bool success = server->to_realtime_connection (message.response->node_id); + bool success = to_realtime_connection (message.response->node_id); if (success) { - result = status::realtime; // Switched to realtime + return handshake_status::realtime; // Switched to realtime } else { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", fmt::streamed (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", fmt::streamed (remote_endpoint)); - result = status::abort; - return; + return handshake_status::abort; } } else { node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid); - node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", fmt::streamed (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", fmt::streamed (remote_endpoint)); - result = status::abort; - return; + return handshake_status::abort; } } + + return handshake_status::progress; } void nano::transport::tcp_server::initiate_handshake () @@ -613,7 +618,7 @@ void nano::transport::tcp_server::initiate_handshake () auto query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint)); nano::node_id_handshake message{ node->network_params.network, query }; - node->logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", fmt::streamed (remote_endpoint)); auto shared_const_buffer = message.to_shared_const_buffer (); socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) { @@ -650,7 +655,7 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint)); nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response }; - node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", fmt::streamed (remote_endpoint)); auto shared_const_buffer = handshake_response.to_shared_const_buffer (); socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) { @@ -675,35 +680,39 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha }); } +/* + * handshake_message_visitor + */ + +void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (const nano::node_id_handshake & message) +{ + result = server.process_handshake (message); +} + void nano::transport::tcp_server::handshake_message_visitor::bulk_pull (const nano::bulk_pull & message) { - result = status::bootstrap; + result = handshake_status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::bulk_pull_account (const nano::bulk_pull_account & message) { - result = status::bootstrap; + result = handshake_status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::bulk_push (const nano::bulk_push & message) { - result = status::bootstrap; + result = handshake_status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const nano::frontier_req & message) { - result = status::bootstrap; + result = handshake_status::bootstrap; } /* - * Realtime + * realtime_message_visitor */ -nano::transport::tcp_server::realtime_message_visitor::realtime_message_visitor (nano::transport::tcp_server & server_a) : - server{ server_a } -{ -} - void nano::transport::tcp_server::realtime_message_visitor::keepalive (const nano::keepalive & message) { process = true; @@ -765,7 +774,7 @@ void nano::transport::tcp_server::realtime_message_visitor::asc_pull_ack (const } /* - * Bootstrap + * bootstrap_message_visitor */ nano::transport::tcp_server::bootstrap_message_visitor::bootstrap_message_visitor (std::shared_ptr server) : @@ -850,6 +859,10 @@ void nano::transport::tcp_server::bootstrap_message_visitor::frontier_req (const processed = true; } +/* + * + */ + // TODO: We could periodically call this (from a dedicated timeout thread for eg.) but socket already handles timeouts, // and since we only ever store tcp_server as weak_ptr, socket timeout will automatically trigger tcp_server cleanup void nano::transport::tcp_server::timeout () diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 4d3d6f5294..67abf9745e 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -77,12 +77,16 @@ class tcp_server final : public std::enable_shared_from_this std::chrono::steady_clock::time_point last_telemetry_req{}; private: - void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2); + enum class process_result + { + abort, + progress, + pause, + }; void receive_message (); void received_message (std::unique_ptr message); - bool process_message (std::unique_ptr message); - + process_result process_message (std::unique_ptr message); void queue_realtime (std::unique_ptr message); bool to_bootstrap_connection (); @@ -91,26 +95,30 @@ class tcp_server final : public std::enable_shared_from_this bool is_bootstrap_connection () const; bool is_realtime_connection () const; + enum class handshake_status + { + abort, + progress, + realtime, + bootstrap, + }; + + handshake_status process_handshake (nano::node_id_handshake const & message); + void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2); + private: bool const allow_bootstrap; std::shared_ptr message_deserializer; std::optional last_keepalive; -private: +private: // Visitors class handshake_message_visitor : public nano::message_visitor { public: - enum class status - { - abort, - progress, - realtime, - bootstrap, - }; - - status result{ status::abort }; + handshake_status result{ handshake_status::abort }; - explicit handshake_message_visitor (std::shared_ptr); + explicit handshake_message_visitor (tcp_server & server) : + server{ server } {}; void node_id_handshake (nano::node_id_handshake const &) override; void bulk_pull (nano::bulk_pull const &) override; @@ -119,7 +127,7 @@ class tcp_server final : public std::enable_shared_from_this void frontier_req (nano::frontier_req const &) override; private: - std::shared_ptr server; + tcp_server & server; }; class realtime_message_visitor : public nano::message_visitor @@ -127,7 +135,8 @@ class tcp_server final : public std::enable_shared_from_this public: bool process{ false }; - explicit realtime_message_visitor (tcp_server &); + explicit realtime_message_visitor (tcp_server & server) : + server{ server } {}; void keepalive (nano::keepalive const &) override; void publish (nano::publish const &) override; @@ -158,5 +167,7 @@ class tcp_server final : public std::enable_shared_from_this private: std::shared_ptr server; }; + + friend class handshake_message_visitor; }; }