diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index dec4062fb1..701987826f 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -40,6 +40,7 @@ add_executable( peer_container.cpp scheduler_buckets.cpp request_aggregator.cpp + scheduler_limiter.cpp signal_manager.cpp signing.cpp socket.cpp diff --git a/nano/core_test/election.cpp b/nano/core_test/election.cpp index 94f33a034d..0faa1ca184 100644 --- a/nano/core_test/election.cpp +++ b/nano/core_test/election.cpp @@ -17,6 +17,23 @@ TEST (election, construction) node, nano::dev::genesis, [] (auto const &) {}, [] (auto const &) {}, nano::election_behavior::normal); } +// This tests the election destruction event notification +// Since the notification is signalled in the destructor, it needs to be freed +TEST (election, destructor_observer) +{ + std::atomic destroyed{ false }; + { + nano::test::system system (1); + auto & node = *system.nodes[0]; + auto election = std::make_shared ( + node, nano::dev::genesis, [] (auto const &) {}, [] (auto const &) {}, nano::election_behavior::normal); + election->destructor_observers.add ([&destroyed] (auto const & qualified_root) { + destroyed = true; + }); + } + ASSERT_TRUE (destroyed); +} + TEST (election, behavior) { nano::test::system system (1); diff --git a/nano/core_test/scheduler_buckets.cpp b/nano/core_test/scheduler_buckets.cpp index 0a9c6adfba..62f18820b8 100644 --- a/nano/core_test/scheduler_buckets.cpp +++ b/nano/core_test/scheduler_buckets.cpp @@ -1,5 +1,6 @@ #include #include +#include #include @@ -108,27 +109,27 @@ std::shared_ptr & block3 () TEST (buckets, construction) { - nano::scheduler::buckets buckets; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null }; ASSERT_EQ (0, buckets.size ()); - ASSERT_TRUE (buckets.empty ()); + ASSERT_TRUE (buckets.empty () && !buckets.available ()); // Initial state ASSERT_EQ (62, buckets.bucket_count ()); } TEST (buckets, index_min) { - nano::scheduler::buckets buckets; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null }; ASSERT_EQ (0, buckets.index (std::numeric_limits::min ())); } TEST (buckets, index_max) { - nano::scheduler::buckets buckets; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null }; ASSERT_EQ (buckets.bucket_count () - 1, buckets.index (std::numeric_limits::max ())); } TEST (buckets, insert_Gxrb) { - nano::scheduler::buckets buckets; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null }; buckets.push (1000, block0 (), nano::Gxrb_ratio); ASSERT_EQ (1, buckets.size ()); ASSERT_EQ (1, buckets.bucket_size (48)); @@ -136,7 +137,7 @@ TEST (buckets, insert_Gxrb) TEST (buckets, insert_Mxrb) { - nano::scheduler::buckets buckets; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null }; buckets.push (1000, block1 (), nano::Mxrb_ratio); ASSERT_EQ (1, buckets.size ()); ASSERT_EQ (1, buckets.bucket_size (13)); @@ -145,7 +146,7 @@ TEST (buckets, insert_Mxrb) // Test two blocks with the same priority TEST (buckets, insert_same_priority) { - nano::scheduler::buckets buckets; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null }; buckets.push (1000, block0 (), nano::Gxrb_ratio); buckets.push (1000, block2 (), nano::Gxrb_ratio); ASSERT_EQ (2, buckets.size ()); @@ -155,7 +156,7 @@ TEST (buckets, insert_same_priority) // Test the same block inserted multiple times TEST (buckets, insert_duplicate) { - nano::scheduler::buckets buckets; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null }; buckets.push (1000, block0 (), nano::Gxrb_ratio); buckets.push (1000, block0 (), nano::Gxrb_ratio); ASSERT_EQ (1, buckets.size ()); @@ -164,18 +165,18 @@ TEST (buckets, insert_duplicate) TEST (buckets, insert_older) { - nano::scheduler::buckets buckets; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null }; buckets.push (1000, block0 (), nano::Gxrb_ratio); buckets.push (1100, block2 (), nano::Gxrb_ratio); - ASSERT_EQ (block0 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.top ().first); buckets.pop (); - ASSERT_EQ (block2 (), buckets.top ()); + ASSERT_EQ (block2 (), buckets.top ().first); buckets.pop (); } TEST (buckets, pop) { - nano::scheduler::buckets buckets; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null }; ASSERT_TRUE (buckets.empty ()); buckets.push (1000, block0 (), nano::Gxrb_ratio); ASSERT_FALSE (buckets.empty ()); @@ -185,69 +186,69 @@ TEST (buckets, pop) TEST (buckets, top_one) { - nano::scheduler::buckets buckets; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null }; buckets.push (1000, block0 (), nano::Gxrb_ratio); - ASSERT_EQ (block0 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.top ().first); } TEST (buckets, top_two) { - nano::scheduler::buckets buckets; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null }; buckets.push (1000, block0 (), nano::Gxrb_ratio); buckets.push (1, block1 (), nano::Mxrb_ratio); - ASSERT_EQ (block0 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.top ().first); buckets.pop (); - ASSERT_EQ (block1 (), buckets.top ()); + ASSERT_EQ (block1 (), buckets.top ().first); buckets.pop (); ASSERT_TRUE (buckets.empty ()); } TEST (buckets, top_round_robin) { - nano::scheduler::buckets buckets; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null }; buckets.push (1000, blockzero (), 0); - ASSERT_EQ (blockzero (), buckets.top ()); + ASSERT_EQ (blockzero (), buckets.top ().first); buckets.push (1000, block0 (), nano::Gxrb_ratio); buckets.push (1000, block1 (), nano::Mxrb_ratio); buckets.push (1100, block3 (), nano::Mxrb_ratio); buckets.pop (); // blockzero - EXPECT_EQ (block1 (), buckets.top ()); + EXPECT_EQ (block1 (), buckets.top ().first); buckets.pop (); - EXPECT_EQ (block0 (), buckets.top ()); + EXPECT_EQ (block0 (), buckets.top ().first); buckets.pop (); - EXPECT_EQ (block3 (), buckets.top ()); + EXPECT_EQ (block3 (), buckets.top ().first); buckets.pop (); EXPECT_TRUE (buckets.empty ()); } TEST (buckets, trim_normal) { - nano::scheduler::buckets buckets{ 1 }; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null, 1 }; buckets.push (1000, block0 (), nano::Gxrb_ratio); buckets.push (1100, block2 (), nano::Gxrb_ratio); ASSERT_EQ (1, buckets.size ()); - ASSERT_EQ (block0 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.top ().first); } TEST (buckets, trim_reverse) { - nano::scheduler::buckets buckets{ 1 }; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null, 1 }; buckets.push (1100, block2 (), nano::Gxrb_ratio); buckets.push (1000, block0 (), nano::Gxrb_ratio); ASSERT_EQ (1, buckets.size ()); - ASSERT_EQ (block0 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.top ().first); } TEST (buckets, trim_even) { - nano::scheduler::buckets buckets{ 2 }; + nano::scheduler::buckets buckets{ nano::test::active_transactions_insert_null, 2 }; buckets.push (1000, block0 (), nano::Gxrb_ratio); buckets.push (1100, block2 (), nano::Gxrb_ratio); ASSERT_EQ (1, buckets.size ()); - ASSERT_EQ (block0 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.top ().first); buckets.push (1000, block1 (), nano::Mxrb_ratio); ASSERT_EQ (2, buckets.size ()); - ASSERT_EQ (block0 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.top ().first); buckets.pop (); - ASSERT_EQ (block1 (), buckets.top ()); + ASSERT_EQ (block1 (), buckets.top ().first); } diff --git a/nano/core_test/scheduler_limiter.cpp b/nano/core_test/scheduler_limiter.cpp new file mode 100644 index 0000000000..4b81f0dbcd --- /dev/null +++ b/nano/core_test/scheduler_limiter.cpp @@ -0,0 +1,37 @@ +#include +#include +#include +#include + +#include + +TEST (scheduler_limiter, construction) +{ + auto occupancy = std::make_shared (nano::test::active_transactions_insert_null, 1, nano::election_behavior::normal); + ASSERT_EQ (1, occupancy->limit ()); + ASSERT_TRUE (occupancy->available ()); +} + +TEST (scheduler_limiter, limit) +{ + auto occupancy = std::make_shared (nano::test::active_transactions_insert_null, 1, nano::election_behavior::normal); + ASSERT_EQ (1, occupancy->limit ()); + ASSERT_TRUE (occupancy->available ()); +} + +TEST (scheduler_limiter, election_activate_observer) +{ + nano::test::system system{ 1 }; + auto occupancy = std::make_shared ([&] (auto const & block, auto const & behavior) { + return system.node (0).active.insert (block, behavior); + }, + 1, nano::election_behavior::normal); + auto result = occupancy->activate (nano::dev::genesis); + ASSERT_TRUE (result.inserted); + auto elections = occupancy->elections (); + ASSERT_EQ (1, elections.size ()); + ASSERT_EQ (1, elections.count (nano::dev::genesis->qualified_root ())); + ASSERT_FALSE (occupancy->available ()); + result.election = nullptr; // Implicitly run election destructor notification by clearing the last reference + ASSERT_TIMELY (5s, occupancy->available ()); +} diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 4e1f29cc02..831d6419dd 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -202,6 +202,8 @@ add_library( scheduler/component.cpp scheduler/hinted.hpp scheduler/hinted.cpp + scheduler/limiter.hpp + scheduler/limiter.cpp scheduler/manual.hpp scheduler/manual.cpp scheduler/optimistic.hpp diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index ca00132a9c..984521b979 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -373,6 +373,14 @@ nano::election_insertion_result nano::active_transactions::insert (const std::sh return result; } +std::function const & block, nano::election_behavior behavior)> nano::active_transactions::insert_fn () +{ + return [this] (std::shared_ptr const & block, nano::election_behavior behavior) { + auto result = insert (block, behavior); + return result; + }; +} + void nano::active_transactions::trim () { /* diff --git a/nano/node/active_transactions.hpp b/nano/node/active_transactions.hpp index 865da63be0..2cd3873e4c 100644 --- a/nano/node/active_transactions.hpp +++ b/nano/node/active_transactions.hpp @@ -142,6 +142,8 @@ class active_transactions final * Starts new election with a specified behavior type */ nano::election_insertion_result insert (std::shared_ptr const & block, nano::election_behavior behavior = nano::election_behavior::normal); + // Function wrapper around call to ::insert + std::function const & block, nano::election_behavior behavior)> insert_fn (); // Distinguishes replay votes, cannot be determined if the block is not in any election nano::vote_code vote (std::shared_ptr const &); // Is the root of this block in the roots container diff --git a/nano/node/election.cpp b/nano/node/election.cpp index 79243bfece..c8f3d5ee73 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -32,6 +32,11 @@ nano::election::election (nano::node & node_a, std::shared_ptr cons last_blocks.emplace (block_a->hash (), block_a); } +nano::election::~election () +{ + destructor_observers.notify (qualified_root); +} + void nano::election::confirm_once (nano::unique_lock & lock_a, nano::election_status_type type_a) { debug_assert (lock_a.owns_lock ()); diff --git a/nano/node/election.hpp b/nano/node/election.hpp index 1acc6eb921..59abffd06b 100644 --- a/nano/node/election.hpp +++ b/nano/node/election.hpp @@ -138,7 +138,10 @@ class election final : public std::enable_shared_from_this public: // Interface election (nano::node &, std::shared_ptr const & block, std::function const &)> const & confirmation_action, std::function const & vote_action, nano::election_behavior behavior); + ~election (); + nano::observer_set destructor_observers; +public: std::shared_ptr find (nano::block_hash const &) const; /* * Process vote. Internally uses cooldown to throttle non-final votes diff --git a/nano/node/scheduler/bucket.cpp b/nano/node/scheduler/bucket.cpp index 51ba1626a5..e77e906444 100644 --- a/nano/node/scheduler/bucket.cpp +++ b/nano/node/scheduler/bucket.cpp @@ -1,5 +1,6 @@ #include #include +#include bool nano::scheduler::bucket::value_type::operator< (value_type const & other_a) const { @@ -11,20 +12,23 @@ bool nano::scheduler::bucket::value_type::operator== (value_type const & other_a return time == other_a.time && block->hash () == other_a.block->hash (); } -nano::scheduler::bucket::bucket (size_t maximum) : - maximum{ maximum } +nano::scheduler::bucket::bucket (std::shared_ptr limiter, size_t maximum) : + maximum{ maximum }, + limiter{ limiter } { debug_assert (maximum > 0); + debug_assert (limiter != nullptr); } nano::scheduler::bucket::~bucket () { } -std::shared_ptr nano::scheduler::bucket::top () const +std::pair, std::shared_ptr> nano::scheduler::bucket::top () const { debug_assert (!queue.empty ()); - return queue.begin ()->block; + auto & first = *queue.begin (); + return { first.block, limiter }; } void nano::scheduler::bucket::pop () @@ -53,6 +57,11 @@ bool nano::scheduler::bucket::empty () const return queue.empty (); } +bool nano::scheduler::bucket::available () const +{ + return !queue.empty () && limiter->available (); +} + void nano::scheduler::bucket::dump () const { for (auto const & item : queue) diff --git a/nano/node/scheduler/bucket.hpp b/nano/node/scheduler/bucket.hpp index 2f32c17d59..f5e8c3a42c 100644 --- a/nano/node/scheduler/bucket.hpp +++ b/nano/node/scheduler/bucket.hpp @@ -11,6 +11,7 @@ class block; } namespace nano::scheduler { +class limiter; /** A class which holds an ordered set of blocks to be scheduled, ordered by their block arrival time */ class bucket final @@ -25,15 +26,17 @@ class bucket final }; std::set queue; size_t const maximum; + std::shared_ptr limiter; public: - bucket (size_t maximum); + bucket (std::shared_ptr limiter, size_t maximum); ~bucket (); - std::shared_ptr top () const; + std::pair, std::shared_ptr> top () const; void pop (); void push (uint64_t time, std::shared_ptr block); size_t size () const; bool empty () const; + bool available () const; void dump () const; }; } // namespace nano::scheduler diff --git a/nano/node/scheduler/buckets.cpp b/nano/node/scheduler/buckets.cpp index c033f6f6ca..164ee4c1ca 100644 --- a/nano/node/scheduler/buckets.cpp +++ b/nano/node/scheduler/buckets.cpp @@ -1,7 +1,9 @@ #include #include +#include #include #include +#include #include @@ -19,7 +21,7 @@ void nano::scheduler::buckets::next () void nano::scheduler::buckets::seek () { next (); - for (std::size_t i = 0, n = buckets_m.size (); (*current)->empty () && i < n; ++i) + for (std::size_t i = 0, n = buckets_m.size (); !(*current)->available () && i < n; ++i) { next (); } @@ -29,7 +31,7 @@ void nano::scheduler::buckets::seek () * Prioritization constructor, construct a container containing approximately 'maximum' number of blocks. * @param maximum number of blocks that this container can hold, this is a soft and approximate limit. */ -nano::scheduler::buckets::buckets (uint64_t maximum) : +nano::scheduler::buckets::buckets (insert_t const & insert, uint64_t maximum) : maximum{ maximum } { auto build_region = [this] (uint128_t const & begin, uint128_t const & end, size_t count) { @@ -52,7 +54,8 @@ nano::scheduler::buckets::buckets (uint64_t maximum) : auto bucket_max = std::max (1u, maximum / minimums.size ()); for (size_t i = 0u, n = minimums.size (); i < n; ++i) { - buckets_m.push_back (std::make_unique (bucket_max)); + auto limiter = std::make_shared (insert, bucket_max, nano::election_behavior::normal); + buckets_m.push_back (std::make_unique (limiter, bucket_max)); } current = buckets_m.begin (); } @@ -73,19 +76,19 @@ std::size_t nano::scheduler::buckets::index (nano::uint128_t const & balance) co */ void nano::scheduler::buckets::push (uint64_t time, std::shared_ptr block, nano::amount const & priority) { - auto was_empty = empty (); + auto was_available = available (); auto & bucket = buckets_m[index (priority.number ())]; bucket->push (time, block); - if (was_empty) + if (!was_available) { seek (); } } /** Return the highest priority block of the current bucket */ -std::shared_ptr nano::scheduler::buckets::top () const +std::pair, std::shared_ptr> nano::scheduler::buckets::top () const { - debug_assert (!empty ()); + debug_assert (available ()); auto result = (*current)->top (); return result; } @@ -93,7 +96,7 @@ std::shared_ptr nano::scheduler::buckets::top () const /** Pop the current block from the container and seek to the next block, if it exists */ void nano::scheduler::buckets::pop () { - debug_assert (!empty ()); + debug_assert (available ()); auto & bucket = *current; bucket->pop (); seek (); @@ -128,6 +131,11 @@ bool nano::scheduler::buckets::empty () const return std::all_of (buckets_m.begin (), buckets_m.end (), [] (auto const & bucket) { return bucket->empty (); }); } +bool nano::scheduler::buckets::available () const +{ + return std::any_of (buckets_m.begin (), buckets_m.end (), [] (auto const & bucket) { return bucket->available (); }); +} + /** Print the state of the class in stderr */ void nano::scheduler::buckets::dump () const { diff --git a/nano/node/scheduler/buckets.hpp b/nano/node/scheduler/buckets.hpp index 967b4408f7..ee49ec622f 100644 --- a/nano/node/scheduler/buckets.hpp +++ b/nano/node/scheduler/buckets.hpp @@ -1,6 +1,8 @@ #pragma once #include #include +#include +#include #include #include @@ -9,11 +11,13 @@ namespace nano { +class active_transactions; class block; } namespace nano::scheduler { class bucket; +class limiter; /** A container for holding blocks and their arrival/creation time. * * The container consists of a number of buckets. Each bucket holds an ordered set of 'value_type' items. @@ -40,18 +44,21 @@ class buckets final uint64_t const maximum; void next (); - void seek (); public: - buckets (uint64_t maximum = 250000u); + using insert_t = std::function const & block, nano::election_behavior behavior)>; + + buckets (insert_t const & insert, uint64_t maximum = 250000u); ~buckets (); void push (uint64_t time, std::shared_ptr block, nano::amount const & priority); - std::shared_ptr top () const; + std::pair, std::shared_ptr> top () const; void pop (); + void seek (); std::size_t size () const; std::size_t bucket_count () const; std::size_t bucket_size (std::size_t index) const; bool empty () const; + bool available () const; void dump () const; std::size_t index (nano::uint128_t const & balance) const; diff --git a/nano/node/scheduler/hinted.cpp b/nano/node/scheduler/hinted.cpp index 6c5d4933bd..92222b6ef2 100644 --- a/nano/node/scheduler/hinted.cpp +++ b/nano/node/scheduler/hinted.cpp @@ -1,6 +1,7 @@ #include #include #include +#include nano::scheduler::hinted::config::config (nano::node_config const & config) : vote_cache_check_interval_ms{ config.network_params.network.is_dev_network () ? 100u : 1000u } @@ -11,9 +12,8 @@ nano::scheduler::hinted::hinted (config const & config_a, nano::node & node_a, n config_m{ config_a }, node{ node_a }, inactive_vote_cache{ inactive_vote_cache_a }, - active{ active_a }, - online_reps{ online_reps_a }, - stats{ stats_a } + limiter{ std::make_shared (node.active.insert_fn (), std::max (node.config.active_elections_hinted_limit_percentage * node.config.active_elections_size / 100, 1u), nano::election_behavior::hinted) }, + online_reps{ online_reps_a }, stats{ stats_a } { } @@ -51,7 +51,7 @@ void nano::scheduler::hinted::notify () bool nano::scheduler::hinted::predicate (nano::uint128_t const & minimum_tally) const { // Check if there is space inside AEC for a new hinted election - if (active.vacancy (nano::election_behavior::hinted) > 0) + if (limiter->available ()) { // Check if there is any vote cache entry surpassing our minimum vote tally threshold if (inactive_vote_cache.peek (minimum_tally)) @@ -59,6 +59,10 @@ bool nano::scheduler::hinted::predicate (nano::uint128_t const & minimum_tally) return true; } } + else + { + std::cerr << '\0'; + } return false; } @@ -77,9 +81,9 @@ bool nano::scheduler::hinted::run_one (nano::uint128_t const & minimum_tally) { // Try to insert it into AEC as hinted election // We check for AEC vacancy inside our predicate - auto result = node.active.insert (block, nano::election_behavior::hinted); + auto result = limiter->activate (block); - stats.inc (nano::stat::type::hinting, result.inserted ? nano::stat::detail::insert : nano::stat::detail::insert_failed); + stats.inc (nano::stat::type::hinting, result.inserted ? nano::stat::detail::hinted : nano::stat::detail::insert_failed); return result.inserted; // Return whether block was inserted } diff --git a/nano/node/scheduler/hinted.hpp b/nano/node/scheduler/hinted.hpp index ba344095d2..1d974ad2d3 100644 --- a/nano/node/scheduler/hinted.hpp +++ b/nano/node/scheduler/hinted.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -17,6 +18,7 @@ class online_reps; } namespace nano::scheduler { +class limiter; /* * Monitors inactive vote cache and schedules elections with the highest observed vote tally. */ @@ -53,7 +55,7 @@ class hinted final private: // Dependencies nano::node & node; nano::vote_cache & inactive_vote_cache; - nano::active_transactions & active; + std::shared_ptr limiter; nano::online_reps & online_reps; nano::stats & stats; diff --git a/nano/node/scheduler/limiter.cpp b/nano/node/scheduler/limiter.cpp new file mode 100644 index 0000000000..4cde8e61fe --- /dev/null +++ b/nano/node/scheduler/limiter.cpp @@ -0,0 +1,63 @@ +#include +#include +#include +#include + +#include + +nano::scheduler::limiter::limiter (insert_t const & insert, size_t limit, nano::election_behavior behavior) : + insert{ insert }, + limit_m{ limit }, + behavior{ behavior } +{ + debug_assert (limit > 0); +} + +size_t nano::scheduler::limiter::limit () const +{ + return limit_m; +} + +std::unordered_set nano::scheduler::limiter::elections () const +{ + nano::lock_guard lock{ mutex }; + return elections_m; +} + +bool nano::scheduler::limiter::available () const +{ + nano::lock_guard lock{ mutex }; + auto result = elections_m.size () < limit (); + return result; +} + +nano::election_insertion_result nano::scheduler::limiter::activate (std::shared_ptr const & block) +{ + if (!available ()) + { + return { nullptr, false }; + } + + // This code section is not synchronous with respect to available () + // It is assumed 'sink' is thread safe and only one call to + auto result = insert (block, behavior); + if (result.inserted) + { + nano::lock_guard lock{ mutex }; + elections_m.insert (result.election->qualified_root); + // Capture via weak_ptr so we don't have to consider destruction order of nano::scheduler::limiter compared to nano::election. + result.election->destructor_observers.add ([this_w = std::weak_ptr{ shared_from_this () }] (nano::qualified_root const & root) { + if (auto this_l = this_w.lock ()) + { + this_l->election_destruction_notification (root); + } + }); + } + return result; +} + +size_t nano::scheduler::limiter::election_destruction_notification (nano::qualified_root const & root) +{ + nano::lock_guard lock{ mutex }; + return elections_m.erase (root); +} diff --git a/nano/node/scheduler/limiter.hpp b/nano/node/scheduler/limiter.hpp new file mode 100644 index 0000000000..fa16259e01 --- /dev/null +++ b/nano/node/scheduler/limiter.hpp @@ -0,0 +1,49 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace nano +{ +class block; +class election; +enum class election_behavior; +class stats; +} +namespace nano::scheduler +{ +/** + This class is a facade around active_transactions that limits the number of elections that can be inserted. +*/ +class limiter : public std::enable_shared_from_this +{ +public: + using insert_t = std::function const & block, nano::election_behavior behavior)>; + + limiter (insert_t const & insert, size_t limit, nano::election_behavior behavior); + // Checks whether there is availability to insert an election for 'block' and if so, spawns a new election + nano::election_insertion_result activate (std::shared_ptr const & block); + // Returns whether there is availability to insert a new election + bool available () const; + // Returns the upper limit on the number of elections allowed to be started + size_t limit () const; + std::unordered_set elections () const; + +private: + size_t election_destruction_notification (nano::qualified_root const & root); + + insert_t insert; + size_t const limit_m; + nano::election_behavior behavior; + // Tracks the elections that have been started through this facade + std::unordered_set elections_m; + std::function block)> start_election; + + mutable nano::mutex mutex; +}; +} // namespace nano::scheduler diff --git a/nano/node/scheduler/optimistic.cpp b/nano/node/scheduler/optimistic.cpp index a00fa4d96c..5fd48c783d 100644 --- a/nano/node/scheduler/optimistic.cpp +++ b/nano/node/scheduler/optimistic.cpp @@ -1,13 +1,14 @@ #include #include #include +#include #include nano::scheduler::optimistic::optimistic (optimistic_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::active_transactions & active_a, nano::network_constants const & network_constants_a, nano::stats & stats_a) : config{ config_a }, node{ node_a }, ledger{ ledger_a }, - active{ active_a }, + limiter{ std::make_shared (node.active.insert_fn (), std::max (node.config.active_elections_optimistic_limit_percentage * node.config.active_elections_size / 100, 1u), nano::election_behavior::optimistic) }, network_constants{ network_constants_a }, stats{ stats_a } { @@ -100,7 +101,7 @@ bool nano::scheduler::optimistic::predicate () const { debug_assert (!mutex.try_lock ()); - if (active.vacancy (nano::election_behavior::optimistic) <= 0) + if (!limiter->available ()) { return false; } @@ -154,7 +155,7 @@ void nano::scheduler::optimistic::run_one (nano::transaction const & transaction { // Try to insert it into AEC // We check for AEC vacancy inside our predicate - auto result = node.active.insert (block, nano::election_behavior::optimistic); + auto result = limiter->activate (block); stats.inc (nano::stat::type::optimistic_scheduler, result.inserted ? nano::stat::detail::insert : nano::stat::detail::insert_failed); } diff --git a/nano/node/scheduler/optimistic.hpp b/nano/node/scheduler/optimistic.hpp index 3c16c72059..283e921a6a 100644 --- a/nano/node/scheduler/optimistic.hpp +++ b/nano/node/scheduler/optimistic.hpp @@ -22,13 +22,14 @@ namespace mi = boost::multi_index; namespace nano { -class node; -class ledger; class active_transactions; +class ledger; +class node; } namespace nano::scheduler { +class limiter; class optimistic_config final { public: @@ -76,7 +77,7 @@ class optimistic final optimistic_config const & config; nano::node & node; nano::ledger & ledger; - nano::active_transactions & active; + std::shared_ptr limiter; nano::network_constants const & network_constants; nano::stats & stats; diff --git a/nano/node/scheduler/priority.cpp b/nano/node/scheduler/priority.cpp index e4bd01392e..6cc21d8ec4 100644 --- a/nano/node/scheduler/priority.cpp +++ b/nano/node/scheduler/priority.cpp @@ -1,11 +1,12 @@ #include #include +#include #include nano::scheduler::priority::priority (nano::node & node_a, nano::stats & stats_a) : node{ node_a }, stats{ stats_a }, - buckets{ std::make_unique () } + buckets{ std::make_unique (node_a.active.insert_fn ()) } { } @@ -88,7 +89,7 @@ bool nano::scheduler::priority::empty () const bool nano::scheduler::priority::predicate () const { - return node.active.vacancy () > 0 && !buckets->empty (); + return node.active.vacancy () > 0 && buckets->available (); } void nano::scheduler::priority::run () @@ -106,11 +107,12 @@ void nano::scheduler::priority::run () if (predicate ()) { - auto block = buckets->top (); + auto [block, limiter] = buckets->top (); + debug_assert (limiter->available ()); buckets->pop (); lock.unlock (); stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority); - auto result = node.active.insert (block); + auto result = limiter->activate (block); if (result.inserted) { stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority_success); diff --git a/nano/test_common/testutil.cpp b/nano/test_common/testutil.cpp index 05a6db689b..64ef5d956e 100644 --- a/nano/test_common/testutil.cpp +++ b/nano/test_common/testutil.cpp @@ -244,3 +244,5 @@ bool nano::test::start_elections (nano::test::system & system_a, nano::node & no { return nano::test::start_elections (system_a, node_a, blocks_to_hashes (blocks_a), forced_a); } + +std::function const & block, nano::election_behavior behavior)> nano::test::active_transactions_insert_null = [] (std::shared_ptr const & block, nano::election_behavior behavior) { return nano::election_insertion_result{}; }; diff --git a/nano/test_common/testutil.hpp b/nano/test_common/testutil.hpp index 6a5ab1e379..d7a8ab25c8 100644 --- a/nano/test_common/testutil.hpp +++ b/nano/test_common/testutil.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -131,6 +132,7 @@ class network_params; class vote; class block; class election; +enum class election_behavior; extern nano::uint128_t const & genesis_amount; @@ -418,5 +420,6 @@ namespace test * NOTE: Each election is given 5 seconds to complete, if it does not complete in 5 seconds, it will return an error. */ [[nodiscard]] bool start_elections (nano::test::system &, nano::node &, std::vector> const &, bool const forced_a = false); + extern std::function const & block, nano::election_behavior behavior)> active_transactions_insert_null; } }