diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 04e5f489f5..06acd655ca 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -23,43 +23,14 @@ nano::network::network (nano::node & node_a, uint16_t port_a) : tcp_channels (node_a, [this] (nano::message const & message, std::shared_ptr const & channel) { inbound (message, channel); }), - port (port_a), disconnect_observer ([] () {}) + port (port_a) { - for (std::size_t i = 0; i < node.config.network_threads && !node.flags.disable_tcp_realtime; ++i) - { - packet_processing_threads.emplace_back (nano::thread_attributes::get_default (), [this, i] () { - nano::thread_role::set (nano::thread_role::name::packet_processing); - try - { - tcp_channels.process_messages (); - } - catch (boost::system::error_code & ec) - { - node.logger.critical (nano::log::type::network, "Error: {}", ec.message ()); - release_assert (false); - } - catch (std::error_code & ec) - { - node.logger.critical (nano::log::type::network, "Error: {}", ec.message ()); - release_assert (false); - } - catch (std::runtime_error & err) - { - node.logger.critical (nano::log::type::network, "Error: {}", err.what ()); - release_assert (false); - } - catch (...) - { - node.logger.critical (nano::log::type::network, "Unknown error"); - release_assert (false); - } - }); - } } nano::network::~network () { - stop (); + // All threads must be stopped before this destructor + debug_assert (processing_threads.empty ()); } void nano::network::start () @@ -68,26 +39,67 @@ void nano::network::start () { ongoing_cleanup (); } + ongoing_syn_cookie_cleanup (); + ongoing_keepalive (); + if (!node.flags.disable_tcp_realtime) { tcp_channels.start (); + + for (std::size_t i = 0; i < node.config.network_threads; ++i) + { + processing_threads.emplace_back (nano::thread_attributes::get_default (), [this] () { + nano::thread_role::set (nano::thread_role::name::packet_processing); + run_processing (); + }); + } } - ongoing_keepalive (); } void nano::network::stop () { - if (!stopped.exchange (true)) + stopped = true; + + tcp_channels.stop (); + resolver.cancel (); + tcp_message_manager.stop (); + + for (auto & thread : processing_threads) { - tcp_channels.stop (); - resolver.cancel (); - tcp_message_manager.stop (); - port = 0; - for (auto & thread : packet_processing_threads) - { - thread.join (); - } + thread.join (); + } + processing_threads.clear (); + + port = 0; +} + +void nano::network::run_processing () +{ + try + { + // TODO: Move responsibility of packet queuing and processing to the message_processor class + tcp_channels.process_messages (); + } + catch (boost::system::error_code & ec) + { + node.logger.critical (nano::log::type::network, "Error: {}", ec.message ()); + release_assert (false); + } + catch (std::error_code & ec) + { + node.logger.critical (nano::log::type::network, "Error: {}", ec.message ()); + release_assert (false); + } + catch (std::runtime_error & err) + { + node.logger.critical (nano::log::type::network, "Error: {}", err.what ()); + release_assert (false); + } + catch (...) + { + node.logger.critical (nano::log::type::network, "Unknown error"); + release_assert (false); } } diff --git a/nano/node/network.hpp b/nano/node/network.hpp index f49b5f4986..1791c15ce3 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -131,6 +131,7 @@ class network final nano::node_id_handshake::response_payload prepare_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) const; private: + void run_processing (); void process_message (nano::message const &, std::shared_ptr const &); private: // Dependencies @@ -140,18 +141,20 @@ class network final nano::networks const id; nano::syn_cookies syn_cookies; boost::asio::ip::udp::resolver resolver; - std::vector packet_processing_threads; nano::peer_exclusion excluded_peers; nano::tcp_message_manager tcp_message_manager; nano::network_filter publish_filter; nano::transport::tcp_channels tcp_channels; std::atomic port{ 0 }; - std::function disconnect_observer; + +public: // Callbacks + std::function disconnect_observer{ [] () {} }; // Called when a new channel is observed - std::function)> channel_observer; + std::function)> channel_observer{ [] (auto) {} }; private: std::atomic stopped{ false }; + std::vector processing_threads; public: static unsigned const broadcast_interval_ms = 10;