diff --git a/nano/core_test/online_reps.cpp b/nano/core_test/online_reps.cpp index 3fde676fbb..3bbaffabd2 100644 --- a/nano/core_test/online_reps.cpp +++ b/nano/core_test/online_reps.cpp @@ -18,11 +18,11 @@ TEST (online_reps, basic) ASSERT_EQ (nano::dev::constants.genesis_amount, node1.online_reps.online ()); // 1 minimum, 1 maximum ASSERT_EQ (node1.config.online_weight_minimum, node1.online_reps.trended ()); - node1.online_reps.sample (); + node1.online_reps.force_sample (); ASSERT_EQ (nano::dev::constants.genesis_amount, node1.online_reps.trended ()); node1.online_reps.clear (); // 2 minimum, 1 maximum - node1.online_reps.sample (); + node1.online_reps.force_sample (); ASSERT_EQ (node1.config.online_weight_minimum, node1.online_reps.trended ()); } diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index b3f5312577..4e7a442ea3 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -80,6 +80,7 @@ enum class type signal_manager, peer_history, message_processor, + online_reps, local_block_broadcaster, monitor, confirming_set, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 23c7e819d2..78e68d4f38 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -102,6 +102,7 @@ enum class type message_processor_overfill, message_processor_type, process_confirmed, + online_reps, _last // Must be the last enum }; @@ -582,6 +583,15 @@ enum class detail rollback_skipped, loop_scan, + // online_reps + trim_trend, + sanitize_old, + sanitize_future, + sample, + rep_new, + rep_update, + update_online, + _last // Must be the last enum }; diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 503b0281f8..ccb34be3bb 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -184,6 +184,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::vote_router: thread_role_name_string = "Vote router"; break; + case nano::thread_role::name::online_reps: + thread_role_name_string = "Online reps"; + break; case nano::thread_role::name::monitor: thread_role_name_string = "Monitor"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 420aa0d8ea..66f7455d3e 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -66,6 +66,7 @@ enum class name port_mapping, stats, vote_router, + online_reps, monitor, }; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 99af92a004..61eb237310 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -139,7 +139,7 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy rep_crawler (config.rep_crawler, *this), rep_tiers{ ledger, network_params, online_reps, stats, logger }, warmed_up (0), - online_reps_impl{ std::make_unique (config, ledger) }, + online_reps_impl{ std::make_unique (config, ledger, stats, logger) }, online_reps{ *online_reps_impl }, history_impl{ std::make_unique (config.network_params.voting) }, history{ *history_impl }, @@ -610,8 +610,6 @@ void nano::node::start () rep_crawler.start (); } - ongoing_online_weight_calculation_queue (); - bool tcp_enabled = false; if (config.tcp_incoming_connections_max > 0 && !(flags.disable_bootstrap_listener && flags.disable_tcp_realtime)) { @@ -665,6 +663,7 @@ void nano::node::start () local_block_broadcaster.start (); peer_history.start (); vote_router.start (); + online_reps.start (); monitor.start (); add_initial_peers (); @@ -681,7 +680,7 @@ void nano::node::stop () logger.info (nano::log::type::node, "Node stopping..."); tcp_listener.stop (); - + online_reps.stop (); vote_router.stop (); peer_history.stop (); // Cancels ongoing work generation tasks, which may be blocking other threads @@ -1066,28 +1065,11 @@ bool nano::node::block_confirmed_or_being_confirmed (nano::block_hash const & ha return block_confirmed_or_being_confirmed (ledger.tx_begin_read (), hash_a); } -void nano::node::ongoing_online_weight_calculation_queue () -{ - std::weak_ptr node_w (shared_from_this ()); - workers.post_delayed ((std::chrono::seconds (network_params.node.weight_period)), [node_w] () { - if (auto node_l = node_w.lock ()) - { - node_l->ongoing_online_weight_calculation (); - } - }); -} - bool nano::node::online () const { return rep_crawler.total_weight () > online_reps.delta (); } -void nano::node::ongoing_online_weight_calculation () -{ - online_reps.sample (); - ongoing_online_weight_calculation_queue (); -} - std::shared_ptr nano::node::shared () { return shared_from_this (); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index fc0fcb9424..8883087712 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -114,8 +114,6 @@ class node final : public std::enable_shared_from_this bool block_confirmed_or_being_confirmed (nano::block_hash const &); void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr const &, std::shared_ptr const &, std::shared_ptr const &); - void ongoing_online_weight_calculation (); - void ongoing_online_weight_calculation_queue (); bool online () const; bool init_error () const; std::pair> get_bootstrap_weights () const; diff --git a/nano/node/online_reps.cpp b/nano/node/online_reps.cpp index a8d5d58f95..732d6f2e42 100644 --- a/nano/node/online_reps.cpp +++ b/nano/node/online_reps.cpp @@ -1,92 +1,225 @@ #include +#include +#include #include #include #include #include #include -nano::online_reps::online_reps (nano::node_config const & config_a, nano::ledger & ledger_a) : +nano::online_reps::online_reps (nano::node_config const & config_a, nano::ledger & ledger_a, nano::stats & stats_a, nano::logger & logger_a) : config{ config_a }, - ledger{ ledger_a } + ledger{ ledger_a }, + stats{ stats_a }, + logger{ logger_a } { - if (!ledger.store.init_error ()) +} + +nano::online_reps::~online_reps () +{ + debug_assert (!thread.joinable ()); +} + +void nano::online_reps::start () +{ + debug_assert (!thread.joinable ()); + { - auto transaction (ledger.store.tx_begin_read ()); + auto transaction = ledger.tx_begin_write (nano::store::writer::online_weight); + sanitize_trend (transaction); trended_m = calculate_trend (transaction); + logger.debug (nano::log::type::online_reps, "Initial trended weight: {}", fmt::streamed (trended_m)); + } + + thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::online_reps); + run (); + }); +} + +void nano::online_reps::stop () +{ + { + nano::lock_guard lock{ mutex }; + stopped = true; + } + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); } } -void nano::online_reps::observe (nano::account const & rep_a) +void nano::online_reps::observe (nano::account const & rep) { - if (ledger.weight (rep_a) > 0) + if (ledger.weight (rep) > config.representative_vote_weight_minimum) { nano::lock_guard lock{ mutex }; + auto now = std::chrono::steady_clock::now (); - auto new_insert = reps.get ().erase (rep_a) == 0; - reps.insert ({ now, rep_a }); - auto cutoff = reps.get ().lower_bound (now - std::chrono::seconds (config.network_params.node.weight_period)); - auto trimmed = reps.get ().begin () != cutoff; - reps.get ().erase (reps.get ().begin (), cutoff); + auto new_insert = reps.get ().erase (rep) == 0; + reps.insert ({ now, rep }); + + stats.inc (nano::stat::type::online_reps, new_insert ? nano::stat::detail::rep_new : nano::stat::detail::rep_update); + + bool trimmed = trim (); + + // Update current online weight if anything changed if (new_insert || trimmed) { + stats.inc (nano::stat::type::online_reps, nano::stat::detail::update_online); online_m = calculate_online (); } } } -void nano::online_reps::sample () +bool nano::online_reps::trim () +{ + debug_assert (!mutex.try_lock ()); + + auto now = std::chrono::steady_clock::now (); + auto cutoff = reps.get ().lower_bound (now - config.network_params.node.weight_interval); + auto trimmed = reps.get ().begin () != cutoff; + reps.get ().erase (reps.get ().begin (), cutoff); + return trimmed; +} + +void nano::online_reps::run () { nano::unique_lock lock{ mutex }; - nano::uint128_t online_l = online_m; - lock.unlock (); - nano::uint128_t trend_l; + while (!stopped) { - auto transaction = ledger.store.tx_begin_write (); - // Discard oldest entries - while (ledger.store.online_weight.count (transaction) >= config.network_params.node.max_weight_samples) + auto next = std::chrono::steady_clock::now () + config.network_params.node.weight_interval; + condition.wait_until (lock, next, [this, next] { + return stopped || std::chrono::steady_clock::now () >= next; + }); + if (!stopped) { - auto oldest (ledger.store.online_weight.begin (transaction)); - debug_assert (oldest != ledger.store.online_weight.end (transaction)); - ledger.store.online_weight.del (transaction, oldest->first); + lock.unlock (); + sample (); + lock.lock (); } - ledger.store.online_weight.put (transaction, std::chrono::system_clock::now ().time_since_epoch ().count (), online_l); - trend_l = calculate_trend (transaction); } - lock.lock (); - trended_m = trend_l; +} + +void nano::online_reps::sample () +{ + stats.inc (nano::stat::type::online_reps, nano::stat::detail::sample); + + auto transaction = ledger.tx_begin_write (nano::store::writer::online_weight); + trim_trend (transaction); + ledger.store.online_weight.put (transaction, nano::seconds_since_epoch (), online ()); + auto trended_l = calculate_trend (transaction); + { + nano::lock_guard lock{ mutex }; + trended_m = trended_l; + } + + logger.debug (nano::log::type::online_reps, "Updated trended weight: {}", fmt::streamed (trended_l)); } nano::uint128_t nano::online_reps::calculate_online () const { - nano::uint128_t current; - for (auto & i : reps) + debug_assert (!mutex.try_lock ()); + return std::accumulate (reps.begin (), reps.end (), nano::uint128_t{ 0 }, [this] (nano::uint128_t current, rep_info const & info) { + return current + ledger.weight (info.account); + }); +} + +void nano::online_reps::trim_trend (nano::store::write_transaction const & transaction) +{ + auto const now = std::chrono::system_clock::now (); + auto const cutoff = now - config.network_params.node.weight_cutoff; + + for (auto it = ledger.store.online_weight.begin (transaction); it != ledger.store.online_weight.end (transaction); ++it) { - current += ledger.weight (i.account); + auto tstamp = nano::from_seconds_since_epoch (it->first); + if (tstamp < cutoff) + { + stats.inc (nano::stat::type::online_reps, nano::stat::detail::trim_trend); + ledger.store.online_weight.del (transaction, it->first); + } + else + { + // Entries are ordered by timestamp, so break early + break; + } } - return current; + + // Ensure that all remaining entries are within the expected range + debug_assert (verify_consistency (transaction, now, cutoff)); } -nano::uint128_t nano::online_reps::calculate_trend (store::transaction & transaction_a) const +void nano::online_reps::sanitize_trend (nano::store::write_transaction const & transaction) +{ + auto const now = std::chrono::system_clock::now (); + auto const cutoff = now - config.network_params.node.weight_cutoff; + + size_t removed_old = 0, removed_future = 0; + + for (auto it = ledger.store.online_weight.begin (transaction); it != ledger.store.online_weight.end (transaction); ++it) + { + auto tstamp = nano::from_seconds_since_epoch (it->first); + if (tstamp < cutoff) + { + stats.inc (nano::stat::type::online_reps, nano::stat::detail::sanitize_old); + // TODO: Ensure it's OK to delete entry with the same key as the current iterator + ledger.store.online_weight.del (transaction, it->first); + ++removed_old; + } + else if (tstamp > now) + { + stats.inc (nano::stat::type::online_reps, nano::stat::detail::sanitize_future); + // TODO: Ensure it's OK to delete entry with the same key as the current iterator + ledger.store.online_weight.del (transaction, it->first); + ++removed_future; + } + } + + logger.info (nano::log::type::online_reps, "Sanitized online weight trend, remaining entries: {}, removed: {} (old: {}, future: {})", + ledger.store.online_weight.count (transaction), + removed_old + removed_future, + removed_old, + removed_future); + + // Ensure that all remaining entries are within the expected range + debug_assert (verify_consistency (transaction, now, cutoff)); +} + +bool nano::online_reps::verify_consistency (nano::store::write_transaction const & transaction, std::chrono::system_clock::time_point now, std::chrono::system_clock::time_point cutoff) const +{ + for (auto it = ledger.store.online_weight.begin (transaction); it != ledger.store.online_weight.end (transaction); ++it) + { + auto tstamp = nano::from_seconds_since_epoch (it->first); + if (tstamp < cutoff || tstamp > now) + { + return false; + } + } + return true; +} + +nano::uint128_t nano::online_reps::calculate_trend (nano::store::transaction const & transaction) const { std::vector items; - items.reserve (config.network_params.node.max_weight_samples + 1); - items.push_back (config.online_weight_minimum.number ()); - for (auto i (ledger.store.online_weight.begin (transaction_a)), n (ledger.store.online_weight.end (transaction_a)); i != n; ++i) + for (auto it = ledger.store.online_weight.begin (transaction); it != ledger.store.online_weight.end (transaction); ++it) { - items.push_back (i->second.number ()); + items.push_back (it->second.number ()); } - nano::uint128_t result; - // Pick median value for our target vote weight - auto median_idx = items.size () / 2; - nth_element (items.begin (), items.begin () + median_idx, items.end ()); - result = items[median_idx]; - return result; + if (!items.empty ()) + { + // Pick median value for our target vote weight + auto median_idx = items.size () / 2; + std::nth_element (items.begin (), items.begin () + median_idx, items.end ()); + return items[median_idx]; + } + return 0; } nano::uint128_t nano::online_reps::trended () const { nano::lock_guard lock{ mutex }; - return trended_m; + return std::max (trended_m, config.online_weight_minimum.number ()); } nano::uint128_t nano::online_reps::online () const @@ -100,7 +233,9 @@ nano::uint128_t nano::online_reps::delta () const nano::lock_guard lock{ mutex }; // Using a larger container to ensure maximum precision auto weight = static_cast (std::max ({ online_m, trended_m, config.online_weight_minimum.number () })); - return ((weight * online_weight_quorum) / 100).convert_to (); + auto delta = ((weight * online_weight_quorum) / 100).convert_to (); + release_assert (delta >= config.online_weight_minimum.number () / 100 * online_weight_quorum); + return delta; } std::vector nano::online_reps::list () @@ -125,6 +260,12 @@ void nano::online_reps::force_online_weight (nano::uint128_t const & online_weig online_m = online_weight; } +void nano::online_reps::force_sample () +{ + release_assert (nano::is_dev_run ()); + sample (); +} + nano::container_info nano::online_reps::container_info () const { nano::lock_guard guard{ mutex }; diff --git a/nano/node/online_reps.hpp b/nano/node/online_reps.hpp index 19cc58692d..62d09c74d0 100644 --- a/nano/node/online_reps.hpp +++ b/nano/node/online_reps.hpp @@ -12,6 +12,7 @@ #include #include +#include #include namespace mi = boost::multi_index; @@ -22,12 +23,14 @@ namespace nano class online_reps final { public: - online_reps (nano::node_config const &, nano::ledger &); + online_reps (nano::node_config const &, nano::ledger &, nano::stats &, nano::logger &); + ~online_reps (); + + void start (); + void stop (); /** Add voting account \p rep_account to the set of online representatives */ void observe (nano::account const & rep_account); - /** Called periodically to sample online weight */ - void sample (); /** Returns the trended online stake */ nano::uint128_t trended () const; @@ -41,14 +44,26 @@ class online_reps final nano::container_info container_info () const; public: + // TODO: This should be in the network constants static unsigned constexpr online_weight_quorum = 67; private: // Dependencies nano::node_config const & config; nano::ledger & ledger; + nano::stats & stats; + nano::logger & logger; private: - nano::uint128_t calculate_trend (nano::store::transaction &) const; + void run (); + /** Called periodically to sample online weight */ + void sample (); + bool trim (); + /** Remove old records from the database */ + void trim_trend (nano::store::write_transaction const &); + /** Iterate over all database samples and remove invalid records. This is meant to clean potential leftovers from previous versions. */ + void sanitize_trend (nano::store::write_transaction const &); + bool verify_consistency (nano::store::write_transaction const &, std::chrono::system_clock::time_point now, std::chrono::system_clock::time_point cutoff) const; + nano::uint128_t calculate_trend (nano::store::transaction const &) const; nano::uint128_t calculate_online () const; private: @@ -75,9 +90,13 @@ class online_reps final nano::uint128_t trended_m; nano::uint128_t online_m; + bool stopped{ false }; + nano::condition_variable condition; mutable nano::mutex mutex; + std::thread thread; public: // Only for tests void force_online_weight (nano::uint128_t const & online_weight); + void force_sample (); }; } diff --git a/nano/node/peer_history.cpp b/nano/node/peer_history.cpp index 99b96ed1a8..1474ba1e54 100644 --- a/nano/node/peer_history.cpp +++ b/nano/node/peer_history.cpp @@ -110,6 +110,7 @@ void nano::peer_history::run_one () auto timestamp = nano::from_milliseconds_since_epoch (timestamp_millis); if (timestamp > now || timestamp < cutoff) { + // TODO: Ensure it's OK to delete entry with the same key as the current iterator store.peer.del (transaction, endpoint); stats.inc (nano::stat::type::peer_history, nano::stat::detail::erased); diff --git a/nano/secure/common.cpp b/nano/secure/common.cpp index 5948826225..d5d071bc91 100644 --- a/nano/secure/common.cpp +++ b/nano/secure/common.cpp @@ -255,8 +255,8 @@ nano::node_constants::node_constants (nano::network_constants & network_constant search_pending_interval = network_constants.is_dev_network () ? std::chrono::seconds (1) : std::chrono::seconds (5 * 60); unchecked_cleaning_interval = std::chrono::minutes (30); process_confirmed_interval = network_constants.is_dev_network () ? std::chrono::milliseconds (50) : std::chrono::milliseconds (500); - max_weight_samples = (network_constants.is_live_network () || network_constants.is_test_network ()) ? 4032 : 288; - weight_period = 5 * 60; // 5 minutes + weight_interval = std::chrono::minutes (5); + weight_cutoff = (network_constants.is_live_network () || network_constants.is_test_network ()) ? std::chrono::weeks (2) : std::chrono::days (1); } /* diff --git a/nano/secure/common.hpp b/nano/secure/common.hpp index 31ccfd45b8..60b7ef31ac 100644 --- a/nano/secure/common.hpp +++ b/nano/secure/common.hpp @@ -211,9 +211,10 @@ class node_constants std::chrono::minutes unchecked_cleaning_interval; std::chrono::milliseconds process_confirmed_interval; - /** The maximum amount of samples for a 2 week period on live or 1 day on beta */ - uint64_t max_weight_samples; - uint64_t weight_period; + /** Time between collecting online representative samples */ + std::chrono::seconds weight_interval; + /** The maximum time to keep online weight samples: 2 weeks on live or 1 day on beta */ + std::chrono::seconds weight_cutoff; }; /** Voting related constants whose value depends on the active network */ diff --git a/nano/store/write_queue.hpp b/nano/store/write_queue.hpp index 0c395212c4..9dc0843c29 100644 --- a/nano/store/write_queue.hpp +++ b/nano/store/write_queue.hpp @@ -18,6 +18,7 @@ enum class writer pruning, voting_final, bounded_backlog, + online_weight, testing // Used in tests to emulate a write lock };