Skip to content

Commit

Permalink
More efficient database scan iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Sep 24, 2024
1 parent 5bf11b6 commit 3d6e181
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 113 deletions.
189 changes: 113 additions & 76 deletions nano/node/bootstrap_ascending/database_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,121 +8,158 @@
#include <nano/store/pending.hpp>

/*
* database_iterator
* database_scan
*/

nano::bootstrap_ascending::database_iterator::database_iterator (nano::ledger & ledger, table_type table_a) :
ledger{ ledger },
table{ table_a }
nano::bootstrap_ascending::database_scan::database_scan (nano::ledger & ledger_a) :
ledger{ ledger_a },
accounts_iterator{ ledger },
pending_iterator{ ledger }
{
}

nano::account nano::bootstrap_ascending::database_iterator::operator* () const
nano::account nano::bootstrap_ascending::database_scan::next (std::function<bool (nano::account const &)> const & filter)
{
return current;
}
if (queue.empty ())
{
fill ();
}

void nano::bootstrap_ascending::database_iterator::next (secure::transaction & tx)
{
switch (table)
while (!queue.empty ())
{
case table_type::account:
{
auto item = ledger.store.account.begin (tx, current.number () + 1);
if (item != ledger.store.account.end ())
{
current = item->first;
}
else
{
current = { 0 };
}
break;
}
case table_type::pending:
auto result = queue.front ();
queue.pop_front ();

if (filter (result))
{
auto item = ledger.any.receivable_upper_bound (tx, current);
if (item != ledger.any.receivable_end ())
{
current = item->first.account;
}
else
{
current = { 0 };
}
break;
return result;
}
}

return { 0 };
}

/*
* buffered_iterator
*/
void nano::bootstrap_ascending::database_scan::fill ()
{
auto transaction = ledger.store.tx_begin_read ();

auto set1 = accounts_iterator.next_batch (transaction, batch_size);
auto set2 = pending_iterator.next_batch (transaction, batch_size);

queue.insert (queue.end (), set1.begin (), set1.end ());
queue.insert (queue.end (), set2.begin (), set2.end ());
}

bool nano::bootstrap_ascending::database_scan::warmed_up () const
{
return accounts_iterator.warmed_up () && pending_iterator.warmed_up ();
}

nano::bootstrap_ascending::buffered_iterator::buffered_iterator (nano::ledger & ledger) :
ledger{ ledger },
accounts_iterator{ ledger, database_iterator::table_type::account },
pending_iterator{ ledger, database_iterator::table_type::pending }
std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::database_scan::collect_container_info (std::string const & name) const
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "accounts_iterator", accounts_iterator.completed, 0 }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "pending_iterator", pending_iterator.completed, 0 }));
return composite;
}

nano::account nano::bootstrap_ascending::buffered_iterator::operator* () const
/*
* account_database_iterator
*/

nano::bootstrap_ascending::account_database_iterator::account_database_iterator (nano::ledger & ledger_a) :
ledger{ ledger_a }
{
return !buffer.empty () ? buffer.front () : nano::account{ 0 };
}

nano::account nano::bootstrap_ascending::buffered_iterator::next (std::function<bool (nano::account const &)> const & filter)
std::deque<nano::account> nano::bootstrap_ascending::account_database_iterator::next_batch (nano::store::transaction & transaction, size_t batch_size)
{
if (buffer.empty ())
std::deque<nano::account> result;

auto it = ledger.store.account.begin (transaction, next);
auto const end = ledger.store.account.end ();

for (size_t count = 0; it != end && count < batch_size; ++it, ++count)
{
fill ();
auto const & account = it->first;
result.push_back (account);
next = account.number () + 1;
}

while (!buffer.empty ())
if (it == end)
{
auto result = buffer.front ();
buffer.pop_front ();

if (filter (result))
{
return result;
}
// Reset for the next ledger iteration
next = { 0 };
++completed;
}

return { 0 };
return result;
}

bool nano::bootstrap_ascending::buffered_iterator::warmup () const
bool nano::bootstrap_ascending::account_database_iterator::warmed_up () const
{
return warmup_m;
return completed > 0;
}

void nano::bootstrap_ascending::buffered_iterator::fill ()
/*
* pending_database_iterator
*/

nano::bootstrap_ascending::pending_database_iterator::pending_database_iterator (nano::ledger & ledger_a) :
ledger{ ledger_a }
{
debug_assert (buffer.empty ());
}

