Skip to content

Commit

Permalink
Fair queue for vote processor
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Apr 4, 2024
1 parent 504effc commit 91e3c8e
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 97 deletions.
14 changes: 1 addition & 13 deletions nano/core_test/vote_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,6 @@ TEST (vote_processor, invalid_signature)
ASSERT_TIMELY_EQ (5s, 2, election->votes ().size ());
}

TEST (vote_processor, no_capacity)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.vote_processor_capacity = 0;
auto & node (*system.add_node (node_flags));
nano::keypair key;
auto vote = nano::test::make_vote (key, { nano::dev::genesis }, nano::vote::timestamp_min * 1, 0);
auto channel (std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_FALSE (node.vote_processor.vote (vote, channel));
}

TEST (vote_processor, overflow)
{
nano::test::system system;
Expand All @@ -111,7 +99,7 @@ TEST (vote_processor, overflow)
}
ASSERT_GT (not_processed, 0);
ASSERT_LT (not_processed, total);
ASSERT_EQ (not_processed, node.stats.count (nano::stat::type::vote, nano::stat::detail::vote_overflow));
ASSERT_EQ (not_processed, node.stats.count (nano::stat::type::vote_processor, nano::stat::detail::overfill));

// check that it did not timeout
ASSERT_LT (std::chrono::system_clock::now () - start_time, 10s);
Expand Down
8 changes: 8 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ enum class type : uint8_t
network,
tcp_server,
vote,
vote_processor,
vote_processor_tier,
vote_processor_overfill,
election,
http_callback,
ipc,
Expand Down Expand Up @@ -377,6 +380,11 @@ enum class detail : uint8_t
erase_old,
erase_confirmed,

// rep tiers
tier_1,
tier_2,
tier_3,

_last // Must be the last enum
};

Expand Down
5 changes: 5 additions & 0 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,12 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
});

observers.vote.add ([this] (std::shared_ptr<nano::vote> vote, std::shared_ptr<nano::transport::channel> const & channel, nano::vote_code code) {
debug_assert (vote != nullptr);
debug_assert (code != nano::vote_code::invalid);
if (channel == nullptr)
{
return; // Channel expired when waiting for vote to be processed
}
bool active_in_rep_crawler = rep_crawler.process (vote, channel);
if (active_in_rep_crawler)
{
Expand Down
9 changes: 9 additions & 0 deletions nano/node/rep_tiers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <nano/secure/common.hpp>
#include <nano/secure/ledger.hpp>

#include <magic_enum.hpp>

using namespace std::chrono_literals;

nano::rep_tiers::rep_tiers (nano::ledger & ledger_a, nano::network_params & network_params_a, nano::online_reps & online_reps_a, nano::stats & stats_a, nano::logger & logger_a) :
Expand Down Expand Up @@ -141,4 +143,11 @@ std::unique_ptr<nano::container_info_component> nano::rep_tiers::collect_contain
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "representatives_2", representatives_2.size (), sizeof (decltype (representatives_2)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "representatives_3", representatives_3.size (), sizeof (decltype (representatives_3)::value_type) }));
return composite;
}

nano::stat::detail nano::to_stat_detail (nano::rep_tier tier)
{
auto value = magic_enum::enum_cast<nano::stat::detail> (magic_enum::enum_name (tier));
debug_assert (value);
return value.value_or (nano::stat::detail{});
}
2 changes: 2 additions & 0 deletions nano/node/rep_tiers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ enum class rep_tier
tier_3, // (> 5%) of online stake
};

nano::stat::detail to_stat_detail (rep_tier);

class rep_tiers final
{
public:
Expand Down
174 changes: 91 additions & 83 deletions nano/node/vote_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,37 @@ nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano
rep_tiers{ rep_tiers_a },
max_votes{ flags_a.vote_processor_capacity }
{
queue.max_size_query = [this] (auto const & origin) {
switch (origin.source)
{
case nano::rep_tier::tier_3:
return 256;
case nano::rep_tier::tier_2:
return 128;
case nano::rep_tier::tier_1:
return 64;
case nano::rep_tier::none:
return 32;
}
debug_assert (false);
return 0;
};

queue.priority_query = [] (auto const & origin) {
switch (origin.source)
{
case nano::rep_tier::tier_3:
return 9;
case nano::rep_tier::tier_2:
return 6;
case nano::rep_tier::tier_1:
return 3;
case nano::rep_tier::none:
return 1;
}
debug_assert (false);
return 0;
};
}

nano::vote_processor::~vote_processor ()
Expand Down Expand Up @@ -58,104 +89,81 @@ void nano::vote_processor::stop ()
}
}

void nano::vote_processor::run ()
bool nano::vote_processor::vote (std::shared_ptr<nano::vote> const & vote, std::shared_ptr<nano::transport::channel> const & channel)
{
nano::timer<std::chrono::milliseconds> elapsed;
bool log_this_iteration;
debug_assert (channel != nullptr);

auto const tier = rep_tiers.tier (vote->account);

bool added = false;
{
nano::lock_guard<nano::mutex> guard{ mutex };
added = queue.push ({ vote, channel }, tier);
}
if (added)
{
stats.inc (nano::stat::type::vote_processor, nano::stat::detail::process);
stats.inc (nano::stat::type::vote_processor_tier, to_stat_detail (tier));

condition.notify_all ();
}
else
{
stats.inc (nano::stat::type::vote_processor, nano::stat::detail::overfill);
stats.inc (nano::stat::type::vote_processor_overfill, to_stat_detail (tier));
}
return added;
}

