diff --git a/nano/node/bootstrap_ascending/database_scan.cpp b/nano/node/bootstrap_ascending/database_scan.cpp index ecd1c2c45c..840879cb10 100644 --- a/nano/node/bootstrap_ascending/database_scan.cpp +++ b/nano/node/bootstrap_ascending/database_scan.cpp @@ -8,121 +8,158 @@ #include /* - * database_iterator + * database_scan */ -nano::bootstrap_ascending::database_iterator::database_iterator (nano::ledger & ledger, table_type table_a) : - ledger{ ledger }, - table{ table_a } +nano::bootstrap_ascending::database_scan::database_scan (nano::ledger & ledger_a) : + ledger{ ledger_a }, + accounts_iterator{ ledger }, + pending_iterator{ ledger } { } -nano::account nano::bootstrap_ascending::database_iterator::operator* () const +nano::account nano::bootstrap_ascending::database_scan::next (std::function const & filter) { - return current; -} + if (queue.empty ()) + { + fill (); + } -void nano::bootstrap_ascending::database_iterator::next (secure::transaction & tx) -{ - switch (table) + while (!queue.empty ()) { - case table_type::account: - { - auto item = ledger.store.account.begin (tx, current.number () + 1); - if (item != ledger.store.account.end ()) - { - current = item->first; - } - else - { - current = { 0 }; - } - break; - } - case table_type::pending: + auto result = queue.front (); + queue.pop_front (); + + if (filter (result)) { - auto item = ledger.any.receivable_upper_bound (tx, current); - if (item != ledger.any.receivable_end ()) - { - current = item->first.account; - } - else - { - current = { 0 }; - } - break; + return result; } } + + return { 0 }; } -/* - * buffered_iterator - */ +void nano::bootstrap_ascending::database_scan::fill () +{ + auto transaction = ledger.store.tx_begin_read (); + + auto set1 = accounts_iterator.next_batch (transaction, batch_size); + auto set2 = pending_iterator.next_batch (transaction, batch_size); + + queue.insert (queue.end (), set1.begin (), set1.end ()); + queue.insert (queue.end (), set2.begin (), set2.end ()); +} + +bool nano::bootstrap_ascending::database_scan::warmed_up () const +{ + return accounts_iterator.warmed_up () && pending_iterator.warmed_up (); +} -nano::bootstrap_ascending::buffered_iterator::buffered_iterator (nano::ledger & ledger) : - ledger{ ledger }, - accounts_iterator{ ledger, database_iterator::table_type::account }, - pending_iterator{ ledger, database_iterator::table_type::pending } +std::unique_ptr nano::bootstrap_ascending::database_scan::collect_container_info (std::string const & name) const { + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "accounts_iterator", accounts_iterator.completed, 0 })); + composite->add_component (std::make_unique (container_info{ "pending_iterator", pending_iterator.completed, 0 })); + return composite; } -nano::account nano::bootstrap_ascending::buffered_iterator::operator* () const +/* + * account_database_iterator + */ + +nano::bootstrap_ascending::account_database_iterator::account_database_iterator (nano::ledger & ledger_a) : + ledger{ ledger_a } { - return !buffer.empty () ? buffer.front () : nano::account{ 0 }; } -nano::account nano::bootstrap_ascending::buffered_iterator::next (std::function const & filter) +std::deque nano::bootstrap_ascending::account_database_iterator::next_batch (nano::store::transaction & transaction, size_t batch_size) { - if (buffer.empty ()) + std::deque result; + + auto it = ledger.store.account.begin (transaction, next); + auto const end = ledger.store.account.end (); + + for (size_t count = 0; it != end && count < batch_size; ++it, ++count) { - fill (); + auto const & account = it->first; + result.push_back (account); + next = account.number () + 1; } - while (!buffer.empty ()) + if (it == end) { - auto result = buffer.front (); - buffer.pop_front (); - - if (filter (result)) - { - return result; - } + // Reset for the next ledger iteration + next = { 0 }; + ++completed; } - return { 0 }; + return result; } -bool nano::bootstrap_ascending::buffered_iterator::warmup () const +bool nano::bootstrap_ascending::account_database_iterator::warmed_up () const { - return warmup_m; + return completed > 0; } -void nano::bootstrap_ascending::buffered_iterator::fill () +/* + * pending_database_iterator + */ + +nano::bootstrap_ascending::pending_database_iterator::pending_database_iterator (nano::ledger & ledger_a) : + ledger{ ledger_a } { - debug_assert (buffer.empty ()); +} - // Fill half from accounts table and half from pending table - auto transaction = ledger.tx_begin_read (); +std::deque nano::bootstrap_ascending::pending_database_iterator::next_batch (nano::store::transaction & transaction, size_t batch_size) +{ + std::deque result; - for (int n = 0; n < size / 2; ++n) - { - accounts_iterator.next (transaction); - if (!(*accounts_iterator).is_zero ()) - { - buffer.push_back (*accounts_iterator); - } - } + auto it = ledger.store.pending.begin (transaction, next); + auto const end = ledger.store.pending.end (); - for (int n = 0; n < size / 2; ++n) - { - pending_iterator.next (transaction); - if (!(*pending_iterator).is_zero ()) + // TODO: This pending iteration heuristic should be encapsulated in a pending_iterator class and reused across other components + auto advance_iterator = [&] () { + auto const starting_account = it->first.account; + + // For RocksDB, sequential access is ~10x faster than performing a fresh lookup (tested on my machine) + const size_t sequential_attempts = 10; + + // First try advancing sequentially + for (size_t count = 0; count < sequential_attempts && it != end; ++count, ++it) { - buffer.push_back (*pending_iterator); + if (it->first.account != starting_account) + { + break; + } } - else + + // If we didn't advance to the next account, perform a fresh lookup + if (it != end && it->first.account != starting_account) { - warmup_m = false; + it = ledger.store.pending.begin (transaction, { starting_account.number () + 1, 0 }); } + + debug_assert (it == end || it->first.account != starting_account); + }; + + for (size_t count = 0; it != end && count < batch_size; advance_iterator (), ++count) + { + auto const & account = it->first.account; + result.push_back (account); + next = { account.number () + 1, 0 }; + } + + if (it == end) + { + // Reset for the next ledger iteration + next = { 0, 0 }; + ++completed; } + + return result; } + +bool nano::bootstrap_ascending::pending_database_iterator::warmed_up () const +{ + return completed > 0; +} \ No newline at end of file diff --git a/nano/node/bootstrap_ascending/database_scan.hpp b/nano/node/bootstrap_ascending/database_scan.hpp index e5404098ef..f644d71e5f 100644 --- a/nano/node/bootstrap_ascending/database_scan.hpp +++ b/nano/node/bootstrap_ascending/database_scan.hpp @@ -1,62 +1,61 @@ #pragma once #include +#include +#include #include -namespace nano +namespace nano::bootstrap_ascending { -class ledger; -} - -namespace nano::secure +struct account_database_iterator { -class transaction; -} + explicit account_database_iterator (nano::ledger &); -namespace nano::bootstrap_ascending -{ -class database_iterator + std::deque next_batch (nano::store::transaction &, size_t batch_size); + bool warmed_up () const; + + nano::ledger & ledger; + nano::account next{ 0 }; + size_t completed{ 0 }; +}; + +struct pending_database_iterator { -public: - enum class table_type - { - account, - pending - }; + explicit pending_database_iterator (nano::ledger &); - explicit database_iterator (nano::ledger & ledger, table_type); - nano::account operator* () const; - void next (secure::transaction & tx); + std::deque next_batch (nano::store::transaction &, size_t batch_size); + bool warmed_up () const; -private: nano::ledger & ledger; - nano::account current{ 0 }; - const table_type table; + nano::pending_key next{ 0, 0 }; + size_t completed{ 0 }; }; -class buffered_iterator +class database_scan { public: - explicit buffered_iterator (nano::ledger & ledger); + explicit database_scan (nano::ledger &); - nano::account operator* () const; nano::account next (std::function const & filter); // Indicates if a full ledger iteration has taken place e.g. warmed up - bool warmup () const; + bool warmed_up () const; + + std::unique_ptr collect_container_info (std::string const & name) const; + +private: // Dependencies + nano::ledger & ledger; private: void fill (); private: - nano::ledger & ledger; - std::deque buffer; - bool warmup_m{ true }; + account_database_iterator accounts_iterator; + pending_database_iterator pending_iterator; - database_iterator accounts_iterator; - database_iterator pending_iterator; + std::deque queue; - static std::size_t constexpr size = 1024; + static size_t constexpr batch_size = 128; }; -} // nano::bootstrap_ascending +} diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index c82e91c51e..450767f032 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -28,7 +28,7 @@ nano::bootstrap_ascending::service::service (nano::node_config const & node_conf stats{ stat_a }, logger{ logger_a }, accounts{ config.account_sets, stats }, - iterator{ ledger }, + database_scan{ ledger }, throttle{ compute_throttle_size () }, scoring{ config, node_config_a.network_params.network }, database_limiter{ config.database_rate_limit, 1.0 } @@ -345,7 +345,7 @@ nano::account nano::bootstrap_ascending::service::next_database (bool should_thr return { 0 }; } - auto account = iterator.next ([this] (nano::account const & account) { + auto account = database_scan.next ([this] (nano::account const & account) { return count_tags (account, query_source::database) == 0; }); @@ -512,7 +512,7 @@ void nano::bootstrap_ascending::service::run_database () while (!stopped) { // Avoid high churn rate of database requests - bool should_throttle = !iterator.warmup () && throttle.throttled (); + bool should_throttle = !database_scan.warmed_up () && throttle.throttled (); lock.unlock (); stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop_database); run_one_database (should_throttle); @@ -839,6 +839,7 @@ std::unique_ptr nano::bootstrap_ascending::servi composite->add_component (std::make_unique (container_info{ "throttle", throttle.size (), 0 })); composite->add_component (std::make_unique (container_info{ "throttle_successes", throttle.successes (), 0 })); composite->add_component (accounts.collect_container_info ("accounts")); + composite->add_component (database_scan.collect_container_info ("database_scan")); return composite; } diff --git a/nano/node/bootstrap_ascending/service.hpp b/nano/node/bootstrap_ascending/service.hpp index 554974c85d..4f3f3668dd 100644 --- a/nano/node/bootstrap_ascending/service.hpp +++ b/nano/node/bootstrap_ascending/service.hpp @@ -155,7 +155,7 @@ namespace bootstrap_ascending private: nano::bootstrap_ascending::account_sets accounts; - nano::bootstrap_ascending::buffered_iterator iterator; + nano::bootstrap_ascending::database_scan database_scan; nano::bootstrap_ascending::throttle throttle; nano::bootstrap_ascending::peer_scoring scoring; diff --git a/nano/store/pending.hpp b/nano/store/pending.hpp index 49f2b8a31c..23272cba73 100644 --- a/nano/store/pending.hpp +++ b/nano/store/pending.hpp @@ -20,6 +20,9 @@ namespace nano::store */ class pending { +public: + using iterator = store::iterator; + public: virtual void put (store::write_transaction const &, nano::pending_key const &, nano::pending_info const &) = 0; virtual void del (store::write_transaction const &, nano::pending_key const &) = 0;