Skip to content

Commit

Permalink
Splitting manual scheduler in to its own file.
Browse files Browse the repository at this point in the history
This changes the order in which elections are scheduled. Both manual and prioritized elections are scheduled in parallel. Previously, manually scheduled elections would always be started before general prioritized elections.
  • Loading branch information
clemahieu committed Sep 12, 2023
1 parent 3ae56fb commit 1fd99a6
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 50 deletions.
5 changes: 3 additions & 2 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <nano/lib/jsonconfig.hpp>
#include <nano/node/election.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/test_common/chains.hpp>
Expand Down Expand Up @@ -1393,11 +1394,11 @@ TEST (active_transactions, fifo)
ASSERT_EQ (nano::process_result::progress, node.process (*receive2).code);

// Ensure first transaction becomes active
node.scheduler.priority.manual (receive1);
node.scheduler.manual.push (receive1);
ASSERT_TIMELY (5s, node.active.election (receive1->qualified_root ()) != nullptr);

// Ensure second transaction becomes active
node.scheduler.priority.manual (receive2);
node.scheduler.manual.push (receive2);
ASSERT_TIMELY (5s, node.active.election (receive2->qualified_root ()) != nullptr);

// Ensure excess transactions get trimmed
Expand Down
5 changes: 3 additions & 2 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <nano/lib/config.hpp>
#include <nano/node/election.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/transport/fake.hpp>
#include <nano/node/transport/inproc.hpp>
Expand Down Expand Up @@ -987,7 +988,7 @@ TEST (node, fork_open_flip)
// give block open1 to node1, manually trigger an election for open1 and ensure it is in the ledger
node1.process_active (open1);
ASSERT_TIMELY (5s, node1.block (open1->hash ()) != nullptr);
node1.scheduler.priority.manual (open1);
node1.scheduler.manual.push (open1);
ASSERT_TIMELY (5s, (election = node1.active.election (open1->qualified_root ())) != nullptr);
election->transition_active ();

Expand All @@ -1000,7 +1001,7 @@ TEST (node, fork_open_flip)

// ensure open2 is in node2 ledger (and therefore has sideband) and manually trigger an election for open2
ASSERT_TIMELY (5s, node2.block (open2->hash ()) != nullptr);
node2.scheduler.priority.manual (open2);
node2.scheduler.manual.push (open2);
ASSERT_TIMELY (5s, (election = node2.active.election (open2->qualified_root ())) != nullptr);
election->transition_active ();