void nano::vote_processor::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (!votes.empty ())
stats.inc (nano::stat::type::vote_processor, nano::stat::detail::loop);

if (!queue.empty ())
{
decltype (votes) votes_l;
votes_l.swap (votes);
lock.unlock ();
condition.notify_all ();

log_this_iteration = false;
// TODO: This is a temporary measure to prevent spamming the logs until we can implement a better solution
if (votes_l.size () > 1024 * 4)
{
/*
* Only log the timing information for this iteration if
* there are a sufficient number of items for it to be relevant
*/
log_this_iteration = true;
elapsed.restart ();
}

for (auto const & [vote, channel] : votes_l)
{
vote_blocking (vote, channel);
}

total_processed += votes_l.size ();

if (log_this_iteration && elapsed.stop () > std::chrono::milliseconds (100))
{
logger.debug (nano::log::type::vote_processor, "Processed {} votes in {} milliseconds (rate of {} votes per second)",
votes_l.size (),
elapsed.value ().count (),
((votes_l.size () * 1000ULL) / elapsed.value ().count ()));
}
run_batch (lock);
debug_assert (!lock.owns_lock ());

lock.lock ();
}
else
{
condition.wait (lock);
}

condition.wait (lock, [&] {
return stopped || !queue.empty ();
});
}
}

bool nano::vote_processor::vote (std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> const & channel_a)
void nano::vote_processor::run_batch (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (channel_a != nullptr);
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ());

nano::unique_lock<nano::mutex> lock{ mutex };
nano::timer<std::chrono::milliseconds> timer;

auto should_process = [this] (auto tier) {
if (votes.size () < 6.0 / 9.0 * max_votes)
{
return true;
}
// Level 1 (0.1-1%)
if (votes.size () < 7.0 / 9.0 * max_votes)
{
return (tier == nano::rep_tier::tier_1);
}
// Level 2 (1-5%)
if (votes.size () < 8.0 / 9.0 * max_votes)
{
return (tier == nano::rep_tier::tier_2);
}
// Level 3 (> 5%)
if (votes.size () < max_votes)
{
return (tier == nano::rep_tier::tier_3);
}
return false;
};
size_t const max_batch_size = 1024 * 4;
auto batch = queue.next_batch (max_batch_size);

lock.unlock ();

if (!stopped)
for (auto const & [entry, origin] : batch)
{
auto tier = rep_tiers.tier (vote_a->account);
if (should_process (tier))
{
votes.emplace_back (vote_a, channel_a);
lock.unlock ();
condition.notify_all ();
// Lock no longer required
auto const & [vote, channel] = entry;
vote_blocking (vote, channel);
}

return true; // Processed
}
else
{
stats.inc (nano::stat::type::vote, nano::stat::detail::vote_overflow);
}
total_processed += batch.size ();

if (batch.size () == max_batch_size && timer.stop () > 100ms)
{
logger.debug (nano::log::type::vote_processor, "Processed {} votes in {} milliseconds (rate of {} votes per second)",
batch.size (),
timer.value ().count (),
((batch.size () * 1000ULL) / timer.value ().count ()));
}
return false; // Not processed
}

nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr<nano::vote> const & vote, std::shared_ptr<nano::transport::channel> const & channel)
Expand Down Expand Up @@ -190,23 +198,23 @@ nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr<nano::vote>
std::size_t nano::vote_processor::size () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return votes.size ();
return queue.size ();
}

bool nano::vote_processor::empty () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return votes.empty ();
return queue.empty ();
}

std::unique_ptr<nano::container_info_component> nano::vote_processor::collect_container_info (std::string const & name) const
{
std::size_t votes_count;
{
nano::lock_guard<nano::mutex> guard{ mutex };
votes_count = votes.size ();
votes_count = queue.size ();
}
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "votes", votes_count, sizeof (decltype (votes)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "votes", votes_count, sizeof (decltype (queue)::value_type) }));
return composite;
}
6 changes: 5 additions & 1 deletion nano/node/vote_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <nano/lib/numbers.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/fair_queue.hpp>
#include <nano/node/rep_tiers.hpp>
#include <nano/secure/common.hpp>

#include <deque>
Expand Down Expand Up @@ -71,10 +72,13 @@ class vote_processor final

private:
void run ();
void run_batch (nano::unique_lock<nano::mutex> &);

private:
std::size_t const max_votes;
std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> votes;

using entry_t = std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>;
nano::fair_queue<entry_t, nano::rep_tier> queue;

private:
bool stopped{ false };
Expand Down
1 change: 1 addition & 0 deletions nano/node/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,7 @@ nano::websocket_server::websocket_server (nano::websocket::config & config_a, na
});

observers.vote.add ([this] (std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> const & channel_a, nano::vote_code code_a) {
debug_assert (vote_a != nullptr);
if (server->any_subscriber (nano::websocket::topic::vote))
{
nano::websocket::message_builder builder;
Expand Down

0 comments on commit 91e3c8e

Please sign in to comment.