// Fill half from accounts table and half from pending table
auto transaction = ledger.tx_begin_read ();
std::deque<nano::account> nano::bootstrap_ascending::pending_database_iterator::next_batch (nano::store::transaction & transaction, size_t batch_size)
{
std::deque<nano::account> result;

for (int n = 0; n < size / 2; ++n)
{
accounts_iterator.next (transaction);
if (!(*accounts_iterator).is_zero ())
{
buffer.push_back (*accounts_iterator);
}
}
auto it = ledger.store.pending.begin (transaction, next);
auto const end = ledger.store.pending.end ();

for (int n = 0; n < size / 2; ++n)
{
pending_iterator.next (transaction);
if (!(*pending_iterator).is_zero ())
// TODO: This pending iteration heuristic should be encapsulated in a pending_iterator class and reused across other components
auto advance_iterator = [&] () {
auto const starting_account = it->first.account;

// For RocksDB, sequential access is ~10x faster than performing a fresh lookup (tested on my machine)
const size_t sequential_attempts = 10;

// First try advancing sequentially
for (size_t count = 0; count < sequential_attempts && it != end; ++count, ++it)
{
buffer.push_back (*pending_iterator);
if (it->first.account != starting_account)
{
break;
}
}
else

// If we didn't advance to the next account, perform a fresh lookup
if (it != end && it->first.account != starting_account)
{
warmup_m = false;
it = ledger.store.pending.begin (transaction, { starting_account.number () + 1, 0 });
}

debug_assert (it == end || it->first.account != starting_account);
};

for (size_t count = 0; it != end && count < batch_size; advance_iterator (), ++count)
{
auto const & account = it->first.account;
result.push_back (account);
next = { account.number () + 1, 0 };
}

if (it == end)
{
// Reset for the next ledger iteration
next = { 0, 0 };
++completed;
}

return result;
}

bool nano::bootstrap_ascending::pending_database_iterator::warmed_up () const
{
return completed > 0;
}
65 changes: 32 additions & 33 deletions nano/node/bootstrap_ascending/database_scan.hpp
Original file line number Diff line number Diff line change
@@ -1,62 +1,61 @@
#pragma once

#include <nano/lib/numbers.hpp>
#include <nano/node/fwd.hpp>
#include <nano/secure/pending_info.hpp>

#include <deque>

namespace nano
namespace nano::bootstrap_ascending
{
class ledger;
}

namespace nano::secure
struct account_database_iterator
{
class transaction;
}
explicit account_database_iterator (nano::ledger &);

namespace nano::bootstrap_ascending
{
class database_iterator
std::deque<nano::account> next_batch (nano::store::transaction &, size_t batch_size);
bool warmed_up () const;

nano::ledger & ledger;
nano::account next{ 0 };
size_t completed{ 0 };
};

struct pending_database_iterator
{
public:
enum class table_type
{
account,
pending
};
explicit pending_database_iterator (nano::ledger &);

explicit database_iterator (nano::ledger & ledger, table_type);
nano::account operator* () const;
void next (secure::transaction & tx);
std::deque<nano::account> next_batch (nano::store::transaction &, size_t batch_size);
bool warmed_up () const;

private:
nano::ledger & ledger;
nano::account current{ 0 };
const table_type table;
nano::pending_key next{ 0, 0 };
size_t completed{ 0 };
};

class buffered_iterator
class database_scan
{
public:
explicit buffered_iterator (nano::ledger & ledger);
explicit database_scan (nano::ledger &);

nano::account operator* () const;
nano::account next (std::function<bool (nano::account const &)> const & filter);

// Indicates if a full ledger iteration has taken place e.g. warmed up
bool warmup () const;
bool warmed_up () const;

std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name) const;

private: // Dependencies
nano::ledger & ledger;

private:
void fill ();

private:
nano::ledger & ledger;
std::deque<nano::account> buffer;
bool warmup_m{ true };
account_database_iterator accounts_iterator;
pending_database_iterator pending_iterator;

database_iterator accounts_iterator;
database_iterator pending_iterator;
std::deque<nano::account> queue;

static std::size_t constexpr size = 1024;
static size_t constexpr batch_size = 128;
};
} // nano::bootstrap_ascending
}
7 changes: 4 additions & 3 deletions nano/node/bootstrap_ascending/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ nano::bootstrap_ascending::service::service (nano::node_config const & node_conf
stats{ stat_a },
logger{ logger_a },
accounts{ config.account_sets, stats },
iterator{ ledger },
database_scan{ ledger },
throttle{ compute_throttle_size () },
scoring{ config, node_config_a.network_params.network },
database_limiter{ config.database_rate_limit, 1.0 }
Expand Down Expand Up @@ -345,7 +345,7 @@ nano::account nano::bootstrap_ascending::service::next_database (bool should_thr
return { 0 };
}

auto account = iterator.next ([this] (nano::account const & account) {
auto account = database_scan.next ([this] (nano::account const & account) {
return count_tags (account, query_source::database) == 0;
});

Expand Down Expand Up @@ -512,7 +512,7 @@ void nano::bootstrap_ascending::service::run_database ()
while (!stopped)
{
// Avoid high churn rate of database requests
bool should_throttle = !iterator.warmup () && throttle.throttled ();
bool should_throttle = !database_scan.warmed_up () && throttle.throttled ();
lock.unlock ();
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop_database);
run_one_database (should_throttle);
Expand Down Expand Up @@ -839,6 +839,7 @@ std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::servi
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "throttle", throttle.size (), 0 }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "throttle_successes", throttle.successes (), 0 }));
composite->add_component (accounts.collect_container_info ("accounts"));
composite->add_component (database_scan.collect_container_info ("database_scan"));
return composite;
}

Expand Down
2 changes: 1 addition & 1 deletion nano/node/bootstrap_ascending/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ namespace bootstrap_ascending

private:
nano::bootstrap_ascending::account_sets accounts;
nano::bootstrap_ascending::buffered_iterator iterator;
nano::bootstrap_ascending::database_scan database_scan;
nano::bootstrap_ascending::throttle throttle;
nano::bootstrap_ascending::peer_scoring scoring;

Expand Down
3 changes: 3 additions & 0 deletions nano/store/pending.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ namespace nano::store
*/
class pending
{
public:
using iterator = store::iterator<nano::pending_key, nano::pending_info>;

public:
virtual void put (store::write_transaction const &, nano::pending_key const &, nano::pending_info const &) = 0;
virtual void del (store::write_transaction const &, nano::pending_key const &) = 0;
Expand Down

0 comments on commit 3d6e181

Please sign in to comment.