diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index f3a91472c9..b386932ef9 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -83,30 +83,6 @@ void nano::bootstrap_ascending::service::stop () nano::join_or_pass (timeout_thread); } -void nano::bootstrap_ascending::service::send (std::shared_ptr channel, async_tag tag) -{ - debug_assert (tag.type == async_tag::query_type::blocks_by_hash || tag.type == async_tag::query_type::blocks_by_account); - - nano::asc_pull_req request{ network_consts }; - request.id = tag.id; - request.type = nano::asc_pull_type::blocks; - - nano::asc_pull_req::blocks_payload request_payload; - request_payload.start = tag.start; - request_payload.count = config.bootstrap_ascending.pull_count; - request_payload.start_type = (tag.type == async_tag::query_type::blocks_by_hash) ? nano::asc_pull_req::hash_type::block : nano::asc_pull_req::hash_type::account; - - request.payload = request_payload; - request.update_header (); - - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out); - - // TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests - channel->send ( - request, nullptr, - nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap); -} - std::size_t nano::bootstrap_ascending::service::priority_size () const { nano::lock_guard lock{ mutex }; @@ -131,8 +107,9 @@ std::size_t nano::bootstrap_ascending::service::score_size () const */ void nano::bootstrap_ascending::service::inspect (store::transaction const & tx, nano::process_return const & result, nano::block const & block) { - auto const hash = block.hash (); + debug_assert (!mutex.try_lock ()); + auto const hash = block.hash (); switch (result.code) { case nano::process_result::progress: @@ -259,30 +236,42 @@ nano::account nano::bootstrap_ascending::service::wait_available_account () return { 0 }; } -bool nano::bootstrap_ascending::service::request (nano::account & account, std::shared_ptr & channel) +auto nano::bootstrap_ascending::service::wait_next () -> std::optional { - async_tag tag{}; - tag.id = nano::bootstrap_ascending::generate_id (); - tag.account = account; - tag.time = std::chrono::steady_clock::now (); - - // Check if the account picked has blocks, if it does, start the pull from the highest block - auto info = ledger.store.account.get (ledger.store.tx_begin_read (), account); - if (info) - { - tag.type = async_tag::query_type::blocks_by_hash; - tag.start = info->head; - } - else + // Waits for account either from priority queue or database + auto account = wait_available_account (); + if (account.is_zero ()) { - tag.type = async_tag::query_type::blocks_by_account; - tag.start = account; + return {}; } + account_scan_strategy strategy{ {}, account }; + return strategy; +} + +bool nano::bootstrap_ascending::service::request (const strategy_variant & strategy, std::shared_ptr & channel) +{ + async_tag tag{ strategy }; + tag.id = nano::bootstrap_ascending::generate_id (); + tag.time = std::chrono::steady_clock::now (); + on_request.notify (tag, channel); + nano::asc_pull_req request{ network_consts }; + request.id = tag.id; + request.type = nano::asc_pull_type::blocks; + + request.payload = tag.prepare_request (*this); + request.update_header (); + track (tag); - send (channel, tag); + + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out); + + // TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests + channel->send ( + request, nullptr, + nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap); return true; // Request sent } @@ -292,21 +281,18 @@ bool nano::bootstrap_ascending::service::run_one () // Ensure there is enough space in blockprocessor for queuing new blocks wait_blockprocessor (); - // Waits for account either from priority queue or database - auto account = wait_available_account (); - if (account.is_zero ()) - { - return false; - } - // Waits for channel that is not full auto channel = wait_available_channel (); if (!channel) { return false; } - - bool success = request (account, channel); + auto strategy = wait_next (); + if (!strategy) + { + return false; + } + bool success = request (*strategy, channel); return success; } @@ -355,10 +341,39 @@ void nano::bootstrap_ascending::service::run_timeouts () on_timeout.notify (tag); stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timeout); } + condition.wait_for (lock, 1s, [this] () { return stopped; }); } } +nano::asc_pull_req::payload_variant nano::bootstrap_ascending::service::prepare (nano::bootstrap_ascending::service::account_scan_strategy & strategy) +{ + nano::asc_pull_req::blocks_payload request; + request.count = config.bootstrap_ascending.pull_count; + + // Check if the account picked has blocks, if it does, start the pull from the highest block + auto info = ledger.store.account.get (ledger.store.tx_begin_read (), strategy.account); + if (info) + { + strategy.type = account_scan_strategy::query_type::blocks_by_hash; + strategy.start = request.start = info->head; + request.start_type = nano::asc_pull_req::hash_type::block; + } + else + { + strategy.type = account_scan_strategy::query_type::blocks_by_account; + strategy.start = request.start = strategy.account; + request.start_type = nano::asc_pull_req::hash_type::account; + } + return request; +} + +nano::asc_pull_req::payload_variant nano::bootstrap_ascending::service::prepare (nano::bootstrap_ascending::service::lazy_bootstrap_strategy & strategy) +{ + nano::asc_pull_req::account_info_payload request; + return request; +} + void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & message, std::shared_ptr channel) { nano::unique_lock lock{ mutex }; @@ -376,7 +391,9 @@ void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & mes on_reply.notify (tag); condition.notify_all (); - std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload); + + // Dispatch to specialized process overload + std::visit ([this, &message] (auto && strategy) { strategy.process_response (message.payload, *this); }, tag.strategy); } else { @@ -384,14 +401,16 @@ void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & mes } } -void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) +void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::blocks_payload & response, const account_scan_strategy & strategy) { stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply); - auto result = verify (response, tag); + auto result = strategy.verify (response); switch (result) { - case verify_result::ok: + using enum account_scan_strategy::verify_result; + + case ok: { stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ()); @@ -399,20 +418,21 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc { block_processor.add (block, nano::block_processor::block_source::bootstrap); } + nano::lock_guard lock{ mutex }; throttle.add (true); } break; - case verify_result::nothing_new: + case nothing_new: { stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::nothing_new); nano::lock_guard lock{ mutex }; - accounts.priority_down (tag.account); + accounts.priority_down (strategy.account); throttle.add (false); } break; - case verify_result::invalid: + case invalid: { stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid); // TODO: Log @@ -421,18 +441,13 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc } } -void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::account_info_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) -{ - // TODO: Make use of account info -} - -void nano::bootstrap_ascending::service::process (const nano::empty_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) -{ - // Should not happen - debug_assert (false, "empty payload"); -} - -nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::service::verify (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) const +/** + * Verifies whether the received response is valid. Returns: + * - invalid: when received blocks do not correspond to requested hash/account or they do not make a valid chain + * - nothing_new: when received response indicates that the account chain does not have more blocks + * - ok: otherwise, if all checks pass + */ +auto nano::bootstrap_ascending::service::account_scan_strategy::verify (const nano::asc_pull_ack::blocks_payload & response) const -> verify_result { auto const & blocks = response.blocks; @@ -440,27 +455,27 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser { return verify_result::nothing_new; } - if (blocks.size () == 1 && blocks.front ()->hash () == tag.start.as_block_hash ()) + if (blocks.size () == 1 && blocks.front ()->hash () == start.as_block_hash ()) { return verify_result::nothing_new; } auto const & first = blocks.front (); - switch (tag.type) + switch (type) { - case async_tag::query_type::blocks_by_hash: + case query_type::blocks_by_hash: { - if (first->hash () != tag.start.as_block_hash ()) + if (first->hash () != start.as_block_hash ()) { // TODO: Stat & log return verify_result::invalid; } } break; - case async_tag::query_type::blocks_by_account: + case query_type::blocks_by_account: { // Open & state blocks always contain account field - if (first->account () != tag.start.as_account ()) + if (first->account () != start.as_account ()) { // TODO: Stat & log return verify_result::invalid; @@ -468,6 +483,7 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser } break; default: + debug_assert (false, "invalid type"); return verify_result::invalid; } @@ -487,6 +503,11 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser return verify_result::ok; } +void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::account_info_payload & response, const lazy_bootstrap_strategy & strategy) +{ + // TODO: Make use of account info +} + void nano::bootstrap_ascending::service::track (async_tag const & tag) { stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::track); diff --git a/nano/node/bootstrap_ascending/service.hpp b/nano/node/bootstrap_ascending/service.hpp index 686003915b..438d90befb 100644 --- a/nano/node/bootstrap_ascending/service.hpp +++ b/nano/node/bootstrap_ascending/service.hpp @@ -64,22 +64,76 @@ class service nano::network & network; nano::stats & stats; -public: // async_tag - struct async_tag +public: // Strategies + class async_tag; + + template + class base_strategy { + public: + nano::asc_pull_req::payload_variant prepare (service & service) + { + return service.prepare (*static_cast (this)); + } + + void process_response (nano::asc_pull_ack::payload_variant const & response, service & service) + { + std::visit ([&] (auto const & payload) { process (payload, service); }, response); + } + + void process (Response const & response, service & service) + { + service.process (response, *static_cast (this)); + } + + // Fallback + void process (auto const & response, service & service) + { + nano::asc_pull_ack::payload_variant{ response }; // Force compilation error if response is not part of variant + debug_assert (false, "invalid payload"); + } + }; + + class account_scan_strategy : public base_strategy + { + public: enum class query_type { - invalid = 0, // Default initialization blocks_by_hash, blocks_by_account, - // TODO: account_info, }; - query_type type{ query_type::invalid }; + const nano::account account{}; + nano::hash_or_account start{}; + query_type type{}; + + enum class verify_result + { + ok, + nothing_new, + invalid, + }; + + verify_result verify (nano::asc_pull_ack::blocks_payload const & response) const; + }; + + class lazy_bootstrap_strategy : public base_strategy + { + }; + + using strategy_variant = std::variant; + +public: + struct async_tag + { + strategy_variant strategy; nano::bootstrap_ascending::id_t id{ 0 }; - nano::hash_or_account start{ 0 }; std::chrono::steady_clock::time_point time{}; - nano::account account{ 0 }; + + nano::asc_pull_req::payload_variant prepare_request (service & service) + { + return std::visit ([&] (auto && sgy) { return sgy.prepare (service); }, strategy); + } }; public: // Events @@ -96,6 +150,11 @@ class service bool run_one (); void run_timeouts (); + bool request (strategy_variant const &, std::shared_ptr &); + void track (async_tag const & tag); + + std::optional wait_next (); + /* Throttles requesting new blocks, not to overwhelm blockprocessor */ void wait_blockprocessor (); /* Waits for channel with free capacity for bootstrap messages */ @@ -104,28 +163,11 @@ class service nano::account available_account (); nano::account wait_available_account (); - bool request (nano::account &, std::shared_ptr &); - void send (std::shared_ptr, async_tag tag); - void track (async_tag const & tag); - - void process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag); - void process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag); - void process (nano::empty_payload const & response, async_tag const & tag); + nano::asc_pull_req::payload_variant prepare (account_scan_strategy &); + nano::asc_pull_req::payload_variant prepare (lazy_bootstrap_strategy &); - enum class verify_result - { - ok, - nothing_new, - invalid, - }; - - /** - * Verifies whether the received response is valid. Returns: - * - invalid: when received blocks do not correspond to requested hash/account or they do not make a valid chain - * - nothing_new: when received response indicates that the account chain does not have more blocks - * - ok: otherwise, if all checks pass - */ - verify_result verify (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag) const; + void process (nano::asc_pull_ack::blocks_payload const & response, account_scan_strategy const &); + void process (nano::asc_pull_ack::account_info_payload const & response, lazy_bootstrap_strategy const &); public: // account_sets nano::bootstrap_ascending::account_sets::info_t info () const; @@ -134,6 +176,7 @@ class service nano::bootstrap_ascending::account_sets accounts; nano::bootstrap_ascending::buffered_iterator iterator; nano::bootstrap_ascending::throttle throttle; + // Calculates a lookback size based on the size of the ledger where larger ledgers have a larger sample count std::size_t compute_throttle_size () const; @@ -146,9 +189,7 @@ class service mi::indexed_by< mi::sequenced>, mi::hashed_unique, - mi::member>, - mi::hashed_non_unique, - mi::member> + mi::member> >>; // clang-format on ordered_tags tags; diff --git a/nano/node/messages.hpp b/nano/node/messages.hpp index bf4ab2b2ab..8a138ad9d8 100644 --- a/nano/node/messages.hpp +++ b/nano/node/messages.hpp @@ -468,12 +468,11 @@ class asc_pull_req final : public message }; public: // Payload - /** Currently unused, allows extensions in the future */ asc_pull_type type{ asc_pull_type::invalid }; id_t id{ 0 }; - /** Payload depends on `asc_pull_type` */ - std::variant payload; + using payload_variant = std::variant; + payload_variant payload; public: /** Size of message without payload */ @@ -544,12 +543,11 @@ class asc_pull_ack final : public message }; public: // Payload - /** Currently unused, allows extensions in the future */ asc_pull_type type{ asc_pull_type::invalid }; id_t id{ 0 }; - /** Payload depends on `asc_pull_type` */ - std::variant payload; + using payload_variant = std::variant; + payload_variant payload; public: /** Size of message without payload */ diff --git a/nano/slow_test/bootstrap.cpp b/nano/slow_test/bootstrap.cpp index b722cf73e1..250bc4de8e 100644 --- a/nano/slow_test/bootstrap.cpp +++ b/nano/slow_test/bootstrap.cpp @@ -136,11 +136,11 @@ TEST (bootstrap_ascending, profile) } }); - client->ascendboot.on_request.add ([&] (auto & tag, auto & channel) { - nano::lock_guard lock{ mutex }; - - requests[tag.id] = { tag, channel }; - }); + // client->ascendboot.on_request.add ([&] (auto & tag, auto & channel) { + // nano::lock_guard lock{ mutex }; + // + // requests[tag.id] = { tag, channel }; + // }); client->ascendboot.on_reply.add ([&] (auto & tag) { nano::lock_guard lock{ mutex };