From 0ddc5dbe9bb218047c1334c51db799fd7e06afba Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com>
Date: Mon, 4 Mar 2024 19:41:17 +0100
Subject: [PATCH] WIP

---
 nano/lib/stats_enums.hpp                  |   2 +
 nano/node/CMakeLists.txt                  |   1 +
 nano/node/blockprocessor.cpp              | 142 ++++++------
 nano/node/blockprocessor.hpp              |  20 +-
 nano/node/bootstrap/bootstrap_legacy.cpp  |   4 +-
 nano/node/bootstrap_ascending/service.cpp |   2 +-
 nano/node/fair_queue.hpp                  | 250 ++++++++++++++++++++++
 nano/node/network.cpp                     |  11 +-
 8 files changed, 345 insertions(+), 87 deletions(-)
 create mode 100644 nano/node/fair_queue.hpp

diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp
index 5150d7dec0..bc1c62e038 100644
--- a/nano/lib/stats_enums.hpp
+++ b/nano/lib/stats_enums.hpp
@@ -38,6 +38,7 @@ enum class type : uint8_t
 	blockprocessor,
 	blockprocessor_source,
 	blockprocessor_result,
+	blockprocessor_overfill,
 	bootstrap_server,
 	active,
 	active_started,
@@ -75,6 +76,7 @@ enum class detail : uint8_t
 	none,
 	success,
 	unknown,
+	queue_overflow,
 
 	// processing queue
 	queue,

diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt
index d25585ff44..6e9cf426c8 100644
--- a/nano/node/CMakeLists.txt
+++ b/nano/node/CMakeLists.txt
@@ -78,6 +78,7 @@ add_library(
   election_insertion_result.hpp
   epoch_upgrader.hpp
   epoch_upgrader.cpp
+  fair_queue.hpp
   inactive_cache_information.hpp
   inactive_cache_information.cpp
   inactive_cache_status.hpp

diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp
index 69ba26feb0..ed6d7b9e12 100644
--- a/nano/node/blockprocessor.cpp
+++ b/nano/node/blockprocessor.cpp
@@ -45,6 +45,32 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas
 			block_processed.notify (result, context);
 		}
 	});
+
+	// TODO: Make these configurable
+	queue.max_size_query = [this] (auto const & origin) {
+		switch (origin.source)
+		{
+			case nano::block_source::live:
+				return 128;
+			default:
+				return 1024 * 16;
+		}
+	};
+
+	// TODO: Make these configurable
+	queue.priority_query = [this] (auto const & origin) {
+		switch (origin.source)
+		{
+			case nano::block_source::live:
+				return 1;
+			case nano::block_source::local:
+				return 16;
+			case nano::block_source::bootstrap:
+				return 8;
+			default:
+				return 1;
+		}
+	};
 }
 
 nano::block_processor::~block_processor ()
@@ -76,39 +102,44 @@ void nano::block_processor::stop ()
 	}
 }
 
-std::size_t nano::block_processor::size ()
+// TODO: Remove and replace all checks with calls to size (block_source)
+std::size_t nano::block_processor::size () const
 {
 	nano::unique_lock<nano::mutex> lock{ mutex };
-	return blocks.size () + forced.size ();
+	return queue.total_size ();
 }
 
-bool nano::block_processor::full ()
+std::size_t nano::block_processor::size (nano::block_source source) const
+{
+	nano::unique_lock<nano::mutex> lock{ mutex };
+	return queue.size (source);
+}
+
+bool nano::block_processor::full () const
 {
 	return size () >= node.flags.block_processor_full_size;
 }
 
-bool nano::block_processor::half_full ()
+bool nano::block_processor::half_full () const
 {
 	return size () >= node.flags.block_processor_full_size / 2;
 }
 
-void nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source)
+bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source, std::shared_ptr<nano::transport::channel> channel)
 {
-	if (full ())
-	{
-		node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
-		return;
-	}
 	if (node.network_params.work.validate_entry (*block)) // true => error
 	{
 		node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work);
-		return;
+		return false; // Not added
 	}
 
 	node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
-	node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source));
+	node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})",
+	block->hash ().to_string (),
+	to_string (source),
+	channel ? channel->to_string () : ""); // TODO: Lazy eval
 
-	add_impl (context{ block, source });
+	return add_impl (context{ block, source }, channel);
 }
 
 std::optional<nano::block_status> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
@@ -143,11 +174,26 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
 	node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
 	node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());
 
+	add_impl (context{ block_a, block_source::forced });
+}
+
+bool nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transport::channel> channel)
+{
+	auto const source = ctx.source; // Read before `ctx` is moved from below
+	bool added = false;
 	{
-		nano::lock_guard<nano::mutex> lock{ mutex };
-		forced.emplace_back (context{ block_a, block_source::forced });
+		nano::lock_guard<nano::mutex> guard{ mutex };
+		added = queue.push (std::move (ctx), source, channel);
 	}
-	condition.notify_all ();
+	if (added)
+	{
+		condition.notify_all ();
+	}
+	else
+	{
+		node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
+		node.stats.inc (nano::stat::type::blockprocessor_overfill, to_stat_detail (source));
+	}
+	return added;
 }
 
 void nano::block_processor::rollback_competitor (store::write_transaction const & transaction, nano::block const & block)
@@ -191,7 +237,7 @@ void nano::block_processor::run ()
 	nano::unique_lock<nano::mutex> lock{ mutex };
 	while (!stopped)
 	{
-		if (have_blocks_ready ())
+		if (!queue.empty ())
 		{
 			active = true;
 			lock.unlock ();
@@ -230,47 +276,16 @@ bool nano::block_processor::should_log ()
 	return result;
 }
 
-bool nano::block_processor::have_blocks_ready ()
-{
-	debug_assert (!mutex.try_lock ());
-	return !blocks.empty () || !forced.empty ();
-}
-
-bool nano::block_processor::have_blocks ()
-{
-	debug_assert (!mutex.try_lock ());
-	return have_blocks_ready ();
-}
-
-void nano::block_processor::add_impl (context ctx)
-{
-	release_assert (ctx.source != nano::block_source::forced);
-	{
-		nano::lock_guard<nano::mutex> guard{ mutex };
-		blocks.emplace_back (std::move (ctx));
-	}
-	condition.notify_all ();
-}
-
 auto nano::block_processor::next () -> context
 {
 	debug_assert (!mutex.try_lock ());
-	debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next
-
-	if (!forced.empty ())
-	{
-		auto entry = std::move (forced.front ());
-		release_assert (entry.source == nano::block_source::forced);
-		forced.pop_front ();
-		return entry;
-	}
+	debug_assert (!queue.empty ()); // This should be checked before calling next
 
-	if (!blocks.empty ())
+	if (!queue.empty ())
 	{
-		auto entry = std::move (blocks.front ());
-		release_assert (entry.source != nano::block_source::forced);
-		blocks.pop_front ();
-		return entry;
+		auto [request, origin] = queue.next ();
+		release_assert (origin.source != nano::block_source::forced || request.source == nano::block_source::forced);
+		return std::move (request);
 	}
 
 	release_assert (false, "next() called when no blocks are ready");
@@ -286,19 +301,22 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
 	lock_a.lock ();
 
+	queue.periodic_cleanup ();
+
 	timer_l.start ();
+
 	// Processing blocks
 	unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
 	auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
 	auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
 	auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
 
-	while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
+	while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
 	{
 		// TODO: Cleaner periodical logging
-		if ((blocks.size () + forced.size () > 64) && should_log ())
+		if (should_log ())
 		{
-			node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ());
+			node.logger.debug (nano::log::type::blockprocessor, "{} blocks in processing queue", queue.total_size ());
 		}
 
 		auto ctx = next ();
@@ -433,18 +451,8 @@ void nano::block_processor::queue_unchecked (store::write_transaction const & tr
 
 std::unique_ptr<nano::container_info_component> nano::block_processor::collect_container_info (std::string const & name)
 {
-	std::size_t blocks_count;
-	std::size_t forced_count;
-
-	{
-		nano::lock_guard<nano::mutex> guard{ mutex };
-		blocks_count = blocks.size ();
-		forced_count = forced.size ();
-	}
-
 	auto composite = std::make_unique<container_info_composite> (name);
-	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (blocks)::value_type) }));
-	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (forced)::value_type) }));
+	composite->add_component (queue.collect_container_info ("queue"));
 
 	return composite;
 }
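A note on the two policy callbacks wired up in the constructor above: max_size_query caps how many blocks a single { source, channel } origin may have buffered at once, while priority_query decides how many blocks the scheduler drains from that origin per round-robin turn. A minimal standalone sketch of the same wiring (hypothetical snippet, not part of this patch; std::string requests are used for brevity):

	nano::fair_queue<std::string, nano::block_source> queue;
	// Live blocks get a small per-peer buffer; everything else shares a large cap
	queue.max_size_query = [] (auto const & origin) {
		return origin.source == nano::block_source::live ? 128 : 1024 * 16;
	};
	// Local (e.g. RPC) submissions are drained 16 per turn vs. 1 per turn for live traffic
	queue.priority_query = [] (auto const & origin) {
		return origin.source == nano::block_source::local ? 16 : 1;
	};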
diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp
index 620abcdd95..6615221afb 100644
--- a/nano/node/blockprocessor.hpp
+++ b/nano/node/blockprocessor.hpp
@@ -2,6 +2,7 @@
 
 #include
 #include
+#include <nano/node/fair_queue.hpp>
 #include
 #include
@@ -68,15 +69,14 @@ class block_processor final
 	void start ();
 	void stop ();
 
-	std::size_t size ();
-	bool full ();
-	bool half_full ();
-	void add (std::shared_ptr<nano::block> const &, block_source = block_source::live);
+	std::size_t size () const;
+	std::size_t size (block_source) const;
+	bool full () const;
+	bool half_full () const;
+	bool add (std::shared_ptr<nano::block> const &, block_source = block_source::live, std::shared_ptr<nano::transport::channel> channel = nullptr);
 	std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
 	void force (std::shared_ptr<nano::block> const &);
 	bool should_log ();
-	bool have_blocks_ready ();
-	bool have_blocks ();
 
 	std::unique_ptr<container_info_component> collect_container_info (std::string const & name);
 
@@ -99,7 +99,7 @@ class block_processor final
 	void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &);
 	processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
 	context next ();
-	void add_impl (context);
+	bool add_impl (context, std::shared_ptr<nano::transport::channel> channel = nullptr);
 
 private: // Dependencies
 	nano::node & node;
@@ -109,12 +109,12 @@ class block_processor final
 	bool stopped{ false };
 	bool active{ false };
 
-	std::deque<context> blocks;
-	std::deque<context> forced;
+	nano::fair_queue<context, block_source> queue;
 
 	std::chrono::steady_clock::time_point next_log;
+
 	nano::condition_variable condition;
-	nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
+	mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
 	std::thread thread;
 };
 }
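With this interface the caller learns synchronously whether a block was accepted into the queue, and backlog can be polled per source instead of globally. A hedged usage sketch (hypothetical caller code; `block`, `channel` and `threshold` stand in for caller state):

	// Enqueue a live block attributed to the peer's channel; false means the
	// { live, channel } sub-queue was at capacity and the block was dropped
	bool added = node.block_processor.add (block, nano::block_source::live, channel);

	// Wait on the bootstrap backlog only, ignoring live and local traffic
	while (node.block_processor.size (nano::block_source::bootstrap) > threshold)
	{
		std::this_thread::sleep_for (std::chrono::milliseconds{ 100 });
	}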
diff --git a/nano/node/bootstrap/bootstrap_legacy.cpp b/nano/node/bootstrap/bootstrap_legacy.cpp
index 9badee97cc..98d48e9ead 100644
--- a/nano/node/bootstrap/bootstrap_legacy.cpp
+++ b/nano/node/bootstrap/bootstrap_legacy.cpp
@@ -225,9 +225,9 @@ void nano::bootstrap_attempt_legacy::run ()
 
 	// TODO: This check / wait is a heuristic and should be improved.
 	auto wait_start = std::chrono::steady_clock::now ();
-	while (!stopped && node->block_processor.size () != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
+	while (!stopped && node->block_processor.size (nano::block_source::bootstrap_legacy) != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
 	{
-		condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size () == 0; });
+		condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size (nano::block_source::bootstrap_legacy) == 0; });
 	}
 
 	if (start_account.number () != std::numeric_limits<nano::uint256_t>::max ())

diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp
index 1ab1362849..aef8bff300 100644
--- a/nano/node/bootstrap_ascending/service.cpp
+++ b/nano/node/bootstrap_ascending/service.cpp
@@ -193,7 +193,7 @@ void nano::bootstrap_ascending::service::inspect (store::transaction const & tx
 void nano::bootstrap_ascending::service::wait_blockprocessor ()
 {
 	nano::unique_lock<nano::mutex> lock{ mutex };
-	while (!stopped && block_processor.size () > config.bootstrap_ascending.block_wait_count)
+	while (!stopped && block_processor.size (nano::block_source::bootstrap) > config.bootstrap_ascending.block_wait_count)
 	{
 		condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; }); // Blockprocessor is relatively slow, sleeping here instead of using conditions
 	}
 }
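One subtlety these two call sites rely on: fair_queue::size (source) looks up the exact { source, nullptr } key, so it only counts requests that were pushed without a channel. Bootstrap blocks are added channel-less, so the lookup works here, but the same call would undercount a source whose blocks carry per-peer channels. An illustration (hypothetical snippet):

	queue.push (request, nano::block_source::live, channel_a); // keyed as { live, channel_a }
	auto count = queue.size (nano::block_source::live); // looks up { live, nullptr }, returns 0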
diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp
new file mode 100644
index 0000000000..5bab741dea
--- /dev/null
+++ b/nano/node/fair_queue.hpp
@@ -0,0 +1,250 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include <algorithm>
+#include <chrono>
+#include <deque>
+#include <functional>
+#include <map>
+#include <memory>
+#include <numeric>
+#include <string>
+#include <utility>
+
+namespace nano
+{
+template <typename Request, typename Source>
+class fair_queue final
+{
+private:
+	struct entry
+	{
+		using queue_t = std::deque<Request>;
+
+		queue_t requests;
+		size_t const priority;
+		size_t const max_size;
+
+		entry (size_t max_size, size_t priority) :
+			priority{ priority },
+			max_size{ max_size }
+		{
+		}
+
+		Request pop ()
+		{
+			release_assert (!requests.empty ());
+
+			auto request = std::move (requests.front ());
+			requests.pop_front ();
+			return request;
+		}
+
+		bool push (Request request)
+		{
+			if (requests.size () < max_size)
+			{
+				requests.push_back (std::move (request));
+				return true; // Added
+			}
+			return false; // Dropped
+		}
+
+		bool empty () const
+		{
+			return requests.empty ();
+		}
+	};
+
+public:
+	// using source_type = std::pair<Source, std::shared_ptr<nano::transport::channel>>;
+	struct source_type
+	{
+		Source source;
+		std::shared_ptr<nano::transport::channel> channel;
+
+		auto operator<=> (source_type const &) const = default;
+	};
+
+	using value_type = std::pair<Request, source_type>;
+
+public:
+	size_t size (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) const
+	{
+		// Note: counts only the exact { source, channel } entry; it does not aggregate across channels that share the same source
+		auto it = queues.find (source_type{ source, channel });
+		return it == queues.end () ? 0 : it->second.requests.size ();
+	}
+
+	size_t total_size () const
+	{
+		return std::accumulate (queues.begin (), queues.end (), std::size_t{ 0 }, [] (size_t total, auto const & queue) {
+			return total + queue.second.requests.size ();
+		});
+	}
+
+	bool empty () const
+	{
+		return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) {
+			return queue.second.requests.empty ();
+		});
+	}
+
+	size_t queues_size () const
+	{
+		return queues.size ();
+	}
+
+	void clear ()
+	{
+		queues.clear ();
+	}
+
+	/// Should be called periodically to clean up queues associated with stale channels
+	void periodic_cleanup (std::chrono::milliseconds interval = std::chrono::milliseconds{ 1000 * 30 })
+	{
+		if (elapsed (last_cleanup, interval))
+		{
+			last_cleanup = std::chrono::steady_clock::now ();
+			cleanup ();
+		}
+	}
+
+	bool push (Request request, Source source, std::shared_ptr<nano::transport::channel> channel = nullptr)
+	{
+		auto const source_key = source_type{ source, channel };
+
+		auto it = queues.find (source_key);
+
+		// Create a new queue if one does not yet exist for this origin
+		if (it == queues.end ())
+		{
+			auto max_size = max_size_query (source_key);
+			auto priority = priority_query (source_key);
+
+			debug_assert (max_size > 0);
+			debug_assert (priority > 0);
+
+			// std::map guarantees that insertion does not invalidate existing iterators, so `current_queue` stays valid
+			it = queues.emplace (source_key, entry{ max_size, priority }).first;
+		}
+		release_assert (it != queues.end ());
+
+		auto & queue = it->second;
+		return queue.push (std::move (request)); // True if added, false if dropped
+	}
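+
+	// Illustrative example (assumed usage, not exercised in this patch): with a cap
+	// of 2 for an origin, the third push to the same { source, channel } key is dropped:
+	//
+	//	fair_queue<std::string, int> queue;
+	//	queue.max_size_query = [] (auto const &) { return 2; };
+	//	queue.priority_query = [] (auto const &) { return 1; };
+	//	queue.push ("a", 0); // true, added
+	//	queue.push ("b", 0); // true, added
+	//	queue.push ("c", 0); // false, dropped; total_size () == 2
+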
+public:
+	using max_size_query_t = std::function<size_t (source_type const &)>;
+	using priority_query_t = std::function<size_t (source_type const &)>;
+
+	max_size_query_t max_size_query{ [] (auto const & origin) { debug_assert (false, "max_size_query callback empty"); return 0; } };
+	priority_query_t priority_query{ [] (auto const & origin) { debug_assert (false, "priority_query callback empty"); return 0; } };
+
+public:
+	value_type next ()
+	{
+		debug_assert (!empty ()); // Should be checked before calling next
+
+		auto should_seek = [&, this] () {
+			if (current_queue == queues.end ())
+			{
+				return true;
+			}
+			auto & queue = current_queue->second;
+			if (queue.empty ())
+			{
+				return true;
+			}
+			// Allow up to `queue.priority` requests to be processed before moving to the next queue
+			if (current_queue_counter >= queue.priority)
+			{
+				return true;
+			}
+			return false;
+		};
+
+		if (should_seek ())
+		{
+			seek_next ();
+		}
+
+		release_assert (current_queue != queues.end ());
+
+		auto & source = current_queue->first;
+		auto & queue = current_queue->second;
+
+		++current_queue_counter;
+		return { queue.pop (), source };
+	}
+
+	std::deque<value_type> next_batch (size_t max_count)
+	{
+		// TODO: Naive implementation, could be optimized
+		std::deque<value_type> result;
+		while (!empty () && result.size () < max_count)
+		{
+			result.emplace_back (next ());
+		}
+		return result;
+	}
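+
+	// Illustrative trace (assumed usage): with origin A at priority 2 and origin B at
+	// priority 1, both well stocked, next () drains A, A, B, A, A, B, ...; that is,
+	// up to `priority` requests from the current origin before seeking to the next one:
+	//
+	//	fair_queue<std::string, char> queue;
+	//	queue.max_size_query = [] (auto const &) { return 16; };
+	//	queue.priority_query = [] (auto const & origin) { return origin.source == 'A' ? 2 : 1; };
+	//	for (char c : { 'A', 'A', 'B', 'B' })
+	//	{
+	//		queue.push (std::string{ c }, c);
+	//	}
+	//	auto batch = queue.next_batch (4); // pops in order: A, A, B, B
+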
+private:
+	void seek_next ()
+	{
+		current_queue_counter = 0;
+		do
+		{
+			if (current_queue != queues.end ())
+			{
+				++current_queue;
+			}
+			if (current_queue == queues.end ())
+			{
+				current_queue = queues.begin ();
+			}
+			release_assert (current_queue != queues.end ());
+		} while (current_queue->second.empty ());
+	}
+
+	void cleanup ()
+	{
+		// Invalidate the current iterator before erasing entries
+		current_queue = queues.end ();
+
+		erase_if (queues, [] (auto const & entry) {
+			if (entry.first.channel)
+			{
+				return !entry.first.channel->alive ();
+			}
+			// Some sources (e.g. local RPC) don't have an associated channel; never remove their queue
+			return false;
+		});
+	}
+
+private:
+	std::map<source_type, entry> queues;
+	typename std::map<source_type, entry>::iterator current_queue{ queues.end () };
+	size_t current_queue_counter{ 0 };
+
+	std::chrono::steady_clock::time_point last_cleanup{};
+
+public:
+	std::unique_ptr<container_info_component> collect_container_info (std::string const & name)
+	{
+		auto composite = std::make_unique<container_info_composite> (name);
+		composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queues", queues.size (), sizeof (typename decltype (queues)::value_type) }));
+		composite->add_component (std::make_unique<container_info_leaf> (container_info{ "total_size", total_size (), sizeof (typename decltype (queues)::value_type) }));
+		return composite;
+	}
+};
+}

diff --git a/nano/node/network.cpp b/nano/node/network.cpp
index 0c9cf2268c..b5a1840728 100644
--- a/nano/node/network.cpp
+++ b/nano/node/network.cpp
@@ -251,15 +251,12 @@ class network_message_visitor : public nano::message_visitor
 		}
 	}
 
-	void publish (nano::publish const & message_a) override
+	void publish (nano::publish const & message) override
 	{
-		if (!node.block_processor.full ())
-		{
-			node.process_active (message_a.block);
-		}
-		else
+		bool added = node.block_processor.add (message.block, nano::block_source::live, channel);
+		if (!added)
 		{
-			node.network.publish_filter.clear (message_a.digest);
+			node.network.publish_filter.clear (message.digest);
 			node.stats.inc (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::in);
 		}
 	}
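A final note on queue lifetime: entries keyed by a network channel are dropped wholesale once that channel dies, while channel-less entries persist. A minimal sketch of what periodic_cleanup provides (hypothetical test-style snippet; make_dead_channel stands in for a disconnected peer, and a zero interval is assumed to force an immediate sweep):

	nano::fair_queue<std::string, nano::block_source> queue;
	queue.max_size_query = [] (auto const &) { return 16; };
	queue.priority_query = [] (auto const &) { return 1; };

	auto channel = make_dead_channel (); // hypothetical: channel->alive () returns false
	queue.push ("block", nano::block_source::live, channel);

	queue.periodic_cleanup (std::chrono::milliseconds{ 0 });

	// The { live, channel } queue and its buffered request have been erased
	release_assert (queue.size (nano::block_source::live, channel) == 0);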