Skip to content

Commit

Permalink
Ascending bootstrap strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Nov 14, 2023
1 parent 79ffebc commit 97de2e9
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 115 deletions.
171 changes: 98 additions & 73 deletions nano/node/bootstrap_ascending/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,28 +83,8 @@ void nano::bootstrap_ascending::service::stop ()
nano::join_or_pass (timeout_thread);
}

void nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::channel> channel, async_tag tag)
void nano::bootstrap_ascending::service::bootstrap (nano::block_hash hash)
{
debug_assert (tag.type == async_tag::query_type::blocks_by_hash || tag.type == async_tag::query_type::blocks_by_account);

nano::asc_pull_req request{ network_consts };
request.id = tag.id;
request.type = nano::asc_pull_type::blocks;

nano::asc_pull_req::blocks_payload request_payload;
request_payload.start = tag.start;
request_payload.count = config.bootstrap_ascending.pull_count;
request_payload.start_type = (tag.type == async_tag::query_type::blocks_by_hash) ? nano::asc_pull_req::hash_type::block : nano::asc_pull_req::hash_type::account;

request.payload = request_payload;
request.update_header ();

stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out);

// TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests
channel->send (
request, nullptr,
nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);
}

std::size_t nano::bootstrap_ascending::service::priority_size () const
Expand All @@ -131,8 +111,9 @@ std::size_t nano::bootstrap_ascending::service::score_size () const
*/
void nano::bootstrap_ascending::service::inspect (store::transaction const & tx, nano::process_return const & result, nano::block const & block)
{
auto const hash = block.hash ();
debug_assert (!mutex.try_lock ());

auto const hash = block.hash ();
switch (result.code)
{
case nano::process_result::progress:
Expand Down Expand Up @@ -259,30 +240,42 @@ nano::account nano::bootstrap_ascending::service::wait_available_account ()
return { 0 };
}

bool nano::bootstrap_ascending::service::request (nano::account & account, std::shared_ptr<nano::transport::channel> & channel)
auto nano::bootstrap_ascending::service::wait_next () -> std::optional<strategy_variant>
{
async_tag tag{};
tag.id = nano::bootstrap_ascending::generate_id ();
tag.account = account;
tag.time = std::chrono::steady_clock::now ();

// Check if the account picked has blocks, if it does, start the pull from the highest block
auto info = ledger.store.account.get (ledger.store.tx_begin_read (), account);
if (info)
{
tag.type = async_tag::query_type::blocks_by_hash;
tag.start = info->head;
}
else
// Waits for account either from priority queue or database
auto account = wait_available_account ();
if (account.is_zero ())
{
tag.type = async_tag::query_type::blocks_by_account;
tag.start = account;
return {};
}

account_scan_strategy strategy{ {}, account };
return strategy;
}

bool nano::bootstrap_ascending::service::request (const strategy_variant & strategy, std::shared_ptr<nano::transport::channel> & channel)
{
async_tag tag{ strategy };
tag.id = nano::bootstrap_ascending::generate_id ();
tag.time = std::chrono::steady_clock::now ();

on_request.notify (tag, channel);

nano::asc_pull_req request{ network_consts };
request.id = tag.id;
request.type = nano::asc_pull_type::blocks;

request.payload = tag.prepare_request (*this);
request.update_header ();

track (tag);
send (channel, tag);

stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out);

// TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests
channel->send (
request, nullptr,
nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);

return true; // Request sent
}
Expand All @@ -292,21 +285,18 @@ bool nano::bootstrap_ascending::service::run_one ()
// Ensure there is enough space in blockprocessor for queuing new blocks
wait_blockprocessor ();

// Waits for account either from priority queue or database
auto account = wait_available_account ();
if (account.is_zero ())
{
return false;
}

// Waits for channel that is not full
auto channel = wait_available_channel ();
if (!channel)
{
return false;
}

bool success = request (account, channel);
auto strategy = wait_next ();
if (!strategy)
{
return false;
}
bool success = request (*strategy, channel);
return success;
}

Expand Down Expand Up @@ -355,10 +345,39 @@ void nano::bootstrap_ascending::service::run_timeouts ()
on_timeout.notify (tag);
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timeout);
}

condition.wait_for (lock, 1s, [this] () { return stopped; });
}
}

