diff --git a/nano/core_test/backlog.cpp b/nano/core_test/backlog.cpp index ab6bd0fdff..7e1cf5da1c 100644 --- a/nano/core_test/backlog.cpp +++ b/nano/core_test/backlog.cpp @@ -22,9 +22,12 @@ TEST (backlog, population) nano::test::system system{}; auto & node = *system.add_node (); - node.backlog_scan.activated.add ([&] (nano::secure::transaction const & transaction, auto const & info) { + node.backlog_scan.batch_activated.add ([&] (auto const & batch) { nano::lock_guard lock{ mutex }; - activated.insert (info.account); + for (auto const & info : batch) + { + activated.insert (info.account); + } }); auto blocks = nano::test::setup_independent_blocks (system, node, 256); diff --git a/nano/node/backlog_scan.cpp b/nano/node/backlog_scan.cpp index 89893dda2c..9d922c042d 100644 --- a/nano/node/backlog_scan.cpp +++ b/nano/node/backlog_scan.cpp @@ -8,10 +8,11 @@ #include #include -nano::backlog_scan::backlog_scan (backlog_scan_config const & config_a, nano::ledger & ledger, nano::stats & stats_a) : +nano::backlog_scan::backlog_scan (backlog_scan_config const & config_a, nano::ledger & ledger_a, nano::stats & stats_a) : config{ config_a }, - ledger{ ledger }, - stats{ stats_a } + ledger{ ledger_a }, + stats{ stats_a }, + limiter{ config.batch_size * config.frequency } { } @@ -69,76 +70,85 @@ void nano::backlog_scan::run () { stats.inc (nano::stat::type::backlog_scan, nano::stat::detail::loop); triggered = false; - populate_backlog (lock); + populate_backlog (lock); // Does a single iteration over all accounts + debug_assert (lock.owns_lock ()); + } + else + { + condition.wait (lock, [this] () { + return stopped || predicate (); + }); } - - condition.wait (lock, [this] () { - return stopped || predicate (); - }); } } void nano::backlog_scan::populate_backlog (nano::unique_lock & lock) { - debug_assert (config.frequency > 0); + uint64_t total = 0; - const auto chunk_size = config.batch_size / config.frequency; - auto done = false; nano::account next = 0; - uint64_t total = 0; + bool done = false; while (!stopped && !done) { + // Wait for the rate limiter + while (!limiter.should_pass (config.batch_size)) + { + condition.wait_for (lock, std::chrono::milliseconds{ 1000 / config.frequency / 2 }); + if (stopped) + { + return; + } + } + lock.unlock (); + std::deque scanned; + std::deque activated; { auto transaction = ledger.tx_begin_read (); auto it = ledger.store.account.begin (transaction, next); auto const end = ledger.store.account.end (transaction); - auto should_refresh = [&transaction] () { - auto cutoff = std::chrono::steady_clock::now () - 100ms; // TODO: Make this configurable - return transaction.timestamp () < cutoff; - }; - - for (size_t count = 0; it != end && count < chunk_size && !should_refresh (); ++it, ++count, ++total) + for (size_t count = 0; it != end && count < config.batch_size; ++it, ++count, ++total) { stats.inc (nano::stat::type::backlog_scan, nano::stat::detail::total); - auto const & account = it->first; - auto const & account_info = it->second; + auto const [account, account_info] = *it; + auto const maybe_conf_info = ledger.store.confirmation_height.get (transaction, account); + auto const conf_info = maybe_conf_info.value_or (nano::confirmation_height_info{}); + + activated_info info{ account, account_info, conf_info }; - activate (transaction, account, account_info); + scanned.push_back (info); + if (conf_info.height < account_info.block_count) + { + activated.push_back (info); + } - next = account.number () + 1; + next = account.number () + 1; // TODO: Prevent account overflow } - done = ledger.store.account.begin (transaction, next) == end; + done = (it == end); } - lock.lock (); + stats.add (nano::stat::type::backlog_scan, nano::stat::detail::scanned, scanned.size ()); + stats.add (nano::stat::type::backlog_scan, nano::stat::detail::activated, activated.size ()); - // Give the rest of the node time to progress without holding database lock - condition.wait_for (lock, std::chrono::milliseconds{ 1000 / config.frequency }); + // Notify about scanned and activated accounts without holding database transaction + batch_scanned.notify (scanned); + batch_activated.notify (activated); + + lock.lock (); } } -void nano::backlog_scan::activate (secure::transaction const & transaction, nano::account const & account, nano::account_info const & account_info) +nano::container_info nano::backlog_scan::container_info () const { - auto const maybe_conf_info = ledger.store.confirmation_height.get (transaction, account); - auto const conf_info = maybe_conf_info.value_or (nano::confirmation_height_info{}); - - activated_info info{ account, account_info, conf_info }; - - stats.inc (nano::stat::type::backlog_scan, nano::stat::detail::scanned); - scanned.notify (transaction, info); - - // If conf info is empty then it means then it means nothing is confirmed yet - if (conf_info.height < account_info.block_count) - { - stats.inc (nano::stat::type::backlog_scan, nano::stat::detail::activated); - activated.notify (transaction, info); - } + nano::lock_guard guard{ mutex }; + nano::container_info info; + info.put ("limiter", limiter.size ()); + return info; } /* @@ -149,7 +159,7 @@ nano::error nano::backlog_scan_config::serialize (nano::tomlconfig & toml) const { toml.put ("enable", enable, "Control if ongoing backlog population is enabled. If not, backlog population can still be triggered by RPC \ntype:bool"); toml.put ("batch_size", batch_size, "Number of accounts per second to process when doing backlog population scan. Increasing this value will help unconfirmed frontiers get into election prioritization queue faster, however it will also increase resource usage. \ntype:uint"); - toml.put ("frequency", frequency, "Backlog scan divides the scan into smaller batches, number of which is controlled by this value. Higher frequency helps to utilize resources more uniformly, however it also introduces more overhead. The resulting number of accounts per single batch is `backlog_scan_batch_size / backlog_scan_frequency` \ntype:uint"); + toml.put ("frequency", frequency, "Number of batches to process per second. Higher frequency and smaller batch size helps to utilize resources more uniformly, however it also introduces more overhead. Use 0 to process as fast as possible, but be aware that it may consume a lot of resources. \ntype:uint"); return toml.get_error (); } diff --git a/nano/node/backlog_scan.hpp b/nano/node/backlog_scan.hpp index 2771ab7450..9f65231500 100644 --- a/nano/node/backlog_scan.hpp +++ b/nano/node/backlog_scan.hpp @@ -3,11 +3,13 @@ #include #include #include +#include #include #include #include #include +#include #include namespace nano @@ -21,10 +23,10 @@ class backlog_scan_config final public: /** Control if ongoing backlog population is enabled. If not, backlog population can still be triggered by RPC */ bool enable{ true }; - /** Number of accounts per second to process. Number of accounts per single batch is this value divided by `frequency` */ - unsigned batch_size{ 10 * 1000 }; - /** Number of batches to run per second. Batches run in 1 second / `frequency` intervals */ - unsigned frequency{ 10 }; + /** Number of accounts per second to process. */ + size_t batch_size{ 1000 }; + /** Number of batches to run per second. */ + size_t frequency{ 10 }; }; class backlog_scan final @@ -42,6 +44,8 @@ class backlog_scan final /** Notify about AEC vacancy */ void notify (); + nano::container_info container_info () const; + public: struct activated_info { @@ -50,12 +54,9 @@ class backlog_scan final nano::confirmation_height_info conf_info; }; - /** - * Callback called for each backlogged account - */ - using callback_t = nano::observer_set; - callback_t activated; - callback_t scanned; + using batch_event_t = nano::observer_set>; + batch_event_t batch_scanned; // Accounts scanned but not activated + batch_event_t batch_activated; // Accounts activated private: // Dependencies backlog_scan_config const & config; @@ -66,9 +67,10 @@ class backlog_scan final void run (); bool predicate () const; void populate_backlog (nano::unique_lock & lock); - void activate (secure::transaction const &, nano::account const &, nano::account_info const &); private: + nano::rate_limiter limiter; + /** This is a manual trigger, the ongoing backlog population does not use this. * It can be triggered even when backlog population (frontiers confirmation) is disabled. */ bool triggered{ false }; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 4ef0323929..32ddf699bd 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -180,9 +180,14 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy return ledger.weight (rep); }; - backlog_scan.activated.add ([this] (nano::secure::transaction const & transaction, auto const & info) { - scheduler.optimistic.activate (info.account, info.account_info, info.conf_info); - scheduler.priority.activate (transaction, info.account, info.account_info, info.conf_info); + // TODO: Hook this direclty in the schedulers + backlog_scan.batch_activated.add ([this] (auto const & batch) { + auto transaction = ledger.tx_begin_read (); + for (auto const & info : batch) + { + scheduler.optimistic.activate (info.account, info.account_info, info.conf_info); + scheduler.priority.activate (transaction, info.account, info.account_info, info.conf_info); + } }); // Republish vote if it is new and the node does not host a principal representative (or close to) @@ -1198,6 +1203,7 @@ nano::container_info nano::node::container_info () const info.add ("rep_tiers", rep_tiers.container_info ()); info.add ("message_processor", message_processor.container_info ()); info.add ("bandwidth", outbound_limiter.container_info ()); + info.add ("backlog_scan", backlog_scan.container_info ()); return info; } diff --git a/nano/node/scheduler/priority.cpp b/nano/node/scheduler/priority.cpp index 6bc9091e64..4f63b43b9e 100644 --- a/nano/node/scheduler/priority.cpp +++ b/nano/node/scheduler/priority.cpp @@ -135,9 +135,12 @@ bool nano::scheduler::priority::activate (secure::transaction const & transactio { debug_assert (conf_info.frontier != account_info.head); - auto hash = conf_info.height == 0 ? account_info.open_block : ledger.any.block_successor (transaction, conf_info.frontier).value (); - auto block = ledger.any.block_get (transaction, hash); - release_assert (block != nullptr); + auto const hash = conf_info.height == 0 ? account_info.open_block : ledger.any.block_successor (transaction, conf_info.frontier).value_or (0); + auto const block = ledger.any.block_get (transaction, hash); + if (!block) + { + return false; // Not activated + } if (ledger.dependents_confirmed (transaction, *block)) {