diff --git a/nano/core_test/active_elections.cpp b/nano/core_test/active_elections.cpp index 29cd87f70e..20cc748e66 100644 --- a/nano/core_test/active_elections.cpp +++ b/nano/core_test/active_elections.cpp @@ -163,7 +163,7 @@ TEST (active_elections, keep_local) nano::node_config node_config = system.default_config (); node_config.enable_voting = false; // Bound to 2, won't drop wallet created transactions, but good to test dropping remote - node_config.active_elections.size = 2; + node_config.priority_scheduler.bucket_maximum = 2; // Disable frontier confirmation to allow the test to finish before node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; @@ -192,7 +192,10 @@ TEST (active_elections, keep_local) ASSERT_NE (nullptr, send6); // force-confirm blocks - nano::test::confirm (node.ledger, send6); + auto election = nano::test::start_election (system, node, send6->hash ()); + ASSERT_NE (nullptr, election); + election->force_confirm (); + ASSERT_TIMELY (5s, node.active.empty ()); nano::state_block_builder builder{}; const auto receive1 = builder.make_block () @@ -227,8 +230,7 @@ TEST (active_elections, keep_local) node.process_active (receive3); /// bound elections, should drop after one loop - ASSERT_TIMELY_EQ (5s, node.active.size (), node_config.active_elections.size); - // ASSERT_EQ (1, node.scheduler.size ()); + ASSERT_TIMELY_EQ (5s, node.active.size (), node_config.priority_scheduler.bucket_maximum); } TEST (inactive_votes_cache, basic) @@ -1389,101 +1391,3 @@ TEST (active_elections, limit_vote_hinted_elections) // Ensure there was no overflow of elections ASSERT_EQ (0, node.stats.count (nano::stat::type::active_dropped, nano::stat::detail::priority)); } - -/* - * Tests that when AEC is running at capacity from normal elections, it is still possible to schedule a limited number of hinted elections - */ -TEST (active_elections, allow_limited_overflow) -{ - nano::test::system system; - nano::node_config config = system.default_config (); - const int aec_limit = 20; - config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; - config.active_elections.size = aec_limit; - config.active_elections.hinted_limit_percentage = 20; // Should give us a limit of 4 hinted elections - auto & node = *system.add_node (config); - - auto blocks = nano::test::setup_independent_blocks (system, node, aec_limit * 4); - - // Split blocks in two halves - std::vector> blocks1 (blocks.begin (), blocks.begin () + blocks.size () / 2); - std::vector> blocks2 (blocks.begin () + blocks.size () / 2, blocks.end ()); - - // Even though automatic frontier confirmation is disabled, AEC is doing funny stuff and inserting elections, clear that - WAIT (1s); - node.active.clear (); - ASSERT_TRUE (node.active.empty ()); - - // Insert the first part of the blocks into normal election scheduler - for (auto const & block : blocks1) - { - node.scheduler.priority.activate (node.ledger.tx_begin_read (), block->account ()); - } - - // Ensure number of active elections reaches AEC limit and there is no overfill - ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit (nano::election_behavior::priority)); - // And it stays that way without increasing - ASSERT_ALWAYS (1s, node.active.size () == node.active.limit (nano::election_behavior::priority)); - - // Insert votes for the second part of the blocks, so that those are scheduled as hinted elections - for (auto const & block : blocks2) - { - // Non-final vote, so it stays in the AEC without getting confirmed - auto vote = nano::test::make_vote (nano::dev::genesis_key, { block }); - node.vote_cache.insert (vote); - } - - // Ensure active elections overfill AEC only up to normal + hinted limit - ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit (nano::election_behavior::priority) + node.active.limit (nano::election_behavior::hinted)); - // And it stays that way without increasing - ASSERT_ALWAYS (1s, node.active.size () == node.active.limit (nano::election_behavior::priority) + node.active.limit (nano::election_behavior::hinted)); -} - -/* - * Tests that when hinted elections are present in the AEC, normal scheduler adapts not to exceed the limit of all elections - */ -TEST (active_elections, allow_limited_overflow_adapt) -{ - nano::test::system system; - nano::node_config config = system.default_config (); - const int aec_limit = 20; - config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; - config.active_elections.size = aec_limit; - config.active_elections.hinted_limit_percentage = 20; // Should give us a limit of 4 hinted elections - auto & node = *system.add_node (config); - - auto blocks = nano::test::setup_independent_blocks (system, node, aec_limit * 4); - - // Split blocks in two halves - std::vector> blocks1 (blocks.begin (), blocks.begin () + blocks.size () / 2); - std::vector> blocks2 (blocks.begin () + blocks.size () / 2, blocks.end ()); - - // Even though automatic frontier confirmation is disabled, AEC is doing funny stuff and inserting elections, clear that - WAIT (1s); - node.active.clear (); - ASSERT_TRUE (node.active.empty ()); - - // Insert votes for the second part of the blocks, so that those are scheduled as hinted elections - for (auto const & block : blocks2) - { - // Non-final vote, so it stays in the AEC without getting confirmed - auto vote = nano::test::make_vote (nano::dev::genesis_key, { block }); - node.vote_cache.insert (vote); - } - - // Ensure hinted election amount is bounded by hinted limit - ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit (nano::election_behavior::hinted)); - // And it stays that way without increasing - ASSERT_ALWAYS (1s, node.active.size () == node.active.limit (nano::election_behavior::hinted)); - - // Insert the first part of the blocks into normal election scheduler - for (auto const & block : blocks1) - { - node.scheduler.priority.activate (node.ledger.tx_begin_read (), block->account ()); - } - - // Ensure number of active elections reaches AEC limit and there is no overfill - ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit (nano::election_behavior::priority)); - // And it stays that way without increasing - ASSERT_ALWAYS (1s, node.active.size () == node.active.limit (nano::election_behavior::priority)); -} diff --git a/nano/core_test/election_scheduler.cpp b/nano/core_test/election_scheduler.cpp index 02a37bec89..8c525e9e1a 100644 --- a/nano/core_test/election_scheduler.cpp +++ b/nano/core_test/election_scheduler.cpp @@ -53,96 +53,3 @@ TEST (election_scheduler, activate_one_flush) system.nodes[0]->scheduler.priority.activate (system.nodes[0]->ledger.tx_begin_read (), nano::dev::genesis_key.pub); ASSERT_TIMELY (5s, system.nodes[0]->active.election (send1->qualified_root ())); } - -/** - * Tests that the election scheduler and the active transactions container (AEC) - * work in sync with regards to the node configuration value "active_elections.size". - * - * The test sets up two forcefully cemented blocks -- a send on the genesis account and a receive on a second account. - * It then creates two other blocks, each a successor to one of the previous two, - * and processes them locally (without the node starting elections for them, but just saving them to disk). - * - * Elections for these latter two (B1 and B2) are started by the test code manually via `election_scheduler::activate`. - * The test expects E1 to start right off and take its seat into the AEC. - * E2 is expected not to start though (because the AEC is full), so B2 should be awaiting in the scheduler's queue. - * - * As soon as the test code manually confirms E1 (and thus evicts it out of the AEC), - * it is expected that E2 begins and the scheduler's queue becomes empty again. - */ -TEST (election_scheduler, no_vacancy) -{ - nano::test::system system{}; - - nano::node_config config = system.default_config (); - config.active_elections.size = 1; - config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; - - auto & node = *system.add_node (config); - nano::state_block_builder builder{}; - nano::keypair key{}; - - // Activating accounts depends on confirmed dependencies. First, prepare 2 accounts - auto send = builder.make_block () - .account (nano::dev::genesis_key.pub) - .previous (nano::dev::genesis->hash ()) - .representative (nano::dev::genesis_key.pub) - .link (key.pub) - .balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio) - .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) - .work (*system.work.generate (nano::dev::genesis->hash ())) - .build (); - ASSERT_EQ (nano::block_status::progress, node.process (send)); - node.process_confirmed (nano::election_status{ send }); - - auto receive = builder.make_block () - .account (key.pub) - .previous (0) - .representative (key.pub) - .link (send->hash ()) - .balance (nano::Gxrb_ratio) - .sign (key.prv, key.pub) - .work (*system.work.generate (key.pub)) - .build (); - ASSERT_EQ (nano::block_status::progress, node.process (receive)); - node.process_confirmed (nano::election_status{ receive }); - - ASSERT_TIMELY (5s, nano::test::confirmed (node, { send, receive })); - - // Second, process two eligible transactions - auto block1 = builder.make_block () - .account (nano::dev::genesis_key.pub) - .previous (send->hash ()) - .representative (nano::dev::genesis_key.pub) - .link (nano::dev::genesis_key.pub) - .balance (nano::dev::constants.genesis_amount - 2 * nano::Gxrb_ratio) - .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) - .work (*system.work.generate (send->hash ())) - .build (); - ASSERT_EQ (nano::block_status::progress, node.process (block1)); - - // There is vacancy so it should be inserted - node.scheduler.priority.activate (node.ledger.tx_begin_read (), nano::dev::genesis_key.pub); - std::shared_ptr election{}; - ASSERT_TIMELY (5s, (election = node.active.election (block1->qualified_root ())) != nullptr); - - auto block2 = builder.make_block () - .account (key.pub) - .previous (receive->hash ()) - .representative (key.pub) - .link (key.pub) - .balance (0) - .sign (key.prv, key.pub) - .work (*system.work.generate (receive->hash ())) - .build (); - ASSERT_EQ (nano::block_status::progress, node.process (block2)); - - // There is no vacancy so it should stay queued - node.scheduler.priority.activate (node.ledger.tx_begin_read (), key.pub); - ASSERT_TIMELY_EQ (5s, node.scheduler.priority.size (), 1); - ASSERT_EQ (node.active.election (block2->qualified_root ()), nullptr); - - // Election confirmed, next in queue should begin - election->force_confirm (); - ASSERT_TIMELY (5s, node.active.election (block2->qualified_root ()) != nullptr); - ASSERT_TRUE (node.scheduler.priority.empty ()); -} diff --git a/nano/core_test/scheduler_buckets.cpp b/nano/core_test/scheduler_buckets.cpp index 5cfaf57bf4..7fb94ab5df 100644 --- a/nano/core_test/scheduler_buckets.cpp +++ b/nano/core_test/scheduler_buckets.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -118,7 +119,7 @@ TEST (buckets, construction) TEST (buckets, insert_Gxrb) { nano::scheduler::buckets buckets; - buckets.push (1000, block0 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); ASSERT_EQ (1, buckets.size ()); ASSERT_EQ (1, buckets.bucket_size (nano::Gxrb_ratio)); } @@ -126,7 +127,7 @@ TEST (buckets, insert_Gxrb) TEST (buckets, insert_Mxrb) { nano::scheduler::buckets buckets; - buckets.push (1000, block1 (), nano::Mxrb_ratio); + buckets.bucket (nano::Mxrb_ratio).push (1000, block1 ()); ASSERT_EQ (1, buckets.size ()); ASSERT_EQ (1, buckets.bucket_size (nano::Mxrb_ratio)); } @@ -135,8 +136,8 @@ TEST (buckets, insert_Mxrb) TEST (buckets, insert_same_priority) { nano::scheduler::buckets buckets; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1000, block2 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Gxrb_ratio).push (1000, block2 ()); ASSERT_EQ (2, buckets.size ()); ASSERT_EQ (2, buckets.bucket_size (nano::Gxrb_ratio)); } @@ -145,8 +146,8 @@ TEST (buckets, insert_same_priority) TEST (buckets, insert_duplicate) { nano::scheduler::buckets buckets; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1000, block0 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); ASSERT_EQ (1, buckets.size ()); ASSERT_EQ (1, buckets.bucket_size (nano::Gxrb_ratio)); } @@ -154,89 +155,89 @@ TEST (buckets, insert_duplicate) TEST (buckets, insert_older) { nano::scheduler::buckets buckets; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1100, block2 (), nano::Gxrb_ratio); - ASSERT_EQ (block0 (), buckets.top ()); - buckets.pop (); - ASSERT_EQ (block2 (), buckets.top ()); - buckets.pop (); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Gxrb_ratio).push (1100, block2 ()); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); + buckets.bucket (nano::Gxrb_ratio).pop (); + ASSERT_EQ (block2 (), buckets.bucket (nano::Gxrb_ratio).top ()); + buckets.bucket (nano::Gxrb_ratio).pop (); } TEST (buckets, pop) { nano::scheduler::buckets buckets; ASSERT_TRUE (buckets.empty ()); - buckets.push (1000, block0 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); ASSERT_FALSE (buckets.empty ()); - buckets.pop (); + buckets.bucket (nano::Gxrb_ratio).pop (); ASSERT_TRUE (buckets.empty ()); } TEST (buckets, top_one) { nano::scheduler::buckets buckets; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - ASSERT_EQ (block0 (), buckets.top ()); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); } TEST (buckets, top_two) { nano::scheduler::buckets buckets; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1, block1 (), nano::Mxrb_ratio); - ASSERT_EQ (block0 (), buckets.top ()); - buckets.pop (); - ASSERT_EQ (block1 (), buckets.top ()); - buckets.pop (); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Mxrb_ratio).push (1, block1 ()); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); + buckets.bucket (nano::Gxrb_ratio).pop (); + ASSERT_EQ (block1 (), buckets.bucket (nano::Mxrb_ratio).top ()); + buckets.bucket (nano::Mxrb_ratio).pop (); ASSERT_TRUE (buckets.empty ()); } TEST (buckets, top_round_robin) { nano::scheduler::buckets buckets; - buckets.push (1000, blockzero (), 0); - ASSERT_EQ (blockzero (), buckets.top ()); - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1000, block1 (), nano::Mxrb_ratio); - buckets.push (1100, block3 (), nano::Mxrb_ratio); - buckets.pop (); // blockzero - EXPECT_EQ (block1 (), buckets.top ()); - buckets.pop (); - EXPECT_EQ (block0 (), buckets.top ()); - buckets.pop (); - EXPECT_EQ (block3 (), buckets.top ()); - buckets.pop (); + buckets.bucket (0).push (1000, blockzero ()); + ASSERT_EQ (blockzero (), buckets.bucket (0).top ()); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Mxrb_ratio).push (1000, block1 ()); + buckets.bucket (nano::Mxrb_ratio).push (1100, block3 ()); + buckets.bucket (0).pop (); // blockzero + EXPECT_EQ (block1 (), buckets.bucket (nano::Mxrb_ratio).top ()); + buckets.bucket (nano::Mxrb_ratio).pop (); + EXPECT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); + buckets.bucket (nano::Gxrb_ratio).pop (); + EXPECT_EQ (block3 (), buckets.bucket (nano::Mxrb_ratio).top ()); + buckets.bucket (nano::Mxrb_ratio).pop (); EXPECT_TRUE (buckets.empty ()); } TEST (buckets, trim_normal) { nano::scheduler::buckets buckets{ 1 }; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1100, block2 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Gxrb_ratio).push (1100, block2 ()); ASSERT_EQ (1, buckets.size ()); - ASSERT_EQ (block0 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); } TEST (buckets, trim_reverse) { nano::scheduler::buckets buckets{ 1 }; - buckets.push (1100, block2 (), nano::Gxrb_ratio); - buckets.push (1000, block0 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1100, block2 ()); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); ASSERT_EQ (1, buckets.size ()); - ASSERT_EQ (block0 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); } TEST (buckets, trim_even) { nano::scheduler::buckets buckets{ 1 }; - buckets.push (1000, block0 (), nano::Gxrb_ratio); - buckets.push (1100, block2 (), nano::Gxrb_ratio); + buckets.bucket (nano::Gxrb_ratio).push (1000, block0 ()); + buckets.bucket (nano::Gxrb_ratio).push (1100, block2 ()); ASSERT_EQ (1, buckets.size ()); - ASSERT_EQ (block0 (), buckets.top ()); - buckets.push (1000, block1 (), nano::Mxrb_ratio); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); + buckets.bucket (nano::Mxrb_ratio).push (1000, block1 ()); ASSERT_EQ (2, buckets.size ()); - ASSERT_EQ (block0 (), buckets.top ()); - buckets.pop (); - ASSERT_EQ (block1 (), buckets.top ()); + ASSERT_EQ (block0 (), buckets.bucket (nano::Gxrb_ratio).top ()); + buckets.bucket (nano::Gxrb_ratio).pop (); + ASSERT_EQ (block1 (), buckets.bucket (nano::Mxrb_ratio).top ()); } diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 810514a14b..4d7f724b24 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -103,9 +103,6 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::scheduler_optimistic: thread_role_name_string = "Sched Opt"; break; - case nano::thread_role::name::scheduler_priority: - thread_role_name_string = "Sched Priority"; - break; case nano::thread_role::name::stats: thread_role_name_string = "Stats"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 1b8d50b639..6b9a00c17b 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -41,7 +41,6 @@ enum class name scheduler_hinted, scheduler_manual, scheduler_optimistic, - scheduler_priority, rep_crawler, local_block_broadcasting, rep_tiers, diff --git a/nano/node/active_elections.cpp b/nano/node/active_elections.cpp index b0e3507306..fcd5052e5e 100644 --- a/nano/node/active_elections.cpp +++ b/nano/node/active_elections.cpp @@ -293,6 +293,7 @@ void nano::active_elections::cleanup_election (nano::unique_lock & node.stats.sample (nano::stat::sample::active_election_duration, { 0, 1000 * 60 * 10 /* 0-10 minutes range */ }, election->duration ().count ()); vacancy_update (); + election_stopped.notify (election); for (auto const & [hash, block] : blocks_l) { diff --git a/nano/node/active_elections.hpp b/nano/node/active_elections.hpp index 869912f5c8..56bc386cb8 100644 --- a/nano/node/active_elections.hpp +++ b/nano/node/active_elections.hpp @@ -133,6 +133,7 @@ class active_elections final */ int64_t vacancy (nano::election_behavior behavior) const; std::function vacancy_update{ [] () {} }; + nano::observer_set> election_stopped; std::size_t election_winner_details_size (); void add_election_winner_details (nano::block_hash const &, std::shared_ptr const &); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 58ec072dae..602f2a0199 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -258,9 +258,11 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy if (!init_error ()) { + active.election_stopped.add ([this] (std::shared_ptr election) { + scheduler.priority.election_stopped (election); + }); // Notify election schedulers when AEC frees election slot active.vacancy_update = [this] () { - scheduler.priority.notify (); scheduler.hinted.notify (); scheduler.optimistic.notify (); }; diff --git a/nano/node/scheduler/bucket.cpp b/nano/node/scheduler/bucket.cpp index 51ba1626a5..3cb61b10e7 100644 --- a/nano/node/scheduler/bucket.cpp +++ b/nano/node/scheduler/bucket.cpp @@ -35,10 +35,13 @@ void nano::scheduler::bucket::pop () void nano::scheduler::bucket::push (uint64_t time, std::shared_ptr block) { + // Place this item which can lie anywhere in the range ( queue.begin (), queue.end () ] queue.insert ({ time, block }); if (queue.size () > maximum) { + // Trim lowest priority transaction from block queue debug_assert (!queue.empty ()); + // Drop last item which is lowest priority queue.erase (--queue.end ()); } } diff --git a/nano/node/scheduler/bucket.hpp b/nano/node/scheduler/bucket.hpp index 2f32c17d59..7e20473c98 100644 --- a/nano/node/scheduler/bucket.hpp +++ b/nano/node/scheduler/bucket.hpp @@ -12,6 +12,9 @@ class block; namespace nano::scheduler { /** A class which holds an ordered set of blocks to be scheduled, ordered by their block arrival time + * Tracks two internal counters limited by 'maximum' + * 1) active - number of elections this bucket has been responsible for starting + * 2) queue - A std::set ordered from highest to lowest priority which drops the last, lowest priority item */ class bucket final { @@ -24,7 +27,6 @@ class bucket final bool operator== (value_type const & other_a) const; }; std::set queue; - size_t const maximum; public: bucket (size_t maximum); @@ -35,5 +37,8 @@ class bucket final size_t size () const; bool empty () const; void dump () const; + + size_t active{ 0 }; + size_t const maximum; }; } // namespace nano::scheduler diff --git a/nano/node/scheduler/buckets.cpp b/nano/node/scheduler/buckets.cpp index 4c238ac825..e7ed771b5f 100644 --- a/nano/node/scheduler/buckets.cpp +++ b/nano/node/scheduler/buckets.cpp @@ -5,26 +5,6 @@ #include -/** Moves the bucket pointer to the next bucket */ -void nano::scheduler::buckets::next () -{ - ++current; - if (current == buckets_m.end ()) - { - current = buckets_m.begin (); - } -} - -/** Seek to the next non-empty bucket, if one exists */ -void nano::scheduler::buckets::seek () -{ - next (); - for (std::size_t i = 0, n = buckets_m.size (); current->second->empty () && i < n; ++i) - { - next (); - } -} - void nano::scheduler::buckets::setup_buckets (uint64_t maximum) { auto build_region = [&] (uint128_t const & begin, uint128_t const & end, size_t count) { @@ -53,7 +33,6 @@ void nano::scheduler::buckets::setup_buckets (uint64_t maximum) nano::scheduler::buckets::buckets (uint64_t maximum) { setup_buckets (maximum); - current = buckets_m.begin (); } nano::scheduler::buckets::~buckets () @@ -68,36 +47,6 @@ auto nano::scheduler::buckets::bucket (nano::uint128_t const & balance) const -> return *iter->second; } -/** - * Push a block and its associated time into the prioritization container. - * The time is given here because sideband might not exist in the case of state blocks. - */ -void nano::scheduler::buckets::push (uint64_t time, std::shared_ptr block, nano::amount const & priority) -{ - auto was_empty = empty (); - bucket (priority.number ()).push (time, block); - if (was_empty) - { - seek (); - } -} - -/** Return the highest priority block of the current bucket */ -std::shared_ptr nano::scheduler::buckets::top () const -{ - debug_assert (!empty ()); - auto result = current->second->top (); - return result; -} - -/** Pop the current block from the container and seek to the next block, if it exists */ -void nano::scheduler::buckets::pop () -{ - debug_assert (!empty ()); - current->second->pop (); - seek (); -} - /** Returns the total number of blocks in buckets */ std::size_t nano::scheduler::buckets::size () const { @@ -121,6 +70,11 @@ std::size_t nano::scheduler::buckets::bucket_size (nano::amount const & amount) return bucket (amount.number ()).size (); } +std::size_t nano::scheduler::buckets::active () const +{ + return std::accumulate (buckets_m.begin (), buckets_m.end (), 0, [] (auto const & total, auto const & item) { return total + item.second->active; }); +} + /** Returns true if all buckets are empty */ bool nano::scheduler::buckets::empty () const { diff --git a/nano/node/scheduler/buckets.hpp b/nano/node/scheduler/buckets.hpp index 385578cae7..93d2159348 100644 --- a/nano/node/scheduler/buckets.hpp +++ b/nano/node/scheduler/buckets.hpp @@ -29,22 +29,15 @@ class buckets final /** container for the buckets to be read in round robin fashion */ std::map> buckets_m; - /** index of bucket to read next */ - decltype (buckets_m)::const_iterator current; - - void next (); - void seek (); void setup_buckets (uint64_t maximum); public: buckets (uint64_t maximum = 4096); ~buckets (); - void push (uint64_t time, std::shared_ptr block, nano::amount const & priority); - std::shared_ptr top () const; - void pop (); std::size_t size () const; std::size_t bucket_count () const; std::size_t bucket_size (nano::amount const & amount) const; + std::size_t active () const; bool empty () const; void dump () const; scheduler::bucket & bucket (nano::uint128_t const & balance) const; diff --git a/nano/node/scheduler/component.cpp b/nano/node/scheduler/component.cpp index 78cf586449..b8d787bfa6 100644 --- a/nano/node/scheduler/component.cpp +++ b/nano/node/scheduler/component.cpp @@ -26,7 +26,6 @@ void nano::scheduler::component::start () hinted.start (); manual.start (); optimistic.start (); - priority.start (); } void nano::scheduler::component::stop () @@ -34,7 +33,6 @@ void nano::scheduler::component::stop () hinted.stop (); manual.stop (); optimistic.stop (); - priority.stop (); } std::unique_ptr nano::scheduler::component::collect_container_info (std::string const & name) diff --git a/nano/node/scheduler/priority.cpp b/nano/node/scheduler/priority.cpp index bb0314930d..c0e8b8b6e5 100644 --- a/nano/node/scheduler/priority.cpp +++ b/nano/node/scheduler/priority.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -21,33 +22,7 @@ nano::scheduler::priority::priority (priority_config const & config, nano::ledge nano::scheduler::priority::~priority () { - // Thread must be stopped before destruction - debug_assert (!thread.joinable ()); -} - -void nano::scheduler::priority::start () -{ - debug_assert (!thread.joinable ()); - - if (!config.enabled) - { - return; - } - - thread = std::thread{ [this] () { - nano::thread_role::set (nano::thread_role::name::scheduler_priority); - run (); - } }; -} - -void nano::scheduler::priority::stop () -{ - { - nano::lock_guard lock{ mutex }; - stopped = true; - } - notify (); - nano::join_or_pass (thread); + debug_assert (tracking.size () == buckets->active ()); } bool nano::scheduler::priority::activate (secure::transaction const & transaction, nano::account const & account) @@ -74,17 +49,19 @@ bool nano::scheduler::priority::activate (secure::transaction const & transactio nano::log::arg{ "priority", balance_priority }); nano::lock_guard lock{ mutex }; - buckets->push (time_priority, block, balance_priority); - notify (); + auto & bucket = buckets->bucket (balance_priority); + if (bucket.active < bucket.maximum) + { + activate (bucket, block); + } + else + { + bucket.push (time_priority, block); + } return true; // Activated } -void nano::scheduler::priority::notify () -{ - condition.notify_all (); -} - std::size_t nano::scheduler::priority::size () const { nano::lock_guard lock{ mutex }; @@ -102,47 +79,46 @@ bool nano::scheduler::priority::empty () const return empty_locked (); } -bool nano::scheduler::priority::predicate () const +void nano::scheduler::priority::activate (scheduler::bucket & bucket, std::shared_ptr block) { - return active.vacancy (nano::election_behavior::priority) > 0 && !buckets->empty (); + stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority); + auto result = active.insert (block); + if (result.inserted) + { + stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority_success); + debug_assert (result.election); + // If we inserted this election, increment the number active counter + ++bucket.active; + // Subscribe to election cleanup events to decrement the counter + tracking.emplace (result.election.get (), &bucket); + } + if (result.election != nullptr) + { + result.election->transition_active (); + } } -void nano::scheduler::priority::run () +void nano::scheduler::priority::election_stopped (std::shared_ptr election) { - nano::unique_lock lock{ mutex }; - while (!stopped) + nano::lock_guard lock{ mutex }; + if (auto existing = tracking.find (election.get ()); existing != tracking.end ()) { - condition.wait (lock, [this] () { - return stopped || predicate (); - }); - debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds - if (!stopped) + auto & bucket = *existing->second; + if (!bucket.empty ()) { - stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop); - - if (predicate ()) - { - auto block = buckets->top (); - buckets->pop (); - lock.unlock (); - stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority); - auto result = active.insert (block); - if (result.inserted) - { - stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority_success); - } - if (result.election != nullptr) - { - result.election->transition_active (); - } - } - else - { - lock.unlock (); - } - notify (); - lock.lock (); + // We have a block waiting for space in order to be scheduled + debug_assert (bucket.active == bucket.maximum); + auto top = bucket.top (); + bucket.pop (); + activate (bucket, top); + } + else + { + // Only decrement the active count if there wasn't something to replace it with. + --bucket.active; } + // Clean up election stop event subscription + tracking.erase (existing); } } diff --git a/nano/node/scheduler/priority.hpp b/nano/node/scheduler/priority.hpp index dafdc4e17a..7102c69b46 100644 --- a/nano/node/scheduler/priority.hpp +++ b/nano/node/scheduler/priority.hpp @@ -9,12 +9,14 @@ #include #include #include +#include namespace nano { class active_elections; class block; class container_info_component; +class election; class ledger; class stats; } @@ -25,6 +27,7 @@ class transaction; namespace nano::scheduler { +class bucket; class priority_config { public: @@ -42,17 +45,14 @@ class priority final priority (priority_config const & config, nano::ledger & ledger, nano::active_elections & active, nano::stats & stats, nano::logger & logger); ~priority (); - void start (); - void stop (); - /** * Activates the first unconfirmed block of \p account_a * @return true if account was activated */ bool activate (secure::transaction const &, nano::account const &); - void notify (); std::size_t size () const; bool empty () const; + void election_stopped (std::shared_ptr election); std::unique_ptr collect_container_info (std::string const & name); @@ -64,15 +64,13 @@ class priority final nano::logger & logger; private: - void run (); bool empty_locked () const; - bool predicate () const; + void activate (scheduler::bucket & bucket, std::shared_ptr block); std::unique_ptr buckets; + // Bucket associated with a particular election + std::unordered_map tracking; - bool stopped{ false }; - nano::condition_variable condition; mutable nano::mutex mutex; - std::thread thread; }; }