From 1fd99a60e8696d107c9027c41ce62606bb8823dd Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Tue, 12 Sep 2023 18:35:12 +0100 Subject: [PATCH] Splitting manual scheduler in to its own file. This changes the order in which elections are scheduled. Both manual and prioritized elections are scheduled in parallel. Previously, manually scheduled elections would always be started before general prioritized elections. --- nano/core_test/active_transactions.cpp | 5 +- nano/core_test/node.cpp | 5 +- nano/node/CMakeLists.txt | 2 + nano/node/node.cpp | 3 +- nano/node/scheduler/component.cpp | 6 ++ nano/node/scheduler/component.hpp | 5 +- nano/node/scheduler/manual.cpp | 94 ++++++++++++++++++++++++++ nano/node/scheduler/manual.hpp | 46 +++++++++++++ nano/node/scheduler/priority.cpp | 35 ++-------- nano/node/scheduler/priority.hpp | 7 +- nano/rpc_test/rpc.cpp | 5 +- nano/slow_test/node.cpp | 9 +-- nano/test_common/testutil.cpp | 5 +- 13 files changed, 177 insertions(+), 50 deletions(-) create mode 100644 nano/node/scheduler/manual.cpp create mode 100644 nano/node/scheduler/manual.hpp diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 9842f8bf28..066fa04eae 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -1393,11 +1394,11 @@ TEST (active_transactions, fifo) ASSERT_EQ (nano::process_result::progress, node.process (*receive2).code); // Ensure first transaction becomes active - node.scheduler.priority.manual (receive1); + node.scheduler.manual.push (receive1); ASSERT_TIMELY (5s, node.active.election (receive1->qualified_root ()) != nullptr); // Ensure second transaction becomes active - node.scheduler.priority.manual (receive2); + node.scheduler.manual.push (receive2); ASSERT_TIMELY (5s, node.active.election (receive2->qualified_root ()) != nullptr); // Ensure excess transactions get trimmed diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 7b8468bf9c..ab4408a53d 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -987,7 +988,7 @@ TEST (node, fork_open_flip) // give block open1 to node1, manually trigger an election for open1 and ensure it is in the ledger node1.process_active (open1); ASSERT_TIMELY (5s, node1.block (open1->hash ()) != nullptr); - node1.scheduler.priority.manual (open1); + node1.scheduler.manual.push (open1); ASSERT_TIMELY (5s, (election = node1.active.election (open1->qualified_root ())) != nullptr); election->transition_active (); @@ -1000,7 +1001,7 @@ TEST (node, fork_open_flip) // ensure open2 is in node2 ledger (and therefore has sideband) and manually trigger an election for open2 ASSERT_TIMELY (5s, node2.block (open2->hash ()) != nullptr); - node2.scheduler.priority.manual (open2); + node2.scheduler.manual.push (open2); ASSERT_TIMELY (5s, (election = node2.active.election (open2->qualified_root ())) != nullptr); election->transition_active (); diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index ccb4d4fe76..4e1f29cc02 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -202,6 +202,8 @@ add_library( scheduler/component.cpp scheduler/hinted.hpp scheduler/hinted.cpp + scheduler/manual.hpp + scheduler/manual.cpp scheduler/optimistic.hpp scheduler/optimistic.cpp scheduler/priority.hpp diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 95e9971901..5918036783 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -1261,7 +1262,7 @@ void nano::node::add_initial_peers () void nano::node::start_election (std::shared_ptr const & block) { - scheduler.priority.manual (block); + scheduler.manual.push (block); } bool nano::node::block_confirmed (nano::block_hash const & hash_a) diff --git a/nano/node/scheduler/component.cpp b/nano/node/scheduler/component.cpp index 33fa7dc1ff..f1462f3893 100644 --- a/nano/node/scheduler/component.cpp +++ b/nano/node/scheduler/component.cpp @@ -1,14 +1,17 @@ #include #include #include +#include #include #include nano::scheduler::component::component (nano::node & node) : hinted_impl{ std::make_unique (nano::scheduler::hinted::config{ node.config }, node, node.inactive_vote_cache, node.active, node.online_reps, node.stats) }, + manual_impl{ std::make_unique (node) }, optimistic_impl{ std::make_unique (node.config.optimistic_scheduler, node, node.ledger, node.active, node.network_params.network, node.stats) }, priority_impl{ std::make_unique (node, node.stats) }, hinted{ *hinted_impl }, + manual{ *manual_impl }, optimistic{ *optimistic_impl }, priority{ *priority_impl } { @@ -21,6 +24,7 @@ nano::scheduler::component::~component () void nano::scheduler::component::start () { hinted.start (); + manual.start (); optimistic.start (); priority.start (); } @@ -28,6 +32,7 @@ void nano::scheduler::component::start () void nano::scheduler::component::stop () { hinted.stop (); + manual.stop (); optimistic.stop (); priority.stop (); } @@ -38,6 +43,7 @@ std::unique_ptr nano::scheduler::component::coll auto composite = std::make_unique (name); //composite->add_component (hinted.collect_container_info ("hinted")); + composite->add_component (manual.collect_container_info ("manual")); //composite->add_component (optimistic.collect_container_info ("optimistic")); composite->add_component (priority.collect_container_info ("priority")); return composite; diff --git a/nano/node/scheduler/component.hpp b/nano/node/scheduler/component.hpp index 7768fba36c..257902438a 100644 --- a/nano/node/scheduler/component.hpp +++ b/nano/node/scheduler/component.hpp @@ -13,12 +13,14 @@ class node; namespace nano::scheduler { class hinted; +class manual; class optimistic; class priority; -class component +class component final { std::unique_ptr hinted_impl; + std::unique_ptr manual_impl; std::unique_ptr optimistic_impl; std::unique_ptr priority_impl; nano::mutex mutex; @@ -35,6 +37,7 @@ class component std::unique_ptr collect_container_info (std::string const & name); nano::scheduler::hinted & hinted; + nano::scheduler::manual & manual; nano::scheduler::optimistic & optimistic; nano::scheduler::priority & priority; }; diff --git a/nano/node/scheduler/manual.cpp b/nano/node/scheduler/manual.cpp new file mode 100644 index 0000000000..82ce43debb --- /dev/null +++ b/nano/node/scheduler/manual.cpp @@ -0,0 +1,94 @@ +#include +#include + +nano::scheduler::manual::manual (nano::node & node) : + node{ node } +{ +} + +nano::scheduler::manual::~manual () +{ + // Thread must be stopped before destruction + debug_assert (!thread.joinable ()); +} + +void nano::scheduler::manual::start () +{ + debug_assert (!thread.joinable ()); + + thread = std::thread{ [this] () { + nano::thread_role::set (nano::thread_role::name::election_scheduler); + run (); + } }; +} + +void nano::scheduler::manual::stop () +{ + { + nano::lock_guard lock{ mutex }; + stopped = true; + } + notify (); + nano::join_or_pass (thread); +} + +void nano::scheduler::manual::notify () +{ + condition.notify_all (); +} + +void nano::scheduler::manual::push (std::shared_ptr const & block_a, boost::optional const & previous_balance_a, nano::election_behavior election_behavior_a) +{ + nano::lock_guard lock{ mutex }; + queue.push_back (std::make_tuple (block_a, previous_balance_a, election_behavior_a)); + notify (); +} + +bool nano::scheduler::manual::predicate () const +{ + return !queue.empty (); +} + +void nano::scheduler::manual::run () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + condition.wait (lock, [this] () { + return stopped || predicate (); + }); + debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds + if (!stopped) + { + node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop); + + if (predicate ()) + { + auto const [block, previous_balance, election_behavior] = queue.front (); + queue.pop_front (); + lock.unlock (); + node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_manual); + auto result = node.active.insert (block, election_behavior); + if (result.election != nullptr) + { + result.election->transition_active (); + } + } + else + { + lock.unlock (); + } + notify (); + lock.lock (); + } + } +} + +std::unique_ptr nano::scheduler::manual::collect_container_info (std::string const & name) +{ + nano::unique_lock lock{ mutex }; + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "queue", queue.size (), sizeof (decltype (queue)::value_type) })); + return composite; +} diff --git a/nano/node/scheduler/manual.hpp b/nano/node/scheduler/manual.hpp new file mode 100644 index 0000000000..3edfa1dc75 --- /dev/null +++ b/nano/node/scheduler/manual.hpp @@ -0,0 +1,46 @@ +#pragma once +#include +#include +#include + +#include + +#include +#include +#include + +namespace nano +{ +class block; +class node; +} + +namespace nano::scheduler +{ +class buckets; +class manual final +{ + std::deque, boost::optional, nano::election_behavior>> queue; + nano::node & node; + nano::mutex mutex; + nano::condition_variable condition; + bool stopped{ false }; + std::thread thread; + void notify (); + bool predicate () const; + void run (); + +public: + manual (nano::node & node); + ~manual (); + + void start (); + void stop (); + + // Manualy start an election for a block + // Call action with confirmed block, may be different than what we started with + void push (std::shared_ptr const &, boost::optional const & = boost::none, nano::election_behavior = nano::election_behavior::normal); + + std::unique_ptr collect_container_info (std::string const & name); +}; // class manual +} // nano::scheduler diff --git a/nano/node/scheduler/priority.cpp b/nano/node/scheduler/priority.cpp index c8e9e0f2dc..323a18c515 100644 --- a/nano/node/scheduler/priority.cpp +++ b/nano/node/scheduler/priority.cpp @@ -35,13 +35,6 @@ void nano::scheduler::priority::stop () nano::join_or_pass (thread); } -void nano::scheduler::priority::manual (std::shared_ptr const & block_a, boost::optional const & previous_balance_a, nano::election_behavior election_behavior_a) -{ - nano::lock_guard lock{ mutex }; - manual_queue.push_back (std::make_tuple (block_a, previous_balance_a, election_behavior_a)); - notify (); -} - bool nano::scheduler::priority::activate (nano::account const & account_a, nano::transaction const & transaction) { debug_assert (!account_a.is_zero ()); @@ -79,12 +72,12 @@ void nano::scheduler::priority::notify () std::size_t nano::scheduler::priority::size () const { nano::lock_guard lock{ mutex }; - return buckets->size () + manual_queue.size (); + return buckets->size (); } bool nano::scheduler::priority::empty_locked () const { - return buckets->empty () && manual_queue.empty (); + return buckets->empty (); } bool nano::scheduler::priority::empty () const @@ -93,42 +86,25 @@ bool nano::scheduler::priority::empty () const return empty_locked (); } -bool nano::scheduler::priority::priority_queue_predicate () const +bool nano::scheduler::priority::predicate () const { return node.active.vacancy () > 0 && !buckets->empty (); } -bool nano::scheduler::priority::manual_queue_predicate () const -{ - return !manual_queue.empty (); -} - void nano::scheduler::priority::run () { nano::unique_lock lock{ mutex }; while (!stopped) { condition.wait (lock, [this] () { - return stopped || priority_queue_predicate () || manual_queue_predicate (); + return stopped || predicate (); }); debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds if (!stopped) { stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop); - if (manual_queue_predicate ()) - { - auto const [block, previous_balance, election_behavior] = manual_queue.front (); - manual_queue.pop_front (); - lock.unlock (); - stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_manual); - auto result = node.active.insert (block, election_behavior); - if (result.election != nullptr) - { - result.election->transition_active (); - } - } - else if (priority_queue_predicate ()) + if (predicate ()) { auto block = buckets->top (); buckets->pop (); @@ -159,7 +135,6 @@ std::unique_ptr nano::scheduler::priority::colle nano::unique_lock lock{ mutex }; auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "manual_queue", manual_queue.size (), sizeof (decltype (manual_queue)::value_type) })); composite->add_component (buckets->collect_container_info ("buckets")); return composite; } diff --git a/nano/node/scheduler/priority.hpp b/nano/node/scheduler/priority.hpp index b798f7a400..13b32b950b 100644 --- a/nano/node/scheduler/priority.hpp +++ b/nano/node/scheduler/priority.hpp @@ -32,9 +32,6 @@ class priority final priority (nano::node &, nano::stats &); ~priority (); - // Manualy start an election for a block - // Call action with confirmed block, may be different than what we started with - void manual (std::shared_ptr const &, boost::optional const & = boost::none, nano::election_behavior = nano::election_behavior::normal); /** * Activates the first unconfirmed block of \p account_a * @return true if account was activated @@ -51,12 +48,10 @@ class priority final private: void run (); bool empty_locked () const; - bool priority_queue_predicate () const; - bool manual_queue_predicate () const; + bool predicate () const; std::unique_ptr buckets; - std::deque, boost::optional, nano::election_behavior>> manual_queue; bool stopped{ false }; nano::condition_variable condition; mutable nano::mutex mutex; diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 3c01236e3c..2dd484c8ab 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -1556,7 +1557,7 @@ TEST (rpc, process_subtype_open) ASSERT_EQ (nano::process_result::progress, node1->process (*send).code); ASSERT_EQ (nano::process_result::progress, node2.process (*send).code); auto const rpc_ctx = add_rpc (system, node1); - node1->scheduler.priority.manual (send); + node1->scheduler.manual.push (send); auto open = builder .state () .account (key.pub) @@ -1605,7 +1606,7 @@ TEST (rpc, process_subtype_receive) ASSERT_EQ (nano::process_result::progress, node1->process (*send).code); ASSERT_EQ (nano::process_result::progress, node2.process (*send).code); auto const rpc_ctx = add_rpc (system, node1); - node1->scheduler.priority.manual (send); + node1->scheduler.manual.push (send); auto receive = builder .state () .account (nano::dev::genesis_key.pub) diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index c9dd6c09aa..d63b2ac154 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -677,7 +678,7 @@ TEST (confirmation_height, many_accounts_single_confirmation) { auto block = node->block (last_open_hash); ASSERT_NE (nullptr, block); - node->scheduler.priority.manual (block); + node->scheduler.manual.push (block); std::shared_ptr election; ASSERT_TIMELY (10s, (election = node->active.election (block->qualified_root ())) != nullptr); election->force_confirm (); @@ -760,7 +761,7 @@ TEST (confirmation_height, many_accounts_many_confirmations) // Confirm all of the accounts for (auto & open_block : open_blocks) { - node->scheduler.priority.manual (open_block); + node->scheduler.manual.push (open_block); std::shared_ptr election; ASSERT_TIMELY (10s, (election = node->active.election (open_block->qualified_root ())) != nullptr); election->force_confirm (); @@ -900,7 +901,7 @@ TEST (confirmation_height, long_chains) // Call block confirm on the existing receive block on the genesis account which will confirm everything underneath on both accounts { - node->scheduler.priority.manual (receive1); + node->scheduler.manual.push (receive1); std::shared_ptr election; ASSERT_TIMELY (10s, (election = node->active.election (receive1->qualified_root ())) != nullptr); election->force_confirm (); @@ -2225,7 +2226,7 @@ TEST (node, wallet_create_block_confirm_conflicts) // Call block confirm on the top level send block which will confirm everything underneath on both accounts. { auto block = node->store.block.get (node->store.tx_begin_read (), latest); - node->scheduler.priority.manual (block); + node->scheduler.manual.push (block); std::shared_ptr election; ASSERT_TIMELY (10s, (election = node->active.election (block->qualified_root ())) != nullptr); election->force_confirm (); diff --git a/nano/test_common/testutil.cpp b/nano/test_common/testutil.cpp index f3aba3d3f4..05a6db689b 100644 --- a/nano/test_common/testutil.cpp +++ b/nano/test_common/testutil.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -124,7 +125,7 @@ bool nano::test::activate (nano::node & node, std::vector hash // Block does not exist in the ledger yet return false; } - node.scheduler.priority.manual (disk_block); + node.scheduler.manual.push (disk_block); } return true; } @@ -205,7 +206,7 @@ std::shared_ptr nano::test::start_election (nano::test::system & block_l = node_a.block (hash_a); } - node_a.scheduler.priority.manual (block_l); + node_a.scheduler.manual.push (block_l); // wait for the election to appear std::shared_ptr election = node_a.active.election (block_l->qualified_root ());