diff --git a/nano/lib/logging.hpp b/nano/lib/logging.hpp index 14e6e2fc3a..ae9563300a 100644 --- a/nano/lib/logging.hpp +++ b/nano/lib/logging.hpp @@ -32,12 +32,58 @@ using logger_id = std::pair; std::string to_string (logger_id); logger_id parse_logger_id (std::string const &); +} +// Time helpers +namespace nano::log +{ template auto microseconds (std::chrono::time_point time) { return std::chrono::duration_cast (time.time_since_epoch ()).count (); } + +template +auto microseconds (Duration duration) +{ + return std::chrono::duration_cast (duration).count (); +} + +template +auto milliseconds (std::chrono::time_point time) +{ + return std::chrono::duration_cast (time.time_since_epoch ()).count (); +} + +template +auto milliseconds (Duration duration) +{ + return std::chrono::duration_cast (duration).count (); +} + +template +auto seconds (std::chrono::time_point time) +{ + return std::chrono::duration_cast (time.time_since_epoch ()).count (); +} + +template +auto seconds (Duration duration) +{ + return std::chrono::duration_cast (duration).count (); +} + +template +auto milliseconds_delta (std::chrono::time_point time, std::chrono::time_point now = Clock::now ()) +{ + return std::chrono::duration_cast (now - time).count (); +} + +template +auto seconds_delta (std::chrono::time_point time, std::chrono::time_point now = Clock::now ()) +{ + return std::chrono::duration_cast (now - time).count (); +} } namespace nano diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index eddf9b120a..822f73621c 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -72,6 +72,7 @@ enum class type election_scheduler, vote_generator, rep_tiers, + syn_cookies, // bootstrap bulk_pull_client, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 6d1ac20b29..68679b2c53 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -17,6 +17,7 @@ enum class type : uint8_t ledger, rollback, bootstrap, + network, tcp_server, vote, election, @@ -52,6 +53,7 @@ enum class type : uint8_t rep_crawler, local_block_broadcaster, rep_tiers, + syn_cookies, bootstrap_ascending, bootstrap_ascending_accounts, @@ -67,6 +69,8 @@ enum class detail : uint8_t // common ok, loop, + loop_cleanup, + loop_keepalive, total, process, processed, diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 944ac0e1bd..6c66748d6b 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -109,6 +109,12 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::rep_tiers: thread_role_name_string = "Rep tiers"; break; + case nano::thread_role::name::network_cleanup: + thread_role_name_string = "Net cleanup"; + break; + case nano::thread_role::name::network_keepalive: + thread_role_name_string = "Net keepalive"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 724efad5c8..8fd4175ae9 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -45,6 +45,8 @@ enum class name rep_crawler, local_block_broadcasting, rep_tiers, + network_cleanup, + network_keepalive, }; /* diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 80660cad00..5287391336 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -9,89 +9,164 @@ #include +using namespace std::chrono_literals; + /* * network */ -nano::network::network (nano::node & node_a, uint16_t port_a) : - id (nano::network_constants::active_network), - syn_cookies (node_a.network_params.network.max_peers_per_ip), - inbound{ [this] (nano::message const & message, std::shared_ptr const & channel) { - debug_assert (message.header.network == node.network_params.network.current_network); - debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min); - process_message (message, channel); - } }, - resolver (node_a.io_ctx), - tcp_message_manager (node_a.config.tcp_incoming_connections_max), - node (node_a), - publish_filter (256 * 1024), - tcp_channels (node_a, inbound), - port (port_a), - disconnect_observer ([] () {}) -{ - for (std::size_t i = 0; i < node.config.network_threads && !node.flags.disable_tcp_realtime; ++i) - { - packet_processing_threads.emplace_back (nano::thread_attributes::get_default (), [this, i] () { - nano::thread_role::set (nano::thread_role::name::packet_processing); - try - { - tcp_channels.process_messages (); - } - catch (boost::system::error_code & ec) - { - node.logger.critical (nano::log::type::network, "Error: {}", ec.message ()); - release_assert (false); - } - catch (std::error_code & ec) - { - node.logger.critical (nano::log::type::network, "Error: {}", ec.message ()); - release_assert (false); - } - catch (std::runtime_error & err) - { - node.logger.critical (nano::log::type::network, "Error: {}", err.what ()); - release_assert (false); - } - catch (...) - { - node.logger.critical (nano::log::type::network, "Unknown error"); - release_assert (false); - } - }); - } +nano::network::network (nano::node & node, uint16_t port) : + node{ node }, + id{ nano::network_constants::active_network }, + syn_cookies{ node.network_params.network.max_peers_per_ip, node.logger }, + resolver{ node.io_ctx }, + publish_filter{ 256 * 1024 }, + tcp_channels{ node, [this] (nano::message const & message, std::shared_ptr const & channel) { + inbound (message, channel); + } }, + port{ port } +{ } nano::network::~network () { - stop (); + // All threads must be stopped before this destructor + debug_assert (processing_threads.empty ()); + debug_assert (!cleanup_thread.joinable ()); + debug_assert (!keepalive_thread.joinable ()); } void nano::network::start () { - if (!node.flags.disable_connection_cleanup) - { - ongoing_cleanup (); - } - ongoing_syn_cookie_cleanup (); + cleanup_thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::network_cleanup); + run_cleanup (); + }); + + keepalive_thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::network_keepalive); + run_keepalive (); + }); + if (!node.flags.disable_tcp_realtime) { tcp_channels.start (); + + for (std::size_t i = 0; i < node.config.network_threads; ++i) + { + processing_threads.emplace_back (nano::thread_attributes::get_default (), [this] () { + nano::thread_role::set (nano::thread_role::name::packet_processing); + run_processing (); + }); + } } - ongoing_keepalive (); } void nano::network::stop () { - if (!stopped.exchange (true)) { - tcp_channels.stop (); - resolver.cancel (); - tcp_message_manager.stop (); - port = 0; - for (auto & thread : packet_processing_threads) + nano::lock_guard lock{ mutex }; + stopped = true; + } + condition.notify_all (); + + tcp_channels.stop (); + resolver.cancel (); + + for (auto & thread : processing_threads) + { + thread.join (); + } + processing_threads.clear (); + + if (keepalive_thread.joinable ()) + { + keepalive_thread.join (); + } + if (cleanup_thread.joinable ()) + { + cleanup_thread.join (); + } + + port = 0; +} + +void nano::network::run_processing () +{ + try + { + // TODO: Move responsibility of packet queuing and processing to the message_processor class + tcp_channels.process_messages (); + } + catch (boost::system::error_code & ec) + { + node.logger.critical (nano::log::type::network, "Error: {}", ec.message ()); + release_assert (false); + } + catch (std::error_code & ec) + { + node.logger.critical (nano::log::type::network, "Error: {}", ec.message ()); + release_assert (false); + } + catch (std::runtime_error & err) + { + node.logger.critical (nano::log::type::network, "Error: {}", err.what ()); + release_assert (false); + } + catch (...) + { + node.logger.critical (nano::log::type::network, "Unknown error"); + release_assert (false); + } +} + +void nano::network::run_cleanup () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + condition.wait_for (lock, node.network_params.network.is_dev_network () ? 1s : 5s); + lock.unlock (); + + if (stopped) + { + return; + } + + node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_cleanup); + + if (!node.flags.disable_connection_cleanup) + { + auto const cutoff = std::chrono::steady_clock::now () - node.network_params.network.cleanup_cutoff (); + cleanup (cutoff); + } + + auto const syn_cookie_cutoff = std::chrono::steady_clock::now () - node.network_params.network.syn_cookie_cutoff; + syn_cookies.purge (syn_cookie_cutoff); + + lock.lock (); + } +} + +void nano::network::run_keepalive () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + condition.wait_for (lock, node.network_params.network.keepalive_period); + lock.unlock (); + + if (stopped) { - thread.join (); + return; } + + node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_keepalive); + + flood_keepalive (0.75f); + flood_keepalive_self (0.25f); + + lock.lock (); } } @@ -109,34 +184,6 @@ void nano::network::send_keepalive_self (std::shared_ptrsend (message); } -void nano::network::send_node_id_handshake (std::shared_ptr const & channel_a, std::optional const & cookie, std::optional const & respond_to) -{ - std::optional response; - if (respond_to) - { - nano::node_id_handshake::response_payload pld{ node.node_id.pub, nano::sign_message (node.node_id.prv, node.node_id.pub, *respond_to) }; - debug_assert (!nano::validate_message (pld.node_id, *respond_to, pld.signature)); - response = pld; - } - - std::optional query; - if (cookie) - { - nano::node_id_handshake::query_payload pld{ *cookie }; - query = pld; - } - - nano::node_id_handshake message{ node.network_params.network, query, response }; - - node.logger.debug (nano::log::type::network, "Node ID handshake sent to: {} (query: {}, respond to: {}, signature: {})", - nano::util::to_str (channel_a->get_endpoint ()), - (query ? query->cookie.to_string () : ""), - (respond_to ? respond_to->to_string () : ""), - (response ? response->signature.to_string () : "")); - - channel_a->send (message); -} - void nano::network::flood_message (nano::message & message_a, nano::transport::buffer_drop_policy const drop_policy_a, float const scale_a) { for (auto & i : list (fanout (scale_a))) @@ -220,14 +267,6 @@ void nano::network::flood_block_many (std::deque> b } } -void nano::network::send_confirm_req (std::shared_ptr const & channel_a, std::pair const & hash_root_a) -{ - auto & [hash, root] = hash_root_a; - // Confirmation request with hash + root - nano::confirm_req req (node.network_params.network, hash, root); - channel_a->send (req); -} - namespace { class network_message_visitor : public nano::message_visitor @@ -354,6 +393,13 @@ void nano::network::process_message (nano::message const & message, std::shared_ message.visit (visitor); } +void nano::network::inbound (const nano::message & message, const std::shared_ptr & channel) +{ + debug_assert (message.header.network == node.network_params.network.current_network); + debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min); + process_message (message, channel); +} + // Send keepalives to all the peers we've been notified of void nano::network::merge_peers (std::array const & peers_a) { @@ -506,52 +552,18 @@ nano::endpoint nano::network::endpoint () const return nano::endpoint (boost::asio::ip::address_v6::loopback (), port); } -void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff_a) +void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff) { - tcp_channels.purge (cutoff_a); + node.logger.debug (nano::log::type::network, "Performing cleanup, cutoff: {}s", nano::log::seconds_delta (cutoff)); + + tcp_channels.purge (cutoff); + if (node.network.empty ()) { disconnect_observer (); } } -void nano::network::ongoing_cleanup () -{ - cleanup (std::chrono::steady_clock::now () - node.network_params.network.cleanup_cutoff ()); - std::weak_ptr node_w (node.shared ()); - node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 5), [node_w] () { - if (auto node_l = node_w.lock ()) - { - node_l->network.ongoing_cleanup (); - } - }); -} - -void nano::network::ongoing_syn_cookie_cleanup () -{ - syn_cookies.purge (std::chrono::steady_clock::now () - nano::transport::syn_cookie_cutoff); - std::weak_ptr node_w (node.shared ()); - node.workers.add_timed_task (std::chrono::steady_clock::now () + (nano::transport::syn_cookie_cutoff * 2), [node_w] () { - if (auto node_l = node_w.lock ()) - { - node_l->network.ongoing_syn_cookie_cleanup (); - } - }); -} - -void nano::network::ongoing_keepalive () -{ - flood_keepalive (0.75f); - flood_keepalive_self (0.25f); - std::weak_ptr node_w (node.shared ()); - node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.keepalive_period, [node_w] () { - if (auto node_l = node_w.lock ()) - { - node_l->network.ongoing_keepalive (); - } - }); -} - std::size_t nano::network::size () const { return tcp_channels.size (); @@ -702,18 +714,19 @@ void nano::tcp_message_manager::stop () * syn_cookies */ -nano::syn_cookies::syn_cookies (std::size_t max_cookies_per_ip_a) : - max_cookies_per_ip (max_cookies_per_ip_a) +nano::syn_cookies::syn_cookies (std::size_t max_cookies_per_ip_a, nano::logger & logger_a) : + max_cookies_per_ip (max_cookies_per_ip_a), + logger (logger_a) { } -boost::optional nano::syn_cookies::assign (nano::endpoint const & endpoint_a) +std::optional nano::syn_cookies::assign (nano::endpoint const & endpoint_a) { auto ip_addr (endpoint_a.address ()); debug_assert (ip_addr.is_v6 ()); nano::lock_guard lock{ syn_cookie_mutex }; unsigned & ip_cookies = cookies_per_ip[ip_addr]; - boost::optional result; + std::optional result; if (ip_cookies < max_cookies_per_ip) { if (cookies.find (endpoint_a) == cookies.end ()) @@ -755,6 +768,8 @@ bool nano::syn_cookies::validate (nano::endpoint const & endpoint_a, nano::accou void nano::syn_cookies::purge (std::chrono::steady_clock::time_point const & cutoff_a) { + logger.debug (nano::log::type::syn_cookies, "Purging syn cookies, cutoff: {}s", nano::log::seconds_delta (cutoff_a)); + nano::lock_guard lock{ syn_cookie_mutex }; auto it (cookies.begin ()); while (it != cookies.end ()) diff --git a/nano/node/network.hpp b/nano/node/network.hpp index cd5b90e047..d51aedaa48 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -16,39 +16,18 @@ namespace nano { class node; -class tcp_message_manager final -{ -public: - tcp_message_manager (unsigned incoming_connections_max_a); - void put_message (nano::tcp_message_item const & item_a); - nano::tcp_message_item get_message (); - // Stop container and notify waiting threads - void stop (); - -private: - nano::mutex mutex; - nano::condition_variable producer_condition; - nano::condition_variable consumer_condition; - std::deque entries; - unsigned max_entries; - static unsigned const max_entries_per_connection = 16; - bool stopped{ false }; - - friend class network_tcp_message_manager_Test; -}; - /** * Node ID cookies for node ID handshakes */ class syn_cookies final { public: - explicit syn_cookies (std::size_t); + syn_cookies (std::size_t max_peers_per_ip, nano::logger &); void purge (std::chrono::steady_clock::time_point const &); // Returns boost::none if the IP is rate capped on syn cookie requests, // or if the endpoint already has a syn cookie query - boost::optional assign (nano::endpoint const &); + std::optional assign (nano::endpoint const &); // Returns false if valid, true if invalid (true on error convention) // Also removes the syn cookie from the store if valid bool validate (nano::endpoint const &, nano::account const &, nano::signature const &); @@ -58,6 +37,9 @@ class syn_cookies final std::unique_ptr collect_container_info (std::string const &); std::size_t cookies_size (); +private: // Dependencies + nano::logger & logger; + private: class syn_cookie_info final { @@ -74,12 +56,12 @@ class syn_cookies final class network final { public: - network (nano::node &, uint16_t); + network (nano::node &, uint16_t port); ~network (); - nano::networks id; void start (); void stop (); + void flood_message (nano::message &, nano::transport::buffer_drop_policy const = nano::transport::buffer_drop_policy::limiter, float const = 1.0f); void flood_keepalive (float const scale_a = 1.0f); void flood_keepalive_self (float const scale_a = 0.5f); @@ -94,8 +76,6 @@ class network final void merge_peer (nano::endpoint const &); void send_keepalive (std::shared_ptr const &); void send_keepalive_self (std::shared_ptr const &); - void send_node_id_handshake (std::shared_ptr const &, std::optional const & cookie, std::optional const & respond_to); - void send_confirm_req (std::shared_ptr const & channel_a, std::pair const & hash_root_a); std::shared_ptr find_node_id (nano::account const &); std::shared_ptr find_channel (nano::endpoint const &); bool not_a_peer (nano::endpoint const &, bool); @@ -112,41 +92,53 @@ class network final // Get the next peer for attempting a tcp bootstrap connection nano::tcp_endpoint bootstrap_peer (); nano::endpoint endpoint () const; - void cleanup (std::chrono::steady_clock::time_point const &); - void ongoing_cleanup (); - // Node ID cookies cleanup - nano::syn_cookies syn_cookies; - void ongoing_syn_cookie_cleanup (); - void ongoing_keepalive (); + void cleanup (std::chrono::steady_clock::time_point const & cutoff); std::size_t size () const; float size_sqrt () const; bool empty () const; void erase (nano::transport::channel const &); /** Disconnects and adds peer to exclusion list */ void exclude (std::shared_ptr const & channel); + void inbound (nano::message const &, std::shared_ptr const &); +public: // Handshake /** Verifies that handshake response matches our query. @returns true if OK */ bool verify_handshake_response (nano::node_id_handshake::response_payload const & response, nano::endpoint const & remote_endpoint); std::optional prepare_handshake_query (nano::endpoint const & remote_endpoint); nano::node_id_handshake::response_payload prepare_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) const; private: + void run_processing (); + void run_cleanup (); + void run_keepalive (); void process_message (nano::message const &, std::shared_ptr const &); +private: // Dependencies + nano::node & node; + public: - std::function const &)> inbound; + nano::networks const id; + nano::syn_cookies syn_cookies; boost::asio::ip::udp::resolver resolver; - std::vector packet_processing_threads; nano::peer_exclusion excluded_peers; - nano::tcp_message_manager tcp_message_manager; - nano::node & node; nano::network_filter publish_filter; nano::transport::tcp_channels tcp_channels; std::atomic port{ 0 }; - std::function disconnect_observer; + +public: // Callbacks + std::function disconnect_observer{ [] () {} }; // Called when a new channel is observed - std::function)> channel_observer; + std::function)> channel_observer{ [] (auto) {} }; + +private: std::atomic stopped{ false }; + mutable nano::mutex mutex; + nano::condition_variable condition; + std::vector processing_threads; // Using boost::thread to enable increased stack size + std::thread cleanup_thread; + std::thread keepalive_thread; + +public: static unsigned const broadcast_interval_ms = 10; static std::size_t const buffer_size = 512; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 8397fa70d3..b0de6bc931 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -174,8 +174,8 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons vote_uniquer{}, confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, logger, node_initialized_latch, flags.confirmation_height_processor_mode), vote_cache{ config.vote_cache, stats }, - generator{ config, ledger, wallets, vote_processor, history, network, stats, logger, /* non-final */ false }, - final_generator{ config, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true }, + generator{ config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* non-final */ false }, + final_generator{ config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true }, active{ *this, confirmation_height_processor, block_processor }, scheduler_impl{ std::make_unique (*this) }, scheduler{ *scheduler_impl }, diff --git a/nano/node/transport/inproc.cpp b/nano/node/transport/inproc.cpp index b07bd0d35f..2dfb36d122 100644 --- a/nano/node/transport/inproc.cpp +++ b/nano/node/transport/inproc.cpp @@ -25,30 +25,6 @@ bool nano::transport::inproc::channel::operator== (nano::transport::channel cons return endpoint == other_a.get_endpoint (); } -/** - * This function is called for every message received by the inproc channel. - * Note that it is called from inside the context of nano::transport::inproc::channel::send_buffer - */ -class message_visitor_inbound : public nano::message_visitor -{ -public: - message_visitor_inbound (decltype (nano::network::inbound) & inbound, std::shared_ptr channel) : - inbound{ inbound }, - channel{ channel } - { - } - - decltype (nano::network::inbound) & inbound; - - // the channel to reply to, if a reply is generated - std::shared_ptr channel; - - void default_handler (nano::message const & message) override - { - inbound (message, channel); - } -}; - /** * Send the buffer to the peer and call the callback function when done. The call never fails. * Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background. @@ -78,11 +54,8 @@ void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co // process message { - node.stats.inc (nano::stat::type::message, to_stat_detail (message_a->header.type), nano::stat::dir::in); - - // create an inbound message visitor class to handle incoming messages - message_visitor_inbound visitor{ destination.network.inbound, remote_channel }; - message_a->visit (visitor); + node.stats.inc (nano::stat::type::message, to_stat_detail (message_a->type ()), nano::stat::dir::in); + destination.network.inbound (*message_a, remote_channel); } }); diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index e6ddca6120..6dab450206 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -126,6 +126,7 @@ void nano::transport::channel_tcp::operator() (nano::object_stream & obs) const nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function const &)> sink) : node{ node }, + message_manager{ node.config.tcp_incoming_connections_max }, sink{ std::move (sink) } { } @@ -295,7 +296,7 @@ void nano::transport::tcp_channels::process_messages () { while (!stopped) { - auto item (node.network.tcp_message_manager.get_message ()); + auto item = message_manager.get_message (); if (item.message != nullptr) { process_message (*item.message, item.endpoint, item.node_id, item.socket); @@ -364,6 +365,9 @@ void nano::transport::tcp_channels::stop () { stopped = true; nano::unique_lock lock{ mutex }; + + message_manager.stop (); + // Close all TCP sockets for (auto const & channel : channels) { diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 0f65b3bd0e..fd588eb458 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -25,6 +25,28 @@ class tcp_message_item final nano::account node_id; std::shared_ptr socket; }; + +class tcp_message_manager final +{ +public: + tcp_message_manager (unsigned incoming_connections_max_a); + void put_message (nano::tcp_message_item const & item_a); + nano::tcp_message_item get_message (); + // Stop container and notify waiting threads + void stop (); + +private: + nano::mutex mutex; + nano::condition_variable producer_condition; + nano::condition_variable consumer_condition; + std::deque entries; + unsigned max_entries; + static unsigned const max_entries_per_connection = 16; + bool stopped{ false }; + + friend class network_tcp_message_manager_Test; +}; + namespace transport { class tcp_server; @@ -136,10 +158,14 @@ namespace transport // Connection start void start_tcp (nano::endpoint const &); void start_tcp_receive_node_id (std::shared_ptr const &, nano::endpoint const &, std::shared_ptr> const &); + + private: // Dependencies nano::node & node; + public: + nano::tcp_message_manager message_manager; + private: - std::function const &)> sink; class endpoint_tag { }; @@ -255,6 +281,9 @@ namespace transport mi::member>>> attempts; // clang-format on + + private: + std::function const &)> sink; std::atomic stopped{ false }; friend class network_peer_max_tcp_attempts_subnetwork_Test; diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index 7e37ef97bc..edd7f9663a 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -496,7 +496,7 @@ void nano::transport::tcp_server::queue_realtime (std::unique_ptr { return; } - node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket }); + node->network.tcp_channels.message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket }); } /* diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 359adb0e37..6e177a3f87 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -22,5 +22,4 @@ bool is_ipv4_or_v4_mapped_address (boost::asio::ip::address const &); // Unassigned, reserved, self bool reserved_address (nano::endpoint const &, bool = false); -static std::chrono::seconds constexpr syn_cookie_cutoff = std::chrono::seconds (5); } \ No newline at end of file diff --git a/nano/node/voting.cpp b/nano/node/voting.cpp index 9967f7edb5..13a66ea9e2 100644 --- a/nano/node/voting.cpp +++ b/nano/node/voting.cpp @@ -162,8 +162,9 @@ std::unique_ptr nano::collect_container_info (na return composite; } -nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stats & stats_a, nano::logger & logger_a, bool is_final_a) : +nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stats & stats_a, nano::logger & logger_a, bool is_final_a) : config (config_a), + node (node_a), ledger (ledger_a), wallets (wallets_a), vote_processor (vote_processor_a), @@ -394,7 +395,7 @@ void nano::vote_generator::broadcast_action (std::shared_ptr const & { network.flood_vote_pr (vote_a); network.flood_vote (vote_a, 2.0f); - vote_processor.vote (vote_a, std::make_shared (network.node, network.node)); + vote_processor.vote (vote_a, std::make_shared (node, node)); // TODO: Avoid creating a temporary channel each time } void nano::vote_generator::run () diff --git a/nano/node/voting.hpp b/nano/node/voting.hpp index 6a2b2c79b6..24c5813dd1 100644 --- a/nano/node/voting.hpp +++ b/nano/node/voting.hpp @@ -22,6 +22,7 @@ namespace mi = boost::multi_index; namespace nano { +class node; class ledger; class network; class node_config; @@ -122,7 +123,7 @@ class vote_generator final using queue_entry_t = std::pair; public: - vote_generator (nano::node_config const &, nano::ledger &, nano::wallets &, nano::vote_processor &, nano::local_vote_history &, nano::network &, nano::stats &, nano::logger &, bool is_final); + vote_generator (nano::node_config const &, nano::node &, nano::ledger &, nano::wallets &, nano::vote_processor &, nano::local_vote_history &, nano::network &, nano::stats &, nano::logger &, bool is_final); ~vote_generator (); /** Queue items for vote generation, or broadcast votes already in cache */ @@ -153,6 +154,7 @@ class vote_generator final private: // Dependencies nano::node_config const & config; + nano::node & node; nano::ledger & ledger; nano::wallets & wallets; nano::vote_processor & vote_processor;