From 9d2f6ee84be9519bcb8b6392a921905fd60b69c0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com>
Date: Mon, 4 Mar 2024 22:58:39 +0100
Subject: [PATCH] WIP

---
 nano/lib/stats_enums.hpp     |   1 +
 nano/node/blockprocessor.cpp |  72 ++++++++++++-----
 nano/node/blockprocessor.hpp |   4 +-
 nano/node/fair_queue.hpp     | 146 ++++++++++++++++++++++++++++++-----
 nano/node/network.cpp        |   1 +
 5 files changed, 182 insertions(+), 42 deletions(-)

diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp
index 5150d7dec0..19f74b43ac 100644
--- a/nano/lib/stats_enums.hpp
+++ b/nano/lib/stats_enums.hpp
@@ -75,6 +75,7 @@ enum class detail : uint8_t
 	none,
 	success,
 	unknown,
+	queue_overflow,

 	// processing queue
 	queue,
diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp
index 56101dedf5..e76c981097 100644
--- a/nano/node/blockprocessor.cpp
+++ b/nano/node/blockprocessor.cpp
@@ -45,11 +45,45 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a)
 			block_processed.notify (result, context);
 		}
 	});
-
+
 	processing_thread = std::thread ([this] () {
 		nano::thread_role::set (nano::thread_role::name::block_processing);
 		this->process_blocks ();
 	});
+
+	queue.max_size_query = [this] (auto const & origin) {
+		switch (origin.source)
+		{
+			case nano::block_source::live:
+				return 128;
+			default:
+				return 1024 * 16;
+		}
+	};
+
+	queue.priority_query = [this] (auto const & origin) {
+		switch (origin.source)
+		{
+			case nano::block_source::live:
+				return 1;
+			case nano::block_source::local:
+				return 16;
+			case nano::block_source::bootstrap:
+				return 8;
+			default:
+				return 1;
+		}
+	};
+
+	queue.rate_limit_query = [this] (auto const & origin) -> std::pair<size_t, double> {
+		switch (origin.source)
+		{
+			case nano::block_source::live:
+				return { 100, 3.0 };
+			default:
+				return { 0, 1.0 }; // Unlimited
+		}
+	};
 }

 void nano::block_processor::stop ()
@@ -85,7 +119,7 @@ bool nano::block_processor::half_full () const
 	return size () >= node.flags.block_processor_full_size / 2;
 }

-void nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source)
+void nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source, std::shared_ptr<nano::transport::channel> channel)
 {
 	if (full ())
 	{
@@ -99,9 +133,12 @@ void nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source)
 	}

 	node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
-	node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source));
+	node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})",
+	block->hash ().to_string (),
+	to_string (source),
+	channel ? channel->to_string () : ""); // TODO: Lazy eval

-	add_impl (context{ block, source });
+	add_impl (context{ block, source }, channel);
 }

 std::optional<nano::block_status> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
@@ -136,10 +173,19 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
 	node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
 	node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());

+	add_impl (context{ block_a, block_source::forced });
+}
+
+void nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transport::channel> channel)
+{
 	{
-		nano::lock_guard<nano::mutex> lock{ mutex };
-		// forced.emplace_back (context{ block_a, block_source::forced });
-		queue.push (context{ block_a, block_source::forced }, block_source::forced);
+		nano::lock_guard<nano::mutex> guard{ mutex };
+		auto const source = ctx.source; // Read before moving ctx, evaluation order of call arguments is unspecified
+		bool overflow = queue.push (std::move (ctx), source, channel);
+		if (overflow)
+		{
+			node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::queue_overflow);
+		}
 	}
 	condition.notify_all ();
 }
@@ -224,16 +270,6 @@ bool nano::block_processor::should_log ()
 	return result;
 }

-void nano::block_processor::add_impl (context ctx)
-{
-	release_assert (ctx.source != nano::block_source::forced);
-	{
-		nano::lock_guard<nano::mutex> guard{ mutex };
-		// blocks.emplace_back (std::move (ctx));
-	}
-	condition.notify_all ();
-}
-
 auto nano::block_processor::next () -> context
 {
 	debug_assert (!mutex.try_lock ());
@@ -259,6 +295,8 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock_a)

 	lock_a.lock ();

+	queue.periodic_cleanup ();
+
 	timer_l.start ();

 	// Processing blocks
diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp
index af73a58b11..3bf2c98eab 100644
--- a/nano/node/blockprocessor.hpp
+++ b/nano/node/blockprocessor.hpp
@@ -70,7 +70,7 @@ class block_processor final
 	std::size_t size (block_source) const;
 	bool full () const;
 	bool half_full () const;
-	void add (std::shared_ptr<nano::block> const &, block_source = block_source::live);
+	void add (std::shared_ptr<nano::block> const &, block_source = block_source::live, std::shared_ptr<nano::transport::channel> channel = nullptr);
 	std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
 	void force (std::shared_ptr<nano::block> const &);
 	bool should_log ();
@@ -95,7 +95,7 @@ class block_processor final
 	void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &);
 	processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
 	context next ();
-	void add_impl (context);
+	void add_impl (context, std::shared_ptr<nano::transport::channel> channel = nullptr);

 private: // Dependencies
 	nano::node & node;
diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp
index 352b19da0e..efe5eec8bd 100644
--- a/nano/node/fair_queue.hpp
+++ b/nano/node/fair_queue.hpp
@@ -12,6 +12,7 @@
 #include <algorithm>
+#include <chrono>
 #include <deque>
 #include <functional>
 #include <map>
 #include <memory>
 #include <utility>
@@ -28,21 +29,44 @@ class fair_queue final
 private:
	struct entry
	{
-		// using queue_t = boost::circular_buffer<Request>;
 		using queue_t = std::deque<Request>;
 		queue_t requests;

-		nano::bandwidth_limiter limiter;
+		nano::bandwidth_limiter_st limiter;
 		size_t const priority;
 		size_t const max_size;

 		entry (size_t max_size, size_t priority, size_t max_rate, double max_burst_ratio) :
-			// requests{ max_size },
 			limiter{ max_rate, max_burst_ratio },
 			priority{ priority },
 			max_size{ max_size }
 		{
 		}
+
+		Request pop ()
+		{
+			release_assert (!requests.empty ());
+
+			auto request = std::move (requests.front ());
+			requests.pop_front ();
+			return request;
+		}
+
+		bool push (Request request)
+		{
+			requests.push_back (std::move (request));
+			if (requests.size () > max_size)
+			{
+				requests.pop_front ();
+				return true; // Overflow
+			}
+			return false; // No overflow
+		}
+
+		bool empty () const
+		{
+			return requests.empty ();
+		}
 	};

 public:
@@ -58,8 +82,6 @@ class fair_queue final
 	using value_type = std::pair<Request, source_type>;

 public:
-	explicit fair_queue () = default;
-
 	size_t size (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) const
 	{
 		auto it = queues.find (source_type{ source, channel });
@@ -75,8 +97,8 @@ class fair_queue final

 	bool empty () const
 	{
-		return std::any_of (queues.begin (), queues.end (), [] (auto const & queue) {
-			return !queue.second.requests.empty ();
+		return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) {
+			return queue.second.requests.empty ();
 		});
 	}

@@ -91,14 +113,18 @@ class fair_queue final
 	}

 	/// Should be called periodically to clean up stale channels
-	void cleanup ()
+	void periodic_cleanup ()
 	{
-		erase_if (queues, [] (auto const & entry) {
-			return !entry.first.second.alive ();
-		});
+		std::chrono::seconds const cleanup_interval{ 30 };
+
+		if (elapsed (last_cleanup, cleanup_interval))
+		{
+			last_cleanup = std::chrono::steady_clock::now ();
+			cleanup ();
+		}
 	}

-	void push (Request request, Source source, std::shared_ptr<nano::transport::channel> channel = nullptr)
+	bool push (Request request, Source source, std::shared_ptr<nano::transport::channel> channel = nullptr)
 	{
 		auto const source_key = source_type{ source, channel };

@@ -109,18 +135,18 @@ class fair_queue final
 		{
 			auto max_size = max_size_query (source_key);
 			auto priority = priority_query (source_key);
-			auto [max_rate, max_burst_ratio] = rate_query (source_key);
+			auto [max_rate, max_burst_ratio] = rate_limit_query (source_key);

-			entry new_entry{ max_size, priority, max_rate, max_burst_ratio };
+			debug_assert (max_size > 0);
+			debug_assert (priority > 0);

-			// it = queues.emplace (std::make_pair (source_key, entry{ max_size, priority, max_rate, max_burst_ratio })).first;
+			entry new_entry{ max_size, priority, max_rate, max_burst_ratio };
+			it = queues.emplace (source_type{ source, channel }, std::move (new_entry)).first;
 		}
+		release_assert (it != queues.end ());

 		auto & queue = it->second;
-
-		// queue.requests.push_back (std::move (request));
-
-		// queue.requests.push_back (std::move (request));
+		return queue.push (std::move (request));
 	}

 public:
@@ -130,24 +156,98 @@ class fair_queue final

 	query_size_t max_size_query{ [] (auto const & origin) { debug_assert (false, "max_size_query callback empty"); return 0; } };
 	query_priority_t priority_query{ [] (auto const & origin) { debug_assert (false, "priority_query callback empty"); return 0; } };
-	query_rate_t rate_query{ [] (auto const & origin) { debug_assert (false, "rate_query callback empty"); return std::pair{ 0, 1.0 }; } };
+	query_rate_t rate_limit_query{ [] (auto const & origin) { debug_assert (false, "rate_limit_query callback empty"); return std::pair{ 0, 1.0 }; } };

 public:
 	value_type next ()
 	{
+		debug_assert (!empty ()); // Should be checked before calling next
+
+		auto should_seek = [&, this] () {
+			if (current_queue == queues.end ())
+			{
+				return true;
+			}
+			auto & queue = current_queue->second;
+			if (queue.empty ())
+			{
+				return true;
+			}
+			// Allow up to `queue.priority` requests to be processed before moving to the next queue
+			if (current_queue_counter >= queue.priority)
+			{
+				return true;
+			}
+			return false;
+		};
+
+		if (should_seek ())
+		{
+			seek_next ();
+		}
+
+		release_assert (current_queue != queues.end ());
+
+		auto & source = current_queue->first;
+		auto & queue = current_queue->second;
+
+		++current_queue_counter;
+		auto request = queue.pop ();
+		return { std::move (request), source };
 	}

-	std::deque<value_type> next_batch (size_t max_count);
+	std::deque<value_type> next_batch (size_t max_count)
+	{
+		// TODO: Naive implementation, could be optimized
+		std::deque<value_type> result;
+		while (!empty () && result.size () < max_count)
+		{
+			result.emplace_back (next ());
+		}
+		return result;
+	}
+
+private:
+	void seek_next ()
+	{
+		current_queue_counter = 0; // Reset the per-queue budget, otherwise priorities are never honored after the first rotation
+		do
+		{
+			if (current_queue != queues.end ())
+			{
+				++current_queue;
+			}
+			if (current_queue == queues.end ())
+			{
+				current_queue = queues.begin ();
+			}
+			release_assert (current_queue != queues.end ());
+		} while (current_queue->second.empty ());
+	}
+
+	void cleanup ()
+	{
+		current_queue = queues.end (); // Invalidate current iterator
+
+		erase_if (queues, [] (auto const & entry) {
+			return entry.first.channel && !entry.first.channel->alive (); // Sources without a channel are never stale
+		});
+	}

 private:
 	std::map<source_type, entry> queues;
+	decltype (queues)::iterator current_queue{ queues.end () };
+	size_t current_queue_counter{ 0 };
+
+	std::chrono::steady_clock::time_point last_cleanup{};

 public:
 	std::unique_ptr<container_info_component> collect_container_info (std::string const & name)
 	{
 		auto composite = std::make_unique<container_info_composite> (name);
-		// composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queue", queue.size (), sizeof (typename decltype (queue)::value_type) }));
+		composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queues", queues.size (), sizeof (typename decltype (queues)::value_type) }));
+		composite->add_component (std::make_unique<container_info_leaf> (container_info{ "total_size", total_size (), sizeof (typename decltype (queues)::value_type) }));
 		return composite;
 	}
 };
-}
\ No newline at end of file
+}
diff --git a/nano/node/network.cpp b/nano/node/network.cpp
index 0c9cf2268c..05e478890a 100644
--- a/nano/node/network.cpp
+++ b/nano/node/network.cpp
@@ -255,6 +255,7 @@ class network_message_visitor : public nano::message_visitor
 	{
 		if (!node.block_processor.full ())
 		{
+			// TODO: Pass channel source
 			node.process_active (message_a.block);
 		}
 		else
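
--
Reviewer note: the scheduling rule that fair_queue implements in should_seek /
seek_next, reduced to a standalone C++17 sketch. This is not part of the patch
and not the nano API: toy_fair_queue, its members, and the hard-coded sizes and
priorities below are invented for illustration. Each source owns a bounded
FIFO; next () serves up to `priority` requests from the current source before
rotating to the next non-empty one, and push drops the oldest request when a
source exceeds its max_size. The real class additionally keys queues by
(source, channel), fills sizes and priorities from the query callbacks, and
periodically erases queues whose channel is no longer alive.

#include <algorithm>
#include <cstddef>
#include <deque>
#include <iostream>
#include <map>
#include <string>
#include <utility>

template <typename Request, typename Source>
class toy_fair_queue
{
public:
	// Returns true when the oldest request was dropped to make room
	bool push (Source source, Request request, size_t max_size, size_t priority)
	{
		auto & queue = queues[source];
		queue.priority = priority;
		queue.requests.push_back (std::move (request));
		if (queue.requests.size () > max_size)
		{
			queue.requests.pop_front ();
			return true;
		}
		return false;
	}

	bool empty () const
	{
		return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) {
			return queue.second.requests.empty ();
		});
	}

	// Precondition: !empty (), mirroring the debug_assert in the patch
	std::pair<Request, Source> next ()
	{
		if (current == queues.end () || current->second.requests.empty () || counter >= current->second.priority)
		{
			seek_next ();
		}
		++counter;
		auto request = std::move (current->second.requests.front ());
		current->second.requests.pop_front ();
		return { std::move (request), current->first };
	}

private:
	struct entry
	{
		std::deque<Request> requests;
		size_t priority{ 1 };
	};

	void seek_next ()
	{
		counter = 0; // New queue, new budget
		do
		{
			if (current != queues.end ())
			{
				++current;
			}
			if (current == queues.end ())
			{
				current = queues.begin ();
			}
		} while (current->second.requests.empty ());
	}

	std::map<Source, entry> queues;
	typename std::map<Source, entry>::iterator current{ queues.end () };
	size_t counter{ 0 };
};

int main ()
{
	toy_fair_queue<std::string, std::string> queue;
	for (int i = 0; i < 4; ++i)
	{
		queue.push ("live", "live-" + std::to_string (i), /* max_size */ 2, /* priority */ 1);
		queue.push ("bootstrap", "bootstrap-" + std::to_string (i), /* max_size */ 16, /* priority */ 2);
	}
	while (!queue.empty ())
	{
		auto [request, source] = queue.next ();
		std::cout << source << " -> " << request << '\n';
	}
}

This prints bootstrap-0, bootstrap-1, live-2, bootstrap-2, bootstrap-3, live-3:
"bootstrap" (priority 2) is served twice per rotation, while "live" (max_size 2)
dropped its two oldest entries on push. The same starvation-resistant weighting
is what the priority_query values in the block processor constructor configure
for live, local and bootstrap blocks.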