nano::asc_pull_req::payload_variant nano::bootstrap_ascending::service::prepare (nano::bootstrap_ascending::service::account_scan_strategy & strategy)
{
nano::asc_pull_req::blocks_payload request;
request.count = config.bootstrap_ascending.pull_count;

// Check if the account picked has blocks, if it does, start the pull from the highest block
auto info = ledger.store.account.get (ledger.store.tx_begin_read (), strategy.account);
if (info)
{
strategy.type = account_scan_strategy::query_type::blocks_by_hash;
strategy.start = request.start = info->head;
request.start_type = nano::asc_pull_req::hash_type::block;
}
else
{
strategy.type = account_scan_strategy::query_type::blocks_by_account;
strategy.start = request.start = strategy.account;
request.start_type = nano::asc_pull_req::hash_type::account;
}
return request;
}

nano::asc_pull_req::payload_variant nano::bootstrap_ascending::service::prepare (nano::bootstrap_ascending::service::lazy_bootstrap_strategy & strategy)
{
nano::asc_pull_req::account_info_payload request;
return request;
}

void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & message, std::shared_ptr<nano::transport::channel> channel)
{
nano::unique_lock<nano::mutex> lock{ mutex };
Expand All @@ -376,43 +395,48 @@ void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & mes

on_reply.notify (tag);
condition.notify_all ();
std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload);

// Dispatch to specialized process overload
std::visit ([this, &message] (auto && strategy) { strategy.process_response (message.payload, *this); }, tag.strategy);
}
else
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::missing_tag);
}
}

void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::service::async_tag & tag)
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::blocks_payload & response, const account_scan_strategy & strategy)
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply);

auto result = verify (response, tag);
auto result = strategy.verify (response);
switch (result)
{
case verify_result::ok:
using enum account_scan_strategy::verify_result;

case ok:
{
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ());

for (auto & block : response.blocks)
{
block_processor.add (block, nano::block_processor::block_source::bootstrap);
}

nano::lock_guard<nano::mutex> lock{ mutex };
throttle.add (true);
}
break;
case verify_result::nothing_new:
case nothing_new:
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::nothing_new);

nano::lock_guard<nano::mutex> lock{ mutex };
accounts.priority_down (tag.account);
accounts.priority_down (strategy.account);
throttle.add (false);
}
break;
case verify_result::invalid:
case invalid:
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid);
// TODO: Log
Expand All @@ -421,53 +445,49 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc
}
}

void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::account_info_payload & response, const nano::bootstrap_ascending::service::async_tag & tag)
{
// TODO: Make use of account info
}

void nano::bootstrap_ascending::service::process (const nano::empty_payload & response, const nano::bootstrap_ascending::service::async_tag & tag)
{
// Should not happen
debug_assert (false, "empty payload");
}

nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::service::verify (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) const
/**
* Verifies whether the received response is valid. Returns:
* - invalid: when received blocks do not correspond to requested hash/account or they do not make a valid chain
* - nothing_new: when received response indicates that the account chain does not have more blocks
* - ok: otherwise, if all checks pass
*/
auto nano::bootstrap_ascending::service::account_scan_strategy::verify (const nano::asc_pull_ack::blocks_payload & response) const -> verify_result
{
auto const & blocks = response.blocks;

if (blocks.empty ())
{
return verify_result::nothing_new;
}
if (blocks.size () == 1 && blocks.front ()->hash () == tag.start.as_block_hash ())
if (blocks.size () == 1 && blocks.front ()->hash () == start.as_block_hash ())
{
return verify_result::nothing_new;
}

auto const & first = blocks.front ();
switch (tag.type)
switch (type)
{
case async_tag::query_type::blocks_by_hash:
case query_type::blocks_by_hash:
{
if (first->hash () != tag.start.as_block_hash ())
if (first->hash () != start.as_block_hash ())
{
// TODO: Stat & log
return verify_result::invalid;
}
}
break;
case async_tag::query_type::blocks_by_account:
case query_type::blocks_by_account:
{
// Open & state blocks always contain account field
if (first->account () != tag.start.as_account ())
if (first->account () != start.as_account ())
{
// TODO: Stat & log
return verify_result::invalid;
}
}
break;
default:
debug_assert (false, "invalid type");
return verify_result::invalid;
}

Expand All @@ -487,6 +507,11 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser
return verify_result::ok;
}

void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::account_info_payload & response, const lazy_bootstrap_strategy & strategy)
{
// TODO: Make use of account info
}

void nano::bootstrap_ascending::service::track (async_tag const & tag)
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::track);
Expand Down
Loading

0 comments on commit 97de2e9

Please sign in to comment.