diff --git a/nano/core_test/vote_processor.cpp b/nano/core_test/vote_processor.cpp index 53fc8213f3..2b286723e0 100644 --- a/nano/core_test/vote_processor.cpp +++ b/nano/core_test/vote_processor.cpp @@ -154,12 +154,15 @@ TEST (vote_processor, weights) // Wait for representatives ASSERT_TIMELY_EQ (10s, node.ledger.cache.rep_weights.get_rep_amounts ().size (), 4); ASSERT_TIMELY_EQ (5s, node.online_reps.online (), total); - node.vote_processor.calculate_weights (); - ASSERT_EQ (node.vote_processor.representative_tier (key0.pub), nano::representative_tier::none); - ASSERT_EQ (node.vote_processor.representative_tier (key1.pub), nano::representative_tier::tier_1); - ASSERT_EQ (node.vote_processor.representative_tier (key2.pub), nano::representative_tier::tier_2); - ASSERT_EQ (node.vote_processor.representative_tier (nano::dev::genesis_key.pub), nano::representative_tier::tier_3); + // Wait for rep tiers to be updated + node.stats.clear (); + ASSERT_TIMELY (5s, node.stats.count (nano::stat::type::rep_tiers, nano::stat::detail::updated) >= 2); + + ASSERT_EQ (node.rep_tiers.tier (key0.pub), nano::rep_tier::none); + ASSERT_EQ (node.rep_tiers.tier (key1.pub), nano::rep_tier::tier_1); + ASSERT_EQ (node.rep_tiers.tier (key2.pub), nano::rep_tier::tier_2); + ASSERT_EQ (node.rep_tiers.tier (nano::dev::genesis_key.pub), nano::rep_tier::tier_3); } // Issue that tracks last changes on this test: https://github.com/nanocurrency/nano-node/issues/3485 diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index d3df7ed774..906239247a 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -70,6 +70,7 @@ enum class type vote_processor, election_scheduler, vote_generator, + rep_tiers, // bootstrap bulk_pull_client, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 5150d7dec0..31d3f248fe 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -50,6 +50,7 @@ enum class type : uint8_t optimistic_scheduler, handshake, local_block_broadcaster, + rep_tiers, bootstrap_ascending, bootstrap_ascending_accounts, @@ -67,7 +68,10 @@ enum class detail : uint8_t loop, total, process, + processed, + ignored, update, + updated, request, broadcast, cleanup, diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 99caa097fe..1b703b6191 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -103,6 +103,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::local_block_broadcasting: thread_role_name_string = "Local broadcast"; break; + case nano::thread_role::name::rep_tiers: + thread_role_name_string = "Rep tiers"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 56848761ce..d55db1d9d0 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -43,6 +43,7 @@ enum class name scheduler_optimistic, scheduler_priority, local_block_broadcasting, + rep_tiers, }; /* diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index d25585ff44..f8f3b5407e 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -126,6 +126,8 @@ add_library( process_live_dispatcher.hpp repcrawler.hpp repcrawler.cpp + rep_tiers.hpp + rep_tiers.cpp request_aggregator.hpp request_aggregator.cpp scheduler/bucket.cpp diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 09d119f262..3179e6c298 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -177,7 +177,8 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons application_path (application_path_a), port_mapping (*this), rep_crawler (*this), - vote_processor (active, observers, stats, config, flags, logger, online_reps, rep_crawler, ledger, network_params), + rep_tiers{ ledger, network_params, online_reps, stats, logger }, + vote_processor{ active, observers, stats, config, flags, logger, online_reps, rep_crawler, ledger, network_params, rep_tiers }, warmed_up (0), block_processor (*this, write_database_queue), online_reps (ledger, config), @@ -552,6 +553,7 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (node.ascendboot.collect_container_info ("bootstrap_ascending")); composite->add_component (node.unchecked.collect_container_info ("unchecked")); composite->add_component (node.local_block_broadcaster.collect_container_info ("local_block_broadcaster")); + composite->add_component (node.rep_tiers.collect_container_info ("rep_tiers")); return composite; } @@ -601,7 +603,6 @@ void nano::node::start () { rep_crawler.start (); } - ongoing_rep_calculation (); ongoing_peer_store (); ongoing_online_weight_calculation_queue (); @@ -647,6 +648,7 @@ void nano::node::start () port_mapping.start (); } wallets.start (); + rep_tiers.start (); vote_processor.start (); active.start (); generator.start (); @@ -685,6 +687,7 @@ void nano::node::stop () block_processor.stop (); aggregator.stop (); vote_processor.stop (); + rep_tiers.stop (); scheduler.stop (); active.stop (); generator.stop (); @@ -772,19 +775,6 @@ void nano::node::long_inactivity_cleanup () } } -void nano::node::ongoing_rep_calculation () -{ - auto now (std::chrono::steady_clock::now ()); - vote_processor.calculate_weights (); - std::weak_ptr node_w (shared_from_this ()); - workers.add_timed_task (now + std::chrono::minutes (10), [node_w] () { - if (auto node_l = node_w.lock ()) - { - node_l->ongoing_rep_calculation (); - } - }); -} - void nano::node::ongoing_bootstrap () { auto next_wakeup = network_params.network.bootstrap_interval; diff --git a/nano/node/node.hpp b/nano/node/node.hpp index d4105e61e4..db9cfe26e7 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -94,7 +95,6 @@ class node final : public std::enable_shared_from_this std::pair balance_pending (nano::account const &, bool only_confirmed); nano::uint128_t weight (nano::account const &); nano::uint128_t minimum_principal_weight (); - void ongoing_rep_calculation (); void ongoing_bootstrap (); void ongoing_peer_store (); void backup_wallet (); @@ -164,6 +164,7 @@ class node final : public std::enable_shared_from_this nano::port_mapping port_mapping; nano::online_reps online_reps; nano::rep_crawler rep_crawler; + nano::rep_tiers rep_tiers; nano::vote_processor vote_processor; unsigned warmed_up; nano::block_processor block_processor; diff --git a/nano/node/online_reps.hpp b/nano/node/online_reps.hpp index 8b07e9a5d2..f01fec91e0 100644 --- a/nano/node/online_reps.hpp +++ b/nano/node/online_reps.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -15,6 +16,10 @@ namespace nano { class ledger; class node_config; +namespace store +{ + class transaction; +} /** Track online representatives and trend online weight */ class online_reps final diff --git a/nano/node/rep_tiers.cpp b/nano/node/rep_tiers.cpp new file mode 100644 index 0000000000..d432b96200 --- /dev/null +++ b/nano/node/rep_tiers.cpp @@ -0,0 +1,144 @@ +#include +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +nano::rep_tiers::rep_tiers (nano::ledger & ledger_a, nano::network_params & network_params_a, nano::online_reps & online_reps_a, nano::stats & stats_a, nano::logger & logger_a) : + ledger{ ledger_a }, + network_params{ network_params_a }, + online_reps{ online_reps_a }, + stats{ stats_a }, + logger{ logger_a } +{ +} + +nano::rep_tiers::~rep_tiers () +{ + // Thread must be stopped before destruction + debug_assert (!thread.joinable ()); +} + +void nano::rep_tiers::start () +{ + debug_assert (!thread.joinable ()); + + thread = std::thread{ [this] () { + nano::thread_role::set (nano::thread_role::name::rep_tiers); + run (); + } }; +} + +void nano::rep_tiers::stop () +{ + { + nano::lock_guard lock{ mutex }; + stopped = true; + } + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); + } +} + +nano::rep_tier nano::rep_tiers::tier (const nano::account & representative) const +{ + nano::lock_guard lock{ mutex }; + if (representatives_3.find (representative) != representatives_3.end ()) + { + return nano::rep_tier::tier_3; + } + if (representatives_2.find (representative) != representatives_2.end ()) + { + return nano::rep_tier::tier_2; + } + if (representatives_1.find (representative) != representatives_1.end ()) + { + return nano::rep_tier::tier_1; + } + return nano::rep_tier::none; +} + +void nano::rep_tiers::run () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + stats.inc (nano::stat::type::rep_tiers, nano::stat::detail::loop); + + lock.unlock (); + + calculate_tiers (); + + lock.lock (); + + std::chrono::milliseconds interval = network_params.network.is_dev_network () ? 500ms : 10min; + condition.wait_for (lock, interval); + } +} + +void nano::rep_tiers::calculate_tiers () +{ + auto online = online_reps.online (); + auto rep_amounts = ledger.cache.rep_weights.get_rep_amounts (); + + decltype (representatives_1) representatives_1_l; + decltype (representatives_2) representatives_2_l; + decltype (representatives_3) representatives_3_l; + + int ignored = 0; + for (auto const & rep_amount : rep_amounts) + { + nano::account const & representative = rep_amount.first; + + // Using ledger weight here because it takes preconfigured bootstrap weights into account + auto weight = ledger.weight (representative); + if (weight > online / 1000) // 0.1% or above (level 1) + { + representatives_1_l.insert (representative); + if (weight > online / 100) // 1% or above (level 2) + { + representatives_2_l.insert (representative); + if (weight > online / 20) // 5% or above (level 3) + { + representatives_3_l.insert (representative); + } + } + } + else + { + ++ignored; + } + } + + stats.add (nano::stat::type::rep_tiers, nano::stat::detail::processed, nano::stat::dir::in, rep_amounts.size ()); + stats.add (nano::stat::type::rep_tiers, nano::stat::detail::ignored, nano::stat::dir::in, ignored); + logger.debug (nano::log::type::rep_tiers, "Representative tiers updated, tier 1: {}, tier 2: {}, tier 3: {} ({} ignored)", + representatives_1_l.size (), + representatives_2_l.size (), + representatives_3_l.size (), + ignored); + + { + nano::lock_guard guard{ mutex }; + representatives_1 = std::move (representatives_1_l); + representatives_2 = std::move (representatives_2_l); + representatives_3 = std::move (representatives_3_l); + } + + stats.inc (nano::stat::type::rep_tiers, nano::stat::detail::updated); +} + +std::unique_ptr nano::rep_tiers::collect_container_info (const std::string & name) +{ + nano::lock_guard lock{ mutex }; + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "representatives_1", representatives_1.size (), sizeof (decltype (representatives_1)::value_type) })); + composite->add_component (std::make_unique (container_info{ "representatives_2", representatives_2.size (), sizeof (decltype (representatives_2)::value_type) })); + composite->add_component (std::make_unique (container_info{ "representatives_3", representatives_3.size (), sizeof (decltype (representatives_3)::value_type) })); + return composite; +} \ No newline at end of file diff --git a/nano/node/rep_tiers.hpp b/nano/node/rep_tiers.hpp new file mode 100644 index 0000000000..ce989d8e0a --- /dev/null +++ b/nano/node/rep_tiers.hpp @@ -0,0 +1,65 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include + +namespace nano +{ +class ledger; +class network_params; +class stats; +class logger; +class container_info_component; +class online_reps; + +// Higher number means higher priority +enum class rep_tier +{ + none, // Not a principal representatives + tier_1, // (0.1-1%) of online stake + tier_2, // (1-5%) of online stake + tier_3, // (> 5%) of online stake +}; + +class rep_tiers final +{ +public: + rep_tiers (nano::ledger &, nano::network_params &, nano::online_reps &, nano::stats &, nano::logger &); + ~rep_tiers (); + + void start (); + void stop (); + + /** Returns the representative tier for the account */ + nano::rep_tier tier (nano::account const & representative) const; + + std::unique_ptr collect_container_info (std::string const & name); + +private: // Dependencies + nano::ledger & ledger; + nano::network_params & network_params; + nano::online_reps & online_reps; + nano::stats & stats; + nano::logger & logger; + +private: + void run (); + void calculate_tiers (); + +private: + /** Representatives levels for early prioritization */ + std::unordered_set representatives_1; + std::unordered_set representatives_2; + std::unordered_set representatives_3; + + std::atomic stopped{ false }; + nano::condition_variable condition; + mutable nano::mutex mutex; + std::thread thread; +}; +} \ No newline at end of file diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index 5196ca7d49..337b0bddfb 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -15,17 +16,18 @@ using namespace std::chrono_literals; -nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a) : - active (active_a), - observers (observers_a), - stats (stats_a), - config (config_a), - logger (logger_a), - online_reps (online_reps_a), - rep_crawler (rep_crawler_a), - ledger (ledger_a), - network_params (network_params_a), - max_votes (flags_a.vote_processor_capacity) +nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a, nano::rep_tiers & rep_tiers_a) : + active{ active_a }, + observers{ observers_a }, + stats{ stats_a }, + config{ config_a }, + logger{ logger_a }, + online_reps{ online_reps_a }, + rep_crawler{ rep_crawler_a }, + ledger{ ledger_a }, + network_params{ network_params_a }, + rep_tiers{ rep_tiers_a }, + max_votes{ flags_a.vote_processor_capacity } { } @@ -104,31 +106,6 @@ void nano::vote_processor::run () } } -nano::representative_tier nano::vote_processor::representative_tier (const nano::account & representative) const -{ - nano::lock_guard guard{ mutex }; - return representative_tier_locked (representative); -} - -nano::representative_tier nano::vote_processor::representative_tier_locked (const nano::account & representative) const -{ - debug_assert (!mutex.try_lock ()); - - if (representatives_3.find (representative) != representatives_3.end ()) - { - return nano::representative_tier::tier_3; - } - if (representatives_2.find (representative) != representatives_2.end ()) - { - return nano::representative_tier::tier_2; - } - if (representatives_1.find (representative) != representatives_1.end ()) - { - return nano::representative_tier::tier_1; - } - return nano::representative_tier::none; -} - bool nano::vote_processor::vote (std::shared_ptr const & vote_a, std::shared_ptr const & channel_a) { debug_assert (channel_a != nullptr); @@ -136,7 +113,7 @@ bool nano::vote_processor::vote (std::shared_ptr const & vote_a, std nano::unique_lock lock{ mutex }; if (!stopped) { - auto tier = representative_tier_locked (vote_a->account); + auto tier = rep_tiers.tier (vote_a->account); // Level 0 (< 0.1%) if (votes.size () < 6.0 / 9.0 * max_votes) @@ -146,17 +123,17 @@ bool nano::vote_processor::vote (std::shared_ptr const & vote_a, std // Level 1 (0.1-1%) else if (votes.size () < 7.0 / 9.0 * max_votes) { - process = (tier == nano::representative_tier::tier_1); + process = (tier == nano::rep_tier::tier_1); } // Level 2 (1-5%) else if (votes.size () < 8.0 / 9.0 * max_votes) { - process = (tier == nano::representative_tier::tier_2); + process = (tier == nano::rep_tier::tier_2); } // Level 3 (> 5%) else if (votes.size () < max_votes) { - process = (tier == nano::representative_tier::tier_3); + process = (tier == nano::rep_tier::tier_3); } if (process) { @@ -246,62 +223,14 @@ bool nano::vote_processor::empty () const return votes.empty (); } -void nano::vote_processor::calculate_weights () -{ - nano::unique_lock lock{ mutex }; - - if (stopped) - { - return; - } - - representatives_1.clear (); - representatives_2.clear (); - representatives_3.clear (); - - auto online = online_reps.online (); - auto rep_amounts = ledger.cache.rep_weights.get_rep_amounts (); - - for (auto const & rep_amount : rep_amounts) - { - nano::account const & representative = rep_amount.first; - - // Using ledger weight here because it takes preconfigured bootstrap weights into account - auto weight = ledger.weight (representative); - if (weight > online / 1000) // 0.1% or above (level 1) - { - representatives_1.insert (representative); - if (weight > online / 100) // 1% or above (level 2) - { - representatives_2.insert (representative); - if (weight > online / 20) // 5% or above (level 3) - { - representatives_3.insert (representative); - } - } - } - } -} - std::unique_ptr nano::collect_container_info (vote_processor & vote_processor, std::string const & name) { std::size_t votes_count; - std::size_t representatives_1_count; - std::size_t representatives_2_count; - std::size_t representatives_3_count; - { nano::lock_guard guard{ vote_processor.mutex }; votes_count = vote_processor.votes.size (); - representatives_1_count = vote_processor.representatives_1.size (); - representatives_2_count = vote_processor.representatives_2.size (); - representatives_3_count = vote_processor.representatives_3.size (); } - auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "votes", votes_count, sizeof (decltype (vote_processor.votes)::value_type) })); - composite->add_component (std::make_unique (container_info{ "representatives_1", representatives_1_count, sizeof (decltype (vote_processor.representatives_1)::value_type) })); - composite->add_component (std::make_unique (container_info{ "representatives_2", representatives_2_count, sizeof (decltype (vote_processor.representatives_2)::value_type) })); - composite->add_component (std::make_unique (container_info{ "representatives_3", representatives_3_count, sizeof (decltype (vote_processor.representatives_3)::value_type) })); return composite; } diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index bb2d11a434..b7498180fc 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -26,25 +26,17 @@ class ledger; class network_params; class node_flags; class stats; +class rep_tiers; namespace transport { class channel; } -// Higher number means higher priority -enum class representative_tier -{ - none, // Not a principal representative - tier_1, // (0.1-1%) of online stake - tier_2, // (1-5%) of online stake - tier_3, // (> 5%) of online stake -}; - class vote_processor final { public: - vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger &, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a); + vote_processor (nano::active_transactions &, nano::node_observers &, nano::stats &, nano::node_config &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &); ~vote_processor (); void start (); @@ -60,9 +52,6 @@ class vote_processor final void flush (); std::size_t size () const; bool empty () const; - void calculate_weights (); - - nano::representative_tier representative_tier (nano::account const & representative) const; std::atomic total_processed{ 0 }; @@ -76,21 +65,16 @@ class vote_processor final nano::rep_crawler & rep_crawler; nano::ledger & ledger; nano::network_params & network_params; + nano::rep_tiers & rep_tiers; private: void run (); void verify_votes (std::deque, std::shared_ptr>> const &); - nano::representative_tier representative_tier_locked (nano::account const & representative) const; private: std::size_t const max_votes; std::deque, std::shared_ptr>> votes; - /** Representatives levels for early prioritization */ - std::unordered_set representatives_1; - std::unordered_set representatives_2; - std::unordered_set representatives_3; - private: bool stopped{ false }; nano::condition_variable condition;