diff --git a/nano/core_test/vote_processor.cpp b/nano/core_test/vote_processor.cpp index c909d2e214..9e0214c094 100644 --- a/nano/core_test/vote_processor.cpp +++ b/nano/core_test/vote_processor.cpp @@ -76,18 +76,6 @@ TEST (vote_processor, invalid_signature) ASSERT_TIMELY_EQ (5s, 2, election->votes ().size ()); } -TEST (vote_processor, no_capacity) -{ - nano::test::system system; - nano::node_flags node_flags; - node_flags.vote_processor_capacity = 0; - auto & node (*system.add_node (node_flags)); - nano::keypair key; - auto vote = nano::test::make_vote (key, { nano::dev::genesis }, nano::vote::timestamp_min * 1, 0); - auto channel (std::make_shared (node, node)); - ASSERT_FALSE (node.vote_processor.vote (vote, channel)); -} - TEST (vote_processor, overflow) { nano::test::system system; @@ -111,7 +99,7 @@ TEST (vote_processor, overflow) } ASSERT_GT (not_processed, 0); ASSERT_LT (not_processed, total); - ASSERT_EQ (not_processed, node.stats.count (nano::stat::type::vote, nano::stat::detail::vote_overflow)); + ASSERT_EQ (not_processed, node.stats.count (nano::stat::type::vote_processor, nano::stat::detail::overfill)); // check that it did not timeout ASSERT_LT (std::chrono::system_clock::now () - start_time, 10s); diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 709902d59e..049b4d299b 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -20,6 +20,9 @@ enum class type : uint8_t network, tcp_server, vote, + vote_processor, + vote_processor_tier, + vote_processor_overfill, election, http_callback, ipc, @@ -377,6 +380,11 @@ enum class detail : uint8_t erase_old, erase_confirmed, + // rep tiers + tier_1, + tier_2, + tier_3, + _last // Must be the last enum }; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 9c7b2be9fa..a321549ac5 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -341,7 +341,12 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy }); observers.vote.add ([this] (std::shared_ptr vote, std::shared_ptr const & channel, nano::vote_code code) { + debug_assert (vote != nullptr); debug_assert (code != nano::vote_code::invalid); + if (channel == nullptr) + { + return; // Channel expired when waiting for vote to be processed + } bool active_in_rep_crawler = rep_crawler.process (vote, channel); if (active_in_rep_crawler) { diff --git a/nano/node/rep_tiers.cpp b/nano/node/rep_tiers.cpp index 7b0cffb47d..df65aea884 100644 --- a/nano/node/rep_tiers.cpp +++ b/nano/node/rep_tiers.cpp @@ -5,6 +5,8 @@ #include #include +#include + using namespace std::chrono_literals; nano::rep_tiers::rep_tiers (nano::ledger & ledger_a, nano::network_params & network_params_a, nano::online_reps & online_reps_a, nano::stats & stats_a, nano::logger & logger_a) : @@ -141,4 +143,11 @@ std::unique_ptr nano::rep_tiers::collect_contain composite->add_component (std::make_unique (container_info{ "representatives_2", representatives_2.size (), sizeof (decltype (representatives_2)::value_type) })); composite->add_component (std::make_unique (container_info{ "representatives_3", representatives_3.size (), sizeof (decltype (representatives_3)::value_type) })); return composite; +} + +nano::stat::detail nano::to_stat_detail (nano::rep_tier tier) +{ + auto value = magic_enum::enum_cast (magic_enum::enum_name (tier)); + debug_assert (value); + return value.value_or (nano::stat::detail{}); } \ No newline at end of file diff --git a/nano/node/rep_tiers.hpp b/nano/node/rep_tiers.hpp index ce989d8e0a..e9134a46e9 100644 --- a/nano/node/rep_tiers.hpp +++ b/nano/node/rep_tiers.hpp @@ -26,6 +26,8 @@ enum class rep_tier tier_3, // (> 5%) of online stake }; +nano::stat::detail to_stat_detail (rep_tier); + class rep_tiers final { public: diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index d8c306d54a..01874811c9 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -27,6 +27,37 @@ nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano rep_tiers{ rep_tiers_a }, max_votes{ flags_a.vote_processor_capacity } { + queue.max_size_query = [this] (auto const & origin) { + switch (origin.source) + { + case nano::rep_tier::tier_3: + return 256; + case nano::rep_tier::tier_2: + return 128; + case nano::rep_tier::tier_1: + return 64; + case nano::rep_tier::none: + return 32; + } + debug_assert (false); + return 0; + }; + + queue.priority_query = [] (auto const & origin) { + switch (origin.source) + { + case nano::rep_tier::tier_3: + return 9; + case nano::rep_tier::tier_2: + return 6; + case nano::rep_tier::tier_1: + return 3; + case nano::rep_tier::none: + return 1; + } + debug_assert (false); + return 0; + }; } nano::vote_processor::~vote_processor () @@ -58,104 +89,81 @@ void nano::vote_processor::stop () } } -void nano::vote_processor::run () +bool nano::vote_processor::vote (std::shared_ptr const & vote, std::shared_ptr const & channel) { - nano::timer elapsed; - bool log_this_iteration; + debug_assert (channel != nullptr); + + auto const tier = rep_tiers.tier (vote->account); + + bool added = false; + { + nano::lock_guard guard{ mutex }; + added = queue.push ({ vote, channel }, tier); + } + if (added) + { + stats.inc (nano::stat::type::vote_processor, nano::stat::detail::process); + stats.inc (nano::stat::type::vote_processor_tier, to_stat_detail (tier)); + condition.notify_all (); + } + else + { + stats.inc (nano::stat::type::vote_processor, nano::stat::detail::overfill); + stats.inc (nano::stat::type::vote_processor_overfill, to_stat_detail (tier)); + } + return added; +} + +void nano::vote_processor::run () +{ nano::unique_lock lock{ mutex }; while (!stopped) { - if (!votes.empty ()) + stats.inc (nano::stat::type::vote_processor, nano::stat::detail::loop); + + if (!queue.empty ()) { - decltype (votes) votes_l; - votes_l.swap (votes); - lock.unlock (); - condition.notify_all (); - - log_this_iteration = false; - // TODO: This is a temporary measure to prevent spamming the logs until we can implement a better solution - if (votes_l.size () > 1024 * 4) - { - /* - * Only log the timing information for this iteration if - * there are a sufficient number of items for it to be relevant - */ - log_this_iteration = true; - elapsed.restart (); - } - - for (auto const & [vote, channel] : votes_l) - { - vote_blocking (vote, channel); - } - - total_processed += votes_l.size (); - - if (log_this_iteration && elapsed.stop () > std::chrono::milliseconds (100)) - { - logger.debug (nano::log::type::vote_processor, "Processed {} votes in {} milliseconds (rate of {} votes per second)", - votes_l.size (), - elapsed.value ().count (), - ((votes_l.size () * 1000ULL) / elapsed.value ().count ())); - } + run_batch (lock); + debug_assert (!lock.owns_lock ()); lock.lock (); } - else - { - condition.wait (lock); - } + + condition.wait (lock, [&] { + return stopped || !queue.empty (); + }); } } -bool nano::vote_processor::vote (std::shared_ptr const & vote_a, std::shared_ptr const & channel_a) +void nano::vote_processor::run_batch (nano::unique_lock & lock) { - debug_assert (channel_a != nullptr); + debug_assert (lock.owns_lock ()); + debug_assert (!mutex.try_lock ()); + debug_assert (!queue.empty ()); - nano::unique_lock lock{ mutex }; + nano::timer timer; - auto should_process = [this] (auto tier) { - if (votes.size () < 6.0 / 9.0 * max_votes) - { - return true; - } - // Level 1 (0.1-1%) - if (votes.size () < 7.0 / 9.0 * max_votes) - { - return (tier == nano::rep_tier::tier_1); - } - // Level 2 (1-5%) - if (votes.size () < 8.0 / 9.0 * max_votes) - { - return (tier == nano::rep_tier::tier_2); - } - // Level 3 (> 5%) - if (votes.size () < max_votes) - { - return (tier == nano::rep_tier::tier_3); - } - return false; - }; + size_t const max_batch_size = 1024 * 4; + auto batch = queue.next_batch (max_batch_size); + + lock.unlock (); - if (!stopped) + for (auto const & [entry, origin] : batch) { - auto tier = rep_tiers.tier (vote_a->account); - if (should_process (tier)) - { - votes.emplace_back (vote_a, channel_a); - lock.unlock (); - condition.notify_all (); - // Lock no longer required + auto const & [vote, channel] = entry; + vote_blocking (vote, channel); + } - return true; // Processed - } - else - { - stats.inc (nano::stat::type::vote, nano::stat::detail::vote_overflow); - } + total_processed += batch.size (); + + if (batch.size () == max_batch_size && timer.stop () > 100ms) + { + logger.debug (nano::log::type::vote_processor, "Processed {} votes in {} milliseconds (rate of {} votes per second)", + batch.size (), + timer.value ().count (), + ((batch.size () * 1000ULL) / timer.value ().count ())); } - return false; // Not processed } nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr const & vote, std::shared_ptr const & channel) @@ -190,13 +198,13 @@ nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr std::size_t nano::vote_processor::size () const { nano::lock_guard guard{ mutex }; - return votes.size (); + return queue.size (); } bool nano::vote_processor::empty () const { nano::lock_guard guard{ mutex }; - return votes.empty (); + return queue.empty (); } std::unique_ptr nano::vote_processor::collect_container_info (std::string const & name) const @@ -204,9 +212,9 @@ std::unique_ptr nano::vote_processor::collect_co std::size_t votes_count; { nano::lock_guard guard{ mutex }; - votes_count = votes.size (); + votes_count = queue.size (); } auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "votes", votes_count, sizeof (decltype (votes)::value_type) })); + composite->add_component (std::make_unique (container_info{ "votes", votes_count, sizeof (decltype (queue)::value_type) })); return composite; } diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index 368e16e1fb..61518ef738 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -71,10 +72,13 @@ class vote_processor final private: void run (); + void run_batch (nano::unique_lock &); private: std::size_t const max_votes; - std::deque, std::shared_ptr>> votes; + + using entry_t = std::pair, std::shared_ptr>; + nano::fair_queue queue; private: bool stopped{ false }; diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index 464c170cda..7b299ab6ce 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -1061,6 +1061,7 @@ nano::websocket_server::websocket_server (nano::websocket::config & config_a, na }); observers.vote.add ([this] (std::shared_ptr vote_a, std::shared_ptr const & channel_a, nano::vote_code code_a) { + debug_assert (vote_a != nullptr); if (server->any_subscriber (nano::websocket::topic::vote)) { nano::websocket::message_builder builder;