From 2beefa2caf79dd18df73b656db62ebbdc45d8e48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Wed, 6 Mar 2024 18:43:57 +0100 Subject: [PATCH] WIP --- nano/node/blockprocessor.cpp | 4 +- nano/node/blockprocessor.hpp | 2 +- nano/node/fair_queue.hpp | 83 ++++++++++++++++++++++++++---------- 3 files changed, 64 insertions(+), 25 deletions(-) diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index e76c981097..bd9661dd9c 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -106,7 +106,7 @@ std::size_t nano::block_processor::size () const std::size_t nano::block_processor::size (nano::block_source source) const { nano::unique_lock lock{ mutex }; - return queue.size (source); + return queue.size ({ source }); } bool nano::block_processor::full () const @@ -180,7 +180,7 @@ void nano::block_processor::add_impl (context ctx, std::shared_ptr guard{ mutex }; - bool overflow = queue.push (std::move (ctx), ctx.source, channel); + bool overflow = queue.push (std::move (ctx), { ctx.source, channel }); if (overflow) { node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::queue_overflow); diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 3bf2c98eab..565e6930a7 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -105,7 +105,7 @@ class block_processor final bool stopped{ false }; bool active{ false }; - nano::fair_queue queue; + nano::per_peer_fair_queue queue; std::chrono::steady_clock::time_point next_log; diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp index efe5eec8bd..8c1721b111 100644 --- a/nano/node/fair_queue.hpp +++ b/nano/node/fair_queue.hpp @@ -21,6 +21,49 @@ #include #include +namespace nano::fair_queue_sources +{ +template +struct source_by_type +{ + Type source; + + // Keep implicit for better ergonomics + source_by_type (Type source) : + source{ source } + { + } + + bool alive () const + { + return true; + } + + auto operator<=> (source_by_type const &) const = default; +}; + +template +struct source_by_type_and_channel +{ + Type source; + std::shared_ptr channel; + + // Keep implicit for better ergonomics + source_by_type_and_channel (Type source, std::shared_ptr channel = nullptr) : + source{ source }, + channel{ channel } + { + } + + bool alive () const + { + return channel->alive (); + } + + auto operator<=> (source_by_type_and_channel const &) const = default; +}; +} + namespace nano { template @@ -70,21 +113,13 @@ class fair_queue final }; public: - // using source_type = std::pair>; - struct source_type - { - Source source; - std::shared_ptr channel; - - auto operator<=> (source_type const &) const = default; - }; - - using value_type = std::pair; + using source_type = Source; + using value_type = std::pair; public: - size_t size (Source source, std::shared_ptr channel = nullptr) const + size_t size (Source source) const { - auto it = queues.find (source_type{ source, channel }); + auto it = queues.find (source); return it == queues.end () ? 0 : it->second.requests.size (); } @@ -112,7 +147,7 @@ class fair_queue final queues.clear (); } - /// Should be called periodically to clean up stale channels + /// Should be called periodically to clean up stale sources void periodic_cleanup () { std::chrono::seconds const cleanup_interval{ 30 }; @@ -124,24 +159,22 @@ class fair_queue final } } - bool push (Request request, Source source, std::shared_ptr channel = nullptr) + bool push (Request request, Source source) { - auto const source_key = source_type{ source, channel }; - - auto it = queues.find (source_key); + auto it = queues.find (source); // Create a new queue if it doesn't exist if (it == queues.end ()) { - auto max_size = max_size_query (source_key); - auto priority = priority_query (source_key); - auto [max_rate, max_burst_ratio] = rate_limit_query (source_key); + auto max_size = max_size_query (source); + auto priority = priority_query (source); + auto [max_rate, max_burst_ratio] = rate_limit_query (source); debug_assert (max_size > 0); debug_assert (priority > 0); entry new_entry{ max_size, priority, max_rate, max_burst_ratio }; - it = queues.emplace (source_type{ source, channel }, std::move (new_entry)).first; + it = queues.emplace (source, std::move (new_entry)).first; } release_assert (it != queues.end ()); @@ -229,7 +262,7 @@ class fair_queue final current_queue = queues.end (); // Invalidate current iterator erase_if (queues, [] (auto const & entry) { - return !entry.first.channel->alive (); + return !entry.first.alive (); }); } @@ -249,4 +282,10 @@ class fair_queue final return composite; } }; + +template +using per_peer_fair_queue = fair_queue, Request>; + +template +using per_type_fair_queue = fair_queue, Request>; }