diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 89b2c5da0c..10b6c0d333 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -22,6 +22,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::io_daemon: thread_role_name_string = "I/O (daemon)"; break; + case nano::thread_role::name::io_ipc: + thread_role_name_string = "I/O (IPC)"; + break; case nano::thread_role::name::work: thread_role_name_string = "Work pool"; break; @@ -206,9 +209,12 @@ std::string nano::thread_role::get_string () void nano::thread_role::set (nano::thread_role::name role) { - auto thread_role_name_string (get_string (role)); - - nano::thread_role::set_os_name (thread_role_name_string); - + auto thread_role_name_string = get_string (role); + nano::thread_role::set_os_name (thread_role_name_string); // Implementation is platform specific current_thread_role = role; } + +bool nano::thread_role::is_network_io () +{ + return nano::thread_role::get () == nano::thread_role::name::io; +} \ No newline at end of file diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 15b4a511cb..c98248ceae 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -12,6 +12,7 @@ enum class name unknown, io, io_daemon, + io_ipc, work, message_processing, vote_processing, @@ -87,4 +88,9 @@ std::string get_string (); * Internal only, should not be called directly */ void set_os_name (std::string const &); + +/* + * Check if the current thread is a network IO thread + */ +bool is_network_io (); } diff --git a/nano/node/ipc/ipc_server.cpp b/nano/node/ipc/ipc_server.cpp index ce711cb392..0db3140e19 100644 --- a/nano/node/ipc/ipc_server.cpp +++ b/nano/node/ipc/ipc_server.cpp @@ -483,7 +483,7 @@ class socket_transport : public nano::ipc::transport acceptor->set_option (option_keepalive); accept (); - runner = std::make_unique (io_ctx, server.logger, static_cast (std::max (1, concurrency_a))); + runner = std::make_unique (io_ctx, server.logger, static_cast (std::max (1, concurrency_a)), nano::thread_role::name::io_ipc); } boost::asio::io_context & context () const diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 6eb077c7c0..22ae40f352 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -240,7 +240,7 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height)) { auto node_l (shared_from_this ()); - background ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { + io_ctx.post ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { boost::property_tree::ptree event; event.add ("account", account_a.to_account ()); event.add ("hash", block_a->hash ().to_string ()); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 3d85f7cffa..8ade202c4c 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -76,12 +76,6 @@ class node final : public std::enable_shared_from_this std::shared_ptr shared (); - template - void background (T action_a) - { - io_ctx.post (action_a); - } - bool copy_with_compaction (std::filesystem::path const &); void keepalive (std::string const &, uint16_t); int store_version (); diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp index 5c0aa9def4..a0a67b0e6a 100644 --- a/nano/node/repcrawler.cpp +++ b/nano/node/repcrawler.cpp @@ -14,12 +14,16 @@ nano::rep_crawler::rep_crawler (nano::rep_crawler_config const & config_a, nano: network_constants{ node_a.network_params.network }, active{ node_a.active } { - if (!node.flags.disable_rep_crawler) - { - node.observers.endpoint.add ([this] (std::shared_ptr const & channel) { - query (channel); - }); - } + node.observers.endpoint.add ([this] (std::shared_ptr const & channel) { + if (!node.flags.disable_rep_crawler) + { + { + nano::lock_guard lock{ mutex }; + prioritized.push_back (channel); + } + condition.notify_all (); + } + }); } nano::rep_crawler::~rep_crawler () @@ -161,7 +165,7 @@ void nano::rep_crawler::run () lock.lock (); condition.wait_for (lock, query_interval (sufficient_weight), [this, sufficient_weight] { - return stopped || query_predicate (sufficient_weight) || !responses.empty (); + return stopped || query_predicate (sufficient_weight) || !responses.empty () || !prioritized.empty (); }); if (stopped) @@ -180,6 +184,16 @@ void nano::rep_crawler::run () cleanup (); + if (!prioritized.empty ()) + { + decltype (prioritized) prioritized_l; + prioritized_l.swap (prioritized); + + lock.unlock (); + query (prioritized_l); + lock.lock (); + } + if (query_predicate (sufficient_weight)) { last_query = std::chrono::steady_clock::now (); @@ -230,7 +244,7 @@ void nano::rep_crawler::cleanup () }); } -std::vector> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const +std::deque> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const { debug_assert (!mutex.try_lock ()); @@ -310,7 +324,7 @@ bool nano::rep_crawler::track_rep_request (hash_root_t hash_root, std::shared_pt return true; } -void nano::rep_crawler::query (std::vector> const & target_channels) +void nano::rep_crawler::query (std::deque> const & target_channels) { auto maybe_hash_root = prepare_query_target (); if (!maybe_hash_root) @@ -356,7 +370,7 @@ void nano::rep_crawler::query (std::vector const & target_channel) { - query (std::vector{ target_channel }); + query (std::deque{ target_channel }); } bool nano::rep_crawler::is_pr (std::shared_ptr const & channel) const @@ -432,6 +446,7 @@ std::vector nano::rep_crawler::representatives (std::size_ } std::vector result; + result.reserve (ordered.size ()); for (auto i = ordered.begin (), n = ordered.end (); i != n && result.size () < count; ++i) { auto const & [weight, rep] = *i; @@ -483,6 +498,7 @@ nano::container_info nano::rep_crawler::container_info () const info.put ("reps", reps); info.put ("queries", queries); info.put ("responses", responses); + info.put ("prioritized", prioritized); return info; } diff --git a/nano/node/repcrawler.hpp b/nano/node/repcrawler.hpp index d99bae5593..070ed95ee4 100644 --- a/nano/node/repcrawler.hpp +++ b/nano/node/repcrawler.hpp @@ -64,7 +64,7 @@ class rep_crawler final bool process (std::shared_ptr const &, std::shared_ptr const &); /** Attempt to determine if the peer manages one or more representative accounts */ - void query (std::vector> const & target_channels); + void query (std::deque> const & target_channels); /** Attempt to determine if the peer manages one or more representative accounts */ void query (std::shared_ptr const & target_channel); @@ -75,12 +75,10 @@ class rep_crawler final /** Get total available weight from representatives */ nano::uint128_t total_weight () const; - /** Request a list of the top \p count known representatives in descending order of weight, with at least \p weight_a voting weight, and optionally with a minimum version \p minimum_protocol_version - */ + /** Request a list of the top \p count known representatives in descending order of weight, with at least \p weight_a voting weight, and optionally with a minimum version \p minimum_protocol_version */ std::vector representatives (std::size_t count = std::numeric_limits::max (), nano::uint128_t minimum_weight = 0, std::optional const & minimum_protocol_version = {}) const; - /** Request a list of the top \p count known principal representatives in descending order of weight, optionally with a minimum version \p minimum_protocol_version - */ + /** Request a list of the top \p count known principal representatives in descending order of weight, optionally with a minimum version \p minimum_protocol_version */ std::vector principal_representatives (std::size_t count = std::numeric_limits::max (), std::optional const & minimum_protocol_version = {}) const; /** Total number of representatives */ @@ -106,7 +104,8 @@ class rep_crawler final using hash_root_t = std::pair; /** Returns a list of endpoints to crawl. The total weight is passed in to avoid computing it twice. */ - std::vector> prepare_crawl_targets (bool sufficient_weight) const; + + std::deque> prepare_crawl_targets (bool sufficient_weight) const; std::optional prepare_query_target () const; bool track_rep_request (hash_root_t hash_root, std::shared_ptr const & channel); @@ -173,9 +172,13 @@ class rep_crawler final private: static size_t constexpr max_responses{ 1024 * 4 }; + using response_t = std::pair, std::shared_ptr>; boost::circular_buffer responses{ max_responses }; + // Freshly established connections that should be queried asap + std::deque> prioritized; + std::chrono::steady_clock::time_point last_query{}; std::atomic stopped{ false }; diff --git a/nano/node/transport/channel.cpp b/nano/node/transport/channel.cpp index 8b7055c46a..7a0a9ce40c 100644 --- a/nano/node/transport/channel.cpp +++ b/nano/node/transport/channel.cpp @@ -36,7 +36,7 @@ void nano::transport::channel::send (nano::message & message_a, std::functionbackground ([callback = std::move (callback_a)] () { + node_l->io_ctx.post ([callback = std::move (callback_a)] () { callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); }); } @@ -161,7 +161,7 @@ void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const & { if (callback_a) { - node_l->background ([callback = std::move (callback_a)] () { + node_l->io_ctx.post ([callback = std::move (callback_a)] () { callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); }); } diff --git a/nano/qt/qt.cpp b/nano/qt/qt.cpp index 2782e10188..db6365077c 100644 --- a/nano/qt/qt.cpp +++ b/nano/qt/qt.cpp @@ -714,7 +714,7 @@ nano_qt::block_viewer::block_viewer (nano_qt::wallet & wallet_a) : if (this->wallet.node.ledger.any.block_exists (transaction, block)) { rebroadcast->setEnabled (false); - this->wallet.node.background ([this, block] () { + this->wallet.node.workers.post ([this, block] () { rebroadcast_action (block); }); } @@ -1194,7 +1194,7 @@ void nano_qt::wallet::start () if (this_l->wallet_m->store.valid_password (transaction)) { this_l->send_blocks_send->setEnabled (false); - this_l->node.background ([this_w, account_l, actual] () { + this_l->node.workers.post ([this_w, account_l, actual] () { if (auto this_l = this_w.lock ()) { this_l->wallet_m->send_async (this_l->account, account_l, actual, [this_w] (std::shared_ptr const & block_a) { diff --git a/nano/store/transaction.cpp b/nano/store/transaction.cpp index dcfc3d7e8d..0d1d6a9008 100644 --- a/nano/store/transaction.cpp +++ b/nano/store/transaction.cpp @@ -9,6 +9,7 @@ nano::store::transaction_impl::transaction_impl (nano::id_dispenser::id_t const store_id_a) : store_id{ store_id_a } { + debug_assert (!nano::thread_role::is_network_io (), "database operations are not allowed to run on network IO threads"); } /*