Expand Down
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ add_library(
scheduler/component.cpp
scheduler/hinted.hpp
scheduler/hinted.cpp
scheduler/manual.hpp
scheduler/manual.cpp
scheduler/optimistic.hpp
scheduler/optimistic.cpp
scheduler/priority.hpp
Expand Down
3 changes: 2 additions & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <nano/node/rocksdb/rocksdb.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/hinted.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/optimistic.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/telemetry.hpp>
Expand Down Expand Up @@ -1261,7 +1262,7 @@ void nano::node::add_initial_peers ()

void nano::node::start_election (std::shared_ptr<nano::block> const & block)
{
scheduler.priority.manual (block);
scheduler.manual.push (block);
}

bool nano::node::block_confirmed (nano::block_hash const & hash_a)
Expand Down
6 changes: 6 additions & 0 deletions nano/node/scheduler/component.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
#include <nano/node/node.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/hinted.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/optimistic.hpp>
#include <nano/node/scheduler/priority.hpp>

nano::scheduler::component::component (nano::node & node) :
hinted_impl{ std::make_unique<nano::scheduler::hinted> (nano::scheduler::hinted::config{ node.config }, node, node.inactive_vote_cache, node.active, node.online_reps, node.stats) },
manual_impl{ std::make_unique<nano::scheduler::manual> (node) },
optimistic_impl{ std::make_unique<nano::scheduler::optimistic> (node.config.optimistic_scheduler, node, node.ledger, node.active, node.network_params.network, node.stats) },
priority_impl{ std::make_unique<nano::scheduler::priority> (node, node.stats) },
hinted{ *hinted_impl },
manual{ *manual_impl },
optimistic{ *optimistic_impl },
priority{ *priority_impl }
{
Expand All @@ -21,13 +24,15 @@ nano::scheduler::component::~component ()
void nano::scheduler::component::start ()
{
hinted.start ();
manual.start ();
optimistic.start ();
priority.start ();
}

void nano::scheduler::component::stop ()
{
hinted.stop ();
manual.stop ();
optimistic.stop ();
priority.stop ();
}
Expand All @@ -38,6 +43,7 @@ std::unique_ptr<nano::container_info_component> nano::scheduler::component::coll

auto composite = std::make_unique<container_info_composite> (name);
//composite->add_component (hinted.collect_container_info ("hinted"));
composite->add_component (manual.collect_container_info ("manual"));
//composite->add_component (optimistic.collect_container_info ("optimistic"));
composite->add_component (priority.collect_container_info ("priority"));
return composite;
Expand Down
5 changes: 4 additions & 1 deletion nano/node/scheduler/component.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ class node;
namespace nano::scheduler
{
class hinted;
class manual;
class optimistic;
class priority;

class component
class component final
{
std::unique_ptr<nano::scheduler::hinted> hinted_impl;
std::unique_ptr<nano::scheduler::manual> manual_impl;
std::unique_ptr<nano::scheduler::optimistic> optimistic_impl;
std::unique_ptr<nano::scheduler::priority> priority_impl;
nano::mutex mutex;
Expand All @@ -35,6 +37,7 @@ class component
std::unique_ptr<container_info_component> collect_container_info (std::string const & name);

nano::scheduler::hinted & hinted;
nano::scheduler::manual & manual;
nano::scheduler::optimistic & optimistic;
nano::scheduler::priority & priority;
};
Expand Down
94 changes: 94 additions & 0 deletions nano/node/scheduler/manual.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#include <nano/node/node.hpp>
#include <nano/node/scheduler/manual.hpp>

nano::scheduler::manual::manual (nano::node & node) :
node{ node }
{
}

nano::scheduler::manual::~manual ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
}

void nano::scheduler::manual::start ()
{
debug_assert (!thread.joinable ());

thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::election_scheduler);
run ();
} };
}

void nano::scheduler::manual::stop ()
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
notify ();
nano::join_or_pass (thread);
}

void nano::scheduler::manual::notify ()
{
condition.notify_all ();
}

void nano::scheduler::manual::push (std::shared_ptr<nano::block> const & block_a, boost::optional<nano::uint128_t> const & previous_balance_a, nano::election_behavior election_behavior_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
queue.push_back (std::make_tuple (block_a, previous_balance_a, election_behavior_a));
notify ();
}

bool nano::scheduler::manual::predicate () const
{
return !queue.empty ();
}

void nano::scheduler::manual::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait (lock, [this] () {
return stopped || predicate ();
});
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds
if (!stopped)
{
node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop);

if (predicate ())
{
auto const [block, previous_balance, election_behavior] = queue.front ();
queue.pop_front ();
lock.unlock ();
node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_manual);
auto result = node.active.insert (block, election_behavior);
if (result.election != nullptr)
{
result.election->transition_active ();
}
}
else
{
lock.unlock ();
}
notify ();
lock.lock ();
}
}
}

std::unique_ptr<nano::container_info_component> nano::scheduler::manual::collect_container_info (std::string const & name)
{
nano::unique_lock<nano::mutex> lock{ mutex };

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queue", queue.size (), sizeof (decltype (queue)::value_type) }));
return composite;
}
46 changes: 46 additions & 0 deletions nano/node/scheduler/manual.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/node/active_transactions.hpp>

#include <boost/optional.hpp>

#include <deque>
#include <memory>
#include <mutex>

namespace nano
{
class block;
class node;
}

namespace nano::scheduler
{
class buckets;
class manual final
{
std::deque<std::tuple<std::shared_ptr<nano::block>, boost::optional<nano::uint128_t>, nano::election_behavior>> queue;
nano::node & node;
nano::mutex mutex;
nano::condition_variable condition;
bool stopped{ false };
std::thread thread;
void notify ();
bool predicate () const;
void run ();

public:
manual (nano::node & node);
~manual ();

void start ();
void stop ();

// Manualy start an election for a block
// Call action with confirmed block, may be different than what we started with
void push (std::shared_ptr<nano::block> const &, boost::optional<nano::uint128_t> const & = boost::none, nano::election_behavior = nano::election_behavior::normal);

std::unique_ptr<container_info_component> collect_container_info (std::string const & name);
}; // class manual
} // nano::scheduler
35 changes: 5 additions & 30 deletions nano/node/scheduler/priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ void nano::scheduler::priority::stop ()
nano::join_or_pass (thread);
}

void nano::scheduler::priority::manual (std::shared_ptr<nano::block> const & block_a, boost::optional<nano::uint128_t> const & previous_balance_a, nano::election_behavior election_behavior_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
manual_queue.push_back (std::make_tuple (block_a, previous_balance_a, election_behavior_a));
notify ();
}

bool nano::scheduler::priority::activate (nano::account const & account_a, nano::transaction const & transaction)
{
debug_assert (!account_a.is_zero ());
Expand Down Expand Up @@ -79,12 +72,12 @@ void nano::scheduler::priority::notify ()
std::size_t nano::scheduler::priority::size () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return buckets->size () + manual_queue.size ();
return buckets->size ();
}

bool nano::scheduler::priority::empty_locked () const
{
return buckets->empty () && manual_queue.empty ();
return buckets->empty ();
}

bool nano::scheduler::priority::empty () const
Expand All @@ -93,42 +86,25 @@ bool nano::scheduler::priority::empty () const
return empty_locked ();
}

bool nano::scheduler::priority::priority_queue_predicate () const
bool nano::scheduler::priority::predicate () const
{
return node.active.vacancy () > 0 && !buckets->empty ();
}

bool nano::scheduler::priority::manual_queue_predicate () const
{
return !manual_queue.empty ();
}

void nano::scheduler::priority::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait (lock, [this] () {
return stopped || priority_queue_predicate () || manual_queue_predicate ();
return stopped || predicate ();
});
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds
if (!stopped)
{
stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop);

if (manual_queue_predicate ())
{
auto const [block, previous_balance, election_behavior] = manual_queue.front ();
manual_queue.pop_front ();
lock.unlock ();
stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_manual);
auto result = node.active.insert (block, election_behavior);
if (result.election != nullptr)
{
result.election->transition_active ();
}
}
else if (priority_queue_predicate ())
if (predicate ())
{
auto block = buckets->top ();
buckets->pop ();
Expand Down Expand Up @@ -159,7 +135,6 @@ std::unique_ptr<nano::container_info_component> nano::scheduler::priority::colle
nano::unique_lock<nano::mutex> lock{ mutex };

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "manual_queue", manual_queue.size (), sizeof (decltype (manual_queue)::value_type) }));
composite->add_component (buckets->collect_container_info ("buckets"));
return composite;
}
7 changes: 1 addition & 6 deletions nano/node/scheduler/priority.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ class priority final
priority (nano::node &, nano::stats &);
~priority ();

// Manualy start an election for a block
// Call action with confirmed block, may be different than what we started with
void manual (std::shared_ptr<nano::block> const &, boost::optional<nano::uint128_t> const & = boost::none, nano::election_behavior = nano::election_behavior::normal);
/**
* Activates the first unconfirmed block of \p account_a
* @return true if account was activated
Expand All @@ -51,12 +48,10 @@ class priority final
private:
void run ();
bool empty_locked () const;
bool priority_queue_predicate () const;
bool manual_queue_predicate () const;
bool predicate () const;

std::unique_ptr<nano::scheduler::buckets> buckets;

std::deque<std::tuple<std::shared_ptr<nano::block>, boost::optional<nano::uint128_t>, nano::election_behavior>> manual_queue;
bool stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex;
Expand Down
Loading

0 comments on commit 1fd99a6

Please sign in to comment.