From 18798f95232b10884a80e2dfe58c783ccac74ea2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 18 Mar 2024 20:16:14 +0100 Subject: [PATCH] Rework tcp_server handshake visitor --- nano/lib/stats_enums.hpp | 9 ++ nano/node/transport/tcp_server.cpp | 194 +++++++++++++++++++++-------- nano/node/transport/tcp_server.hpp | 14 ++- 3 files changed, 160 insertions(+), 57 deletions(-) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index de7b6d020c..b1baaa9631 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -235,6 +235,15 @@ enum class detail : uint8_t tcp_read_error, tcp_write_error, + // tcp_server + handshake, + handshake_abort, + handshake_error, + handshake_network_error, + handshake_initiate, + handshake_response, + handshake_response_invalid, + // ipc invocations, diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index bea5f4f9ba..87d45d28d3 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -301,7 +301,7 @@ nano::transport::tcp_server::~tcp_server () return; } - node->logger.debug (nano::log::type::tcp_server, "Exiting TCP server ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Exiting TCP server ({})", fmt::streamed (remote_endpoint)); if (socket->type () == nano::transport::socket::type_t::bootstrap) { @@ -341,7 +341,7 @@ void nano::transport::tcp_server::start () return; } - node->logger.debug (nano::log::type::tcp_server, "Starting TCP server ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Starting TCP server ({})", fmt::streamed (remote_endpoint)); receive_message (); } @@ -374,7 +374,7 @@ void nano::transport::tcp_server::receive_message () node->logger.debug (nano::log::type::tcp_server, "Error reading message: {}, status: {} ({})", ec.message (), to_string (this_l->message_deserializer->status), - nano::util::to_str (this_l->remote_endpoint)); + fmt::streamed (this_l->remote_endpoint)); this_l->stop (); } @@ -412,7 +412,7 @@ void nano::transport::tcp_server::received_message (std::unique_ptrlogger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})", to_string (message_deserializer->status), - nano::util::to_str (remote_endpoint)); + fmt::streamed (remote_endpoint)); } } @@ -420,6 +420,10 @@ void nano::transport::tcp_server::received_message (std::unique_ptr message) @@ -429,6 +433,7 @@ bool nano::transport::tcp_server::process_message (std::unique_ptrstats.inc (nano::stat::type::tcp_server, to_stat_detail (message->header.type), nano::stat::dir::in); debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ()); @@ -448,27 +453,39 @@ bool nano::transport::tcp_server::process_message (std::unique_ptrvisit (handshake_visitor); - if (handshake_visitor.process) - { - queue_realtime (std::move (message)); - return true; - } - else if (handshake_visitor.bootstrap) + + switch (handshake_visitor.result) { - if (!to_bootstrap_connection ()) + case handshake_message_visitor::status::abort: + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort); + node->logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint)); + + return false; // Stop receiving new messages + } + case handshake_message_visitor::status::progress: { - stop (); - return false; + return true; // Continue handshake } - } - else - { - // Neither handshake nor bootstrap received when in handshake mode - node->logger.debug (nano::log::type::tcp_server, "Neither handshake nor bootstrap received when in handshake mode: {} ({})", - nano::to_string (message->header.type), - nano::util::to_str (remote_endpoint)); + case handshake_message_visitor::status::realtime: + { + queue_realtime (std::move (message)); + return true; // Continue receiving new messages + } + case handshake_message_visitor::status::bootstrap: + { + if (!to_bootstrap_connection ()) + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Error switching to bootstrap mode: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint)); - return true; + return false; // Switch failed, stop receiving new messages + } + else + { + // Fall through to process the bootstrap message + } + } } } else if (is_realtime_connection ()) @@ -481,7 +498,7 @@ bool nano::transport::tcp_server::process_message (std::unique_ptrflags.disable_tcp_realtime) { - node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", nano::util::to_str (server->remote_endpoint)); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", fmt::streamed (server->remote_endpoint)); - // Stop invalid handshake - server->stop (); - return; + result = status::abort; + return; // Abort } + if (!message.query && !message.response) + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", fmt::streamed (server->remote_endpoint)); - if (message.query && server->handshake_query_received) + result = status::abort; + return; // Abort + } + if (message.query && server->handshake_received) // Second handshake message should be a response only { - node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", nano::util::to_str (server->remote_endpoint)); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", fmt::streamed (server->remote_endpoint)); - // Stop invalid handshake - server->stop (); - return; + result = status::abort; + return; // Abort } - server->handshake_query_received = true; + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in); + + server->handshake_received = true; - node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", nano::util::to_str (server->remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", fmt::streamed (server->remote_endpoint)); if (message.query) { + // Sends response + our own query server->send_handshake_response (*message.query, message.is_v2 ()); + result = status::progress; // Continue handshake } if (message.response) { if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (server->remote_endpoint))) { - server->to_realtime_connection (message.response->node_id); + bool success = server->to_realtime_connection (message.response->node_id); + if (success) + { + result = status::realtime; // Switched to realtime + } + else + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", fmt::streamed (server->remote_endpoint)); + + result = status::abort; + return; + } } else { - // Stop invalid handshake - server->stop (); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid); + node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", fmt::streamed (server->remote_endpoint)); + + result = status::abort; return; } } +} + +void nano::transport::tcp_server::initiate_handshake () +{ + auto node = this->node.lock (); + if (!node) + { + return; + } - process = true; + auto query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint)); + nano::node_id_handshake message{ node->network_params.network, query }; + + node->logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", nano::util::to_str (remote_endpoint)); + + auto shared_const_buffer = message.to_shared_const_buffer (); + socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) { + auto node = this_l->node.lock (); + if (!node) + { + return; + } + if (ec) + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error); + node->logger.debug (nano::log::type::tcp_server, "Error sending handshake query: {} ({})", ec.message (), fmt::streamed (this_l->remote_endpoint)); + + // Stop invalid handshake + this_l->stop (); + } + else + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_initiate, nano::stat::dir::out); + } + }); } void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) @@ -568,11 +645,13 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha { return; } + auto response = node->network.prepare_handshake_response (query, v2); auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint)); nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response }; - // TODO: Use channel + node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", nano::util::to_str (remote_endpoint)); + auto shared_const_buffer = handshake_response.to_shared_const_buffer (); socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) { auto node = this_l->node.lock (); @@ -582,36 +661,38 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha } if (ec) { - node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), nano::util::to_str (this_l->remote_endpoint)); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error); + node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), fmt::streamed (this_l->remote_endpoint)); // Stop invalid handshake this_l->stop (); } else { - node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response, nano::stat::dir::out); } }); } void nano::transport::tcp_server::handshake_message_visitor::bulk_pull (const nano::bulk_pull & message) { - bootstrap = true; + result = status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::bulk_pull_account (const nano::bulk_pull_account & message) { - bootstrap = true; + result = status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::bulk_push (const nano::bulk_push & message) { - bootstrap = true; + result = status::bootstrap; } void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const nano::frontier_req & message) { - bootstrap = true; + result = status::bootstrap; } /* @@ -780,7 +861,7 @@ void nano::transport::tcp_server::timeout () } if (socket->has_timed_out ()) { - node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", fmt::streamed (remote_endpoint)); { nano::lock_guard lock{ node->tcp_listener->mutex }; @@ -834,7 +915,7 @@ bool nano::transport::tcp_server::to_bootstrap_connection () ++node->tcp_listener->bootstrap_count; socket->type_set (nano::transport::socket::type_t::bootstrap); - node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", nano::util::to_str (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", fmt::streamed (remote_endpoint)); return true; } @@ -846,17 +927,22 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const & { return false; } - if (socket->type () == nano::transport::socket::type_t::undefined && !node->flags.disable_tcp_realtime) + if (node->flags.disable_tcp_realtime) { - remote_node_id = node_id; - ++node->tcp_listener->realtime_count; - socket->type_set (nano::transport::socket::type_t::realtime); + return false; + } + if (socket->type () != nano::transport::socket::type_t::undefined) + { + return false; + } - node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", nano::util::to_str (remote_endpoint)); + remote_node_id = node_id; + ++node->tcp_listener->realtime_count; + socket->type_set (nano::transport::socket::type_t::realtime); - return true; - } - return false; + node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", fmt::streamed (remote_endpoint)); + + return true; } bool nano::transport::tcp_server::is_undefined_connection () const diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 0760241164..4d3d6f5294 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -61,6 +61,7 @@ class tcp_server final : public std::enable_shared_from_this void start (); void stop (); + void initiate_handshake (); void timeout (); void set_last_keepalive (nano::keepalive const & message); std::optional pop_last_keepalive (); @@ -69,7 +70,7 @@ class tcp_server final : public std::enable_shared_from_this std::weak_ptr const node; nano::mutex mutex; std::atomic stopped{ false }; - std::atomic handshake_query_received{ false }; + std::atomic handshake_received{ false }; // Remote enpoint used to remove response channel even after socket closing nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 }; nano::account remote_node_id{}; @@ -99,8 +100,15 @@ class tcp_server final : public std::enable_shared_from_this class handshake_message_visitor : public nano::message_visitor { public: - bool process{ false }; - bool bootstrap{ false }; + enum class status + { + abort, + progress, + realtime, + bootstrap, + }; + + status result{ status::abort }; explicit handshake_message_visitor (std::shared_ptr);