From e330b697bd8b313c926764b9d288630a86899e35 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com>
Date: Thu, 31 Oct 2024 12:30:26 +0100
Subject: [PATCH] Bounded backlog

---
 nano/core_test/node.cpp                   |  21 +
 nano/lib/logging_enums.hpp                |   1 +
 nano/lib/stats_enums.hpp                  |   9 +
 nano/lib/thread_roles.cpp                 |   9 +
 nano/lib/thread_roles.hpp                 |   3 +
 nano/node/CMakeLists.txt                  |   2 +
 nano/node/bootstrap/bootstrap_service.cpp |  10 +
 nano/node/bounded_backlog.cpp             | 530 ++++++++++++++++++++++
 nano/node/bounded_backlog.hpp             | 155 +++++++
 nano/node/fwd.hpp                         |   2 +
 nano/node/local_block_broadcaster.cpp     |   6 +
 nano/node/local_block_broadcaster.hpp     |   1 +
 nano/node/node.cpp                        |   6 +
 nano/node/node.hpp                        |   3 +-
 nano/node/nodeconfig.hpp                  |   2 +
 nano/secure/ledger.cpp                    |  12 +
 nano/secure/ledger.hpp                    |   2 +
 nano/store/write_queue.hpp                |   1 +
 18 files changed, 774 insertions(+), 1 deletion(-)
 create mode 100644 nano/node/bounded_backlog.cpp
 create mode 100644 nano/node/bounded_backlog.hpp

diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp
index 1d9118ec2c..593c5ee6c1 100644
--- a/nano/core_test/node.cpp
+++ b/nano/core_test/node.cpp
@@ -21,6 +21,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -3698,3 +3699,23 @@ TEST (node, container_info)
 	ASSERT_NO_THROW (node1.container_info ());
 	ASSERT_NO_THROW (node2.container_info ());
 }
+
+TEST (node, bounded_backlog)
+{
+	nano::test::system system;
+
+	nano::node_config node_config;
+	node_config.backlog.max_backlog = 10;
+	node_config.backlog.bucket_threshold = 2;
+	node_config.backlog_scan.enable = false;
+	auto & node = *system.add_node (node_config);
+
+	const int howmany_blocks = 64;
+	const int howmany_chains = 16;
+
+	auto chains = nano::test::setup_chains (system, node, howmany_chains, howmany_blocks, nano::dev::genesis_key, /* do not confirm */ false);
+
+	node.backlog_scan.trigger ();
+
+	ASSERT_TIMELY_EQ (20s, node.ledger.block_count (), 11); // 10 + genesis
+}
diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp
index 85136b98b7..b3f5312577 100644
--- a/nano/lib/logging_enums.hpp
+++ b/nano/lib/logging_enums.hpp
@@ -83,6 +83,7 @@ enum class type
 	local_block_broadcaster,
 	monitor,
 	confirming_set,
+	bounded_backlog,
 
 	// bootstrap
 	bulk_pull_client,
diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp
index 0e104209ca..1ce8e1766e 100644
--- a/nano/lib/stats_enums.hpp
+++ b/nano/lib/stats_enums.hpp
@@ -85,6 +85,7 @@ enum class type
 	active_elections_cancelled,
 	active_elections_cemented,
 	backlog_scan,
+	bounded_backlog,
 	backlog,
 	unchecked,
 	election_scheduler,
@@ -571,6 +572,14 @@ enum class detail
 	blocks_by_account,
 	account_info_by_hash,
 
+	// bounded backlog
+	gathered_targets,
+	performing_rollbacks,
+	no_targets,
+	rollback_missing_block,
+	rollback_skipped,
+	loop_scan,
+
 	_last // Must be the last enum
 };
diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp
index 89b2c5da0c..56b64f1727 100644
--- a/nano/lib/thread_roles.cpp
+++ b/nano/lib/thread_roles.cpp
@@ -97,6 +97,15 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
 		case nano::thread_role::name::backlog_scan:
 			thread_role_name_string = "Backlog scan";
 			break;
+		case nano::thread_role::name::bounded_backlog:
+			thread_role_name_string = "Bounded backlog";
+			break;
+		case nano::thread_role::name::bounded_backlog_scan:
+			thread_role_name_string = "Bounded b scan";
+			break;
+		case nano::thread_role::name::bounded_backlog_notifications:
+			thread_role_name_string = "Bounded b notif";
+			break;
 		case nano::thread_role::name::vote_generator_queue:
 			thread_role_name_string = "Voting que";
 			break;
diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp
index 15b4a511cb..8dd7551927 100644
--- a/nano/lib/thread_roles.hpp
+++ b/nano/lib/thread_roles.hpp
@@ -37,6 +37,9 @@ enum class name
 	db_parallel_traversal,
 	unchecked,
 	backlog_scan,
+	bounded_backlog,
+	bounded_backlog_scan,
+	bounded_backlog_notifications,
 	vote_generator_queue,
 	telemetry,
 	bootstrap,
diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt
index 7fa1263972..13a182adff 100644
--- a/nano/node/CMakeLists.txt
+++ b/nano/node/CMakeLists.txt
@@ -24,6 +24,8 @@ add_library(
   blockprocessor.cpp
   bucketing.hpp
   bucketing.cpp
+  bounded_backlog.hpp
+  bounded_backlog.cpp
   bootstrap_weights_beta.hpp
   bootstrap_weights_live.hpp
   bootstrap/account_sets.hpp
diff --git a/nano/node/bootstrap/bootstrap_service.cpp b/nano/node/bootstrap/bootstrap_service.cpp
index 2a71ffb08e..e6a2295cb8 100644
--- a/nano/node/bootstrap/bootstrap_service.cpp
+++ b/nano/node/bootstrap/bootstrap_service.cpp
@@ -50,6 +50,16 @@ nano::bootstrap_service::bootstrap_service (nano::node_config const & node_confi
 		condition.notify_all ();
 	});
+
+	// Unblock rolled back accounts as the dependency is no longer valid
+	block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
+		nano::lock_guard<nano::mutex> lock{ mutex };
+		for (auto const & block : blocks)
+		{
+			debug_assert (block != nullptr);
+			accounts.unblock (block->account ());
+		}
+	});
 
 	accounts.priority_set (node_config_a.network_params.ledger.genesis->account_field ().value ());
 }
diff --git a/nano/node/bounded_backlog.cpp b/nano/node/bounded_backlog.cpp
new file mode 100644
index 0000000000..914009107d
--- /dev/null
+++ b/nano/node/bounded_backlog.cpp
@@ -0,0 +1,530 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+nano::bounded_backlog::bounded_backlog (nano::bounded_backlog_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::bucketing & bucketing_a, nano::backlog_scan & backlog_scan_a, nano::block_processor & block_processor_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) :
+	config{ config_a },
+	node{ node_a },
+	ledger{ ledger_a },
+	bucketing{ bucketing_a },
+	backlog_scan{ backlog_scan_a },
+	block_processor{ block_processor_a },
+	confirming_set{ confirming_set_a },
+	stats{ stats_a },
+	logger{ logger_a },
+	scan_limiter{ config.batch_size },
+	workers{ 1, nano::thread_role::name::bounded_backlog_notifications }
+{
+	// Activate accounts with unconfirmed blocks
+	backlog_scan.batch_activated.add ([this] (auto const & batch) {
+		auto transaction = ledger.tx_begin_read ();
+		for (auto const & info : batch)
+		{
+			activate (transaction, info.account, info.account_info, info.conf_info);
+		}
+	});
+
+	// Erase accounts with all confirmed blocks
+	backlog_scan.batch_scanned.add ([this] (auto const & batch) {
+		nano::lock_guard<nano::mutex> guard{ mutex };
+		for (auto const & info : batch)
+		{
+			if (info.conf_info.height == info.account_info.block_count)
+			{
+				index.erase (info.account);
+			}
+		}
+	});
+
+	// Track unconfirmed blocks
+	block_processor.batch_processed.add ([this] (auto const & batch) {
+		auto transaction = ledger.tx_begin_read ();
+		for (auto const & [result, context] : batch)
+		{
+			if (result == nano::block_status::progress)
+			{
+				auto const & block = context.block;
+				insert (transaction, *block);
+			}
+		}
+	});
+
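+	// The two handlers below evict entries so the index stays consistent with the
+	// ledger: blocks leave the backlog once they are either rolled back or cemented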
+	// Remove rolled back blocks from the backlog
+	block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
+		nano::lock_guard<nano::mutex> guard{ mutex };
+		for (auto const & block : blocks)
+		{
+			index.erase (block->hash ());
+		}
+	});
+
+	// Remove cemented blocks from the backlog
+	confirming_set.batch_cemented.add ([this] (auto const & batch) {
+		nano::lock_guard<nano::mutex> guard{ mutex };
+		for (auto const & context : batch)
+		{
+			index.erase (context.block->hash ());
+		}
+	});
+}
+
+nano::bounded_backlog::~bounded_backlog ()
+{
+	// Thread must be stopped before destruction
+	debug_assert (!thread.joinable ());
+	debug_assert (!scan_thread.joinable ());
+	debug_assert (!workers.alive ());
+}
+
+void nano::bounded_backlog::start ()
+{
+	debug_assert (!thread.joinable ());
+
+	workers.start ();
+
+	thread = std::thread{ [this] () {
+		nano::thread_role::set (nano::thread_role::name::bounded_backlog);
+		run ();
+	} };
+
+	scan_thread = std::thread{ [this] () {
+		nano::thread_role::set (nano::thread_role::name::bounded_backlog_scan);
+		run_scan ();
+	} };
+}
+
+void nano::bounded_backlog::stop ()
+{
+	{
+		nano::lock_guard<nano::mutex> lock{ mutex };
+		stopped = true;
+	}
+	condition.notify_all ();
+	if (thread.joinable ())
+	{
+		thread.join ();
+	}
+	if (scan_thread.joinable ())
+	{
+		scan_thread.join ();
+	}
+	workers.stop ();
+}
+
+uint64_t nano::bounded_backlog::backlog_size () const
+{
+	return index.size ();
+}
+
+bool nano::bounded_backlog::erase (nano::secure::transaction const & transaction, nano::account const & account)
+{
+	nano::lock_guard<nano::mutex> guard{ mutex };
+	return index.erase (account);
+}
+
+void nano::bounded_backlog::activate (nano::secure::transaction & transaction, nano::account const & account, nano::account_info const & account_info, nano::confirmation_height_info const & conf_info)
+{
+	debug_assert (conf_info.frontier != account_info.head);
+
+	auto contains = [this] (nano::block_hash const & hash) {
+		nano::lock_guard<nano::mutex> guard{ mutex };
+		return index.contains (hash);
+	};
+
+	// Insert blocks into the index starting from the account head block
+	auto block = ledger.any.block_get (transaction, account_info.head);
+	while (block)
+	{
+		// We reached the confirmed frontier, no need to track more blocks
+		if (block->hash () == conf_info.frontier)
+		{
+			break;
+		}
+		// Check if the block is already in the backlog, avoids unnecessary ledger lookups
+		if (contains (block->hash ()))
+		{
+			break;
+		}
+
+		bool inserted = insert (transaction, *block);
+
+		// If the block was not inserted, we already have it in the backlog
+		if (!inserted)
+		{
+			break;
+		}
+
+		transaction.refresh_if_needed ();
+
+		block = ledger.any.block_get (transaction, block->previous ());
+	}
+}
+
+void nano::bounded_backlog::update (nano::secure::transaction const & transaction, nano::block_hash const & hash)
+{
+	// Erase if the block is either confirmed or missing
+	if (!ledger.unconfirmed_exists (transaction, hash))
+	{
+		nano::lock_guard<nano::mutex> guard{ mutex };
+		index.erase (hash);
+	}
+}
+
+bool nano::bounded_backlog::insert (nano::secure::transaction const & transaction, nano::block const & block)
+{
+	auto const [priority_balance, priority_timestamp] = ledger.block_priority (transaction, block);
+	auto const bucket_index = bucketing.bucket_index (priority_balance);
+
+	nano::lock_guard<nano::mutex> guard{ mutex };
+
+	return index.insert (block, bucket_index, priority_timestamp);
+}
+
+bool nano::bounded_backlog::predicate () const
+{
+	debug_assert (!mutex.try_lock ());
+	// Both the ledger and the tracked backlog must be over the threshold
+	return ledger.backlog_count () > config.max_backlog && index.size () > config.max_backlog;
+}
+
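+// Example with the default config: rollbacks begin only once both the ledger and
+// the tracked index hold more than max_backlog (100000) unconfirmed blocks, and
+// each loop iteration gathers at most batch_size (32) rollback targets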
+void nano::bounded_backlog::run ()
+{
+	std::unique_lock<nano::mutex> lock{ mutex };
+	while (!stopped)
+	{
+		if (predicate ())
+		{
+			// Wait until all notifications about the previous rollbacks are processed
+			while (workers.queued_tasks () >= config.max_queued_notifications)
+			{
+				stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::cooldown);
+				condition.wait_for (lock, 100ms, [this] { return stopped.load (); });
+				if (stopped)
+				{
+					return;
+				}
+			}
+
+			stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop);
+
+			// Calculate the number of targets to roll back
+			uint64_t const backlog = ledger.backlog_count ();
+			uint64_t const target_count = backlog > config.max_backlog ? backlog - config.max_backlog : 0;
+
+			auto targets = gather_targets (std::min (target_count, static_cast<uint64_t> (config.batch_size)));
+			if (!targets.empty ())
+			{
+				lock.unlock ();
+
+				stats.add (nano::stat::type::bounded_backlog, nano::stat::detail::gathered_targets, targets.size ());
+				auto processed = perform_rollbacks (targets);
+
+				lock.lock ();
+
+				// Erase rolled back blocks from the index
+				for (auto const & hash : processed)
+				{
+					index.erase (hash);
+				}
+			}
+			else
+			{
+				// Cooldown, this should not happen in normal operation
+				stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::no_targets);
+				condition.wait_for (lock, 100ms, [this] {
+					return stopped.load ();
+				});
+			}
+		}
+		else
+		{
+			condition.wait_for (lock, 1s, [this] {
+				return stopped || predicate ();
+			});
+		}
+	}
+}
+
+bool nano::bounded_backlog::should_rollback (nano::block_hash const & hash) const
+{
+	if (node.vote_cache.exists (hash))
+	{
+		return false;
+	}
+	if (node.vote_router.exists (hash))
+	{
+		return false;
+	}
+	if (node.active.recently_confirmed.exists (hash))
+	{
+		return false;
+	}
+	if (node.scheduler.exists (hash))
+	{
+		return false;
+	}
+	if (node.confirming_set.contains (hash))
+	{
+		return false;
+	}
+	if (node.local_block_broadcaster.contains (hash))
+	{
+		return false;
+	}
+	return true;
+}
+
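+// Rolls back the given targets under a single ledger write transaction and
+// returns the hashes actually removed, including any dependent blocks that the
+// ledger unwound together with each target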
+std::deque<nano::block_hash> nano::bounded_backlog::perform_rollbacks (std::deque<nano::block_hash> const & targets)
+{
+	stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::performing_rollbacks);
+
+	auto transaction = ledger.tx_begin_write (nano::store::writer::bounded_backlog);
+
+	std::deque<nano::block_hash> processed;
+	for (auto const & hash : targets)
+	{
+		// Skip the rollback if the block is being used by the node; this should be race free as it's checked while holding the ledger write lock
+		if (!should_rollback (hash))
+		{
+			stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::rollback_skipped);
+			continue;
+		}
+
+		// Check that the block is still OK to roll back; there could be a delay between gathering the targets and performing the rollbacks
+		if (auto block = ledger.any.block_get (transaction, hash))
+		{
+			logger.debug (nano::log::type::bounded_backlog, "Rolling back: {}, account: {}", hash.to_string (), block->account ().to_account ());
+
+			std::deque<std::shared_ptr<nano::block>> rollback_list;
+			bool error = ledger.rollback (transaction, hash, rollback_list);
+			stats.inc (nano::stat::type::bounded_backlog, error ? nano::stat::detail::rollback_failed : nano::stat::detail::rollback);
+
+			for (auto const & rollback : rollback_list)
+			{
+				processed.push_back (rollback->hash ());
+			}
+
+			// Notify observers of the rolled back blocks on a background thread; avoid dispatching notifications while holding the ledger write transaction
+			workers.post ([this, rollback_list = std::move (rollback_list), root = block->qualified_root ()] {
+				// TODO: Calling block_processor's event here is not ideal, but duplicating these events is even worse
+				block_processor.rolled_back.notify (rollback_list, root);
+			});
+		}
+		else
+		{
+			stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::rollback_missing_block);
+			processed.push_back (hash);
+		}
+	}
+
+	return processed;
+}
+
+std::deque<nano::block_hash> nano::bounded_backlog::gather_targets (size_t max_count) const
+{
+	debug_assert (!mutex.try_lock ());
+
+	std::deque<nano::block_hash> targets;
+
+	// Start rolling back from the lowest index buckets first
+	for (auto bucket : bucketing.bucket_indices ())
+	{
+		// Only start rolling back if the bucket is over the threshold of unconfirmed blocks
+		if (index.size (bucket) > config.bucket_threshold)
+		{
+			auto const count = std::min (max_count, config.batch_size);
+
+			auto const top = index.top (bucket, count, [this] (auto const & hash) {
+				// Only roll back if the block is not being used by the node
+				return should_rollback (hash);
+			});
+
+			for (auto const & entry : top)
+			{
+				targets.push_back (entry);
+			}
+		}
+	}
+
+	return targets;
+}
+
+void nano::bounded_backlog::run_scan ()
+{
+	std::unique_lock<nano::mutex> lock{ mutex };
+	while (!stopped)
+	{
+		auto wait = [&] (auto count) {
+			while (!scan_limiter.should_pass (count))
+			{
+				condition.wait_for (lock, 100ms);
+				if (stopped)
+				{
+					return;
+				}
+			}
+		};
+
+		nano::block_hash last = 0;
+		while (!stopped)
+		{
+			wait (config.batch_size);
+
+			stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop_scan);
+
+			auto batch = index.next (last, config.batch_size);
+			if (batch.empty ()) // If the batch is empty, we iterated over all blocks in the index
+			{
+				break;
+			}
+
+			lock.unlock ();
+			{
+				auto transaction = ledger.tx_begin_read ();
+				for (auto const & hash : batch)
+				{
+					stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::scanned);
+					update (transaction, hash);
+					last = hash;
+				}
+			}
+			lock.lock ();
+		}
+	}
+}
+
+nano::container_info nano::bounded_backlog::container_info () const
+{
+	nano::lock_guard<nano::mutex> guard{ mutex };
+	nano::container_info info;
+	info.put ("backlog", index.size ());
+	info.put ("notifications", workers.queued_tasks ());
+	info.add ("index", index.container_info ());
+	return info;
+}
+
+/*
+ * backlog_index
+ */
+
+bool nano::backlog_index::insert (nano::block const & block, nano::bucket_index bucket, nano::priority_timestamp priority)
+{
+	auto const hash = block.hash ();
+	auto const account = block.account ();
+
+	entry new_entry{
+		.hash = hash,
+		.account = account,
+		.bucket = bucket,
+		.priority = priority,
+	};
+
+	auto [it, inserted] = blocks.emplace (new_entry);
+	if (inserted)
+	{
+		size_by_bucket[bucket]++;
+		return true;
+	}
+	return false;
+}
+
+bool nano::backlog_index::erase (nano::account const & account)
+{
+	auto const [begin, end] = blocks.get<tag_account> ().equal_range (account);
+	bool const erased = begin != end; // Capture before erasing, since erase invalidates `begin`
+	for (auto it = begin; it != end;)
+	{
+		size_by_bucket[it->bucket]--;
+		it = blocks.get<tag_account> ().erase (it);
+	}
+	return erased;
+}
+
+bool nano::backlog_index::erase (nano::block_hash const & hash)
+{
+	if (auto existing = blocks.get<tag_hash> ().find (hash); existing != blocks.get<tag_hash> ().end ())
+	{
+		size_by_bucket[existing->bucket]--;
+		blocks.get<tag_hash> ().erase (existing);
+		return true;
+	}
+	return false;
+}
+
+std::deque<nano::block_hash> nano::backlog_index::top (nano::bucket_index bucket, size_t count, filter_callback const & filter) const
+{
+	// Highest timestamp, lowest priority, iterate in descending order
+	priority_key const starting_key{ bucket, std::numeric_limits<nano::priority_timestamp>::max () };
+
+	std::deque<nano::block_hash> results;
+	auto begin = blocks.get<tag_priority> ().lower_bound (starting_key);
+	for (auto it = begin; it != blocks.get<tag_priority> ().end () && it->bucket == bucket && results.size () < count; ++it)
+	{
+		if (filter (it->hash))
+		{
+			results.push_back (it->hash);
+		}
+	}
+	return results;
+}
+
+std::deque<nano::block_hash> nano::backlog_index::next (nano::block_hash last, size_t count) const
+{
+	std::deque<nano::block_hash> results;
+
+	auto it = blocks.get<tag_hash_ordered> ().upper_bound (last);
+	auto end = blocks.get<tag_hash_ordered> ().end ();
+
+	while (it != end && results.size () < count)
+	{
+		results.push_back (it->hash);
+		last = it->hash;
+		it = blocks.get<tag_hash_ordered> ().upper_bound (last);
+	}
+	return results;
+}
+
+bool nano::backlog_index::contains (nano::block_hash const & hash) const
+{
+	return blocks.get<tag_hash> ().contains (hash);
+}
+
+size_t nano::backlog_index::size () const
+{
+	return blocks.size ();
+}
+
+size_t nano::backlog_index::size (nano::bucket_index bucket) const
+{
+	if (auto it = size_by_bucket.find (bucket); it != size_by_bucket.end ())
+	{
+		return it->second;
+	}
+	return 0;
+}
+
+nano::container_info nano::backlog_index::container_info () const
+{
+	auto collect_bucket_sizes = [&] () {
+		nano::container_info info;
+		for (auto [bucket, count] : size_by_bucket)
+		{
+			info.put (std::to_string (bucket), count);
+		}
+		return info;
+	};
+
+	nano::container_info info;
+	info.put ("blocks", blocks);
+	info.add ("sizes", collect_bucket_sizes ());
+	return info;
+}
\ No newline at end of file
diff --git a/nano/node/bounded_backlog.hpp b/nano/node/bounded_backlog.hpp
new file mode 100644
index 0000000000..899caacab5
--- /dev/null
+++ b/nano/node/bounded_backlog.hpp
@@ -0,0 +1,155 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace mi = boost::multi_index;
+
+namespace nano
+{
+class backlog_index
+{
+public:
+	struct priority_key
+	{
+		nano::bucket_index bucket;
+		nano::priority_timestamp priority;
+
+		auto operator<=> (priority_key const &) const = default;
+	};
+
+	struct entry
+	{
+		nano::block_hash hash;
+		nano::account account;
+		nano::bucket_index bucket;
+		nano::priority_timestamp priority;
+
+		backlog_index::priority_key priority_key () const
+		{
+			return { bucket, priority };
+		}
+	};
+
+public:
+	backlog_index () = default;
+
+	bool insert (nano::block const & block, nano::bucket_index, nano::priority_timestamp);
+
+	bool erase (nano::account const & account);
+	bool erase (nano::block_hash const & hash);
+
+	using filter_callback = std::function<bool (nano::block_hash const &)>;
+	std::deque<nano::block_hash> top (nano::bucket_index, size_t count, filter_callback const &) const;
+
+	std::deque<nano::block_hash> next (nano::block_hash last, size_t count) const;
+
+	bool contains (nano::block_hash const & hash) const;
+	size_t size () const;
+	size_t size (nano::bucket_index) const;
+
+	nano::container_info container_info () const;
+
+private:
+	// clang-format off
+	class tag_hash {};
+	class tag_hash_ordered {};
+	class tag_account {};
+	class tag_priority {};
+
+	using ordered_blocks = boost::multi_index_container<entry,
+	mi::indexed_by<
+		mi::hashed_unique<mi::tag<tag_hash>, // Allows for fast lookup
+			mi::member<entry, nano::block_hash, &entry::hash>>,
+		mi::ordered_unique<mi::tag<tag_hash_ordered>, // Allows for sequential scan
+			mi::member<entry, nano::block_hash, &entry::hash>>,
+		mi::hashed_non_unique<mi::tag<tag_account>,
+			mi::member<entry, nano::account, &entry::account>>,
+		mi::ordered_non_unique<mi::tag<tag_priority>,
+			mi::const_mem_fun<entry, priority_key, &entry::priority_key>, std::greater<>> // DESC order
+	>>;
+	// clang-format on
+
+	ordered_blocks blocks;
+
+	// Keep track of the size of the backlog in number of unconfirmed blocks per bucket
+	std::map<nano::bucket_index, size_t> size_by_bucket;
+};
+
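+// Thresholds controlling when rollbacks start: max_backlog caps the total number
+// of unconfirmed blocks kept in the ledger, bucket_threshold is the minimum
+// number of tracked blocks before a bucket is considered for rollback, and
+// batch_size bounds both the rollback and scan batches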
+class bounded_backlog_config
+{
+public:
+	size_t max_backlog{ 100000 };
+	size_t bucket_threshold{ 1000 };
+	double overfill_factor{ 1.5 };
+	size_t batch_size{ 32 };
+	size_t max_queued_notifications{ 128 };
+};
+
+class bounded_backlog
+{
+public:
+	bounded_backlog (bounded_backlog_config const &, nano::node &, nano::ledger &, nano::bucketing &, nano::backlog_scan &, nano::block_processor &, nano::confirming_set &, nano::stats &, nano::logger &);
+	~bounded_backlog ();
+
+	void start ();
+	void stop ();
+
+	uint64_t backlog_size () const;
+
+	nano::container_info container_info () const;
+
+private: // Dependencies
+	bounded_backlog_config const & config;
+	nano::node & node;
+	nano::ledger & ledger;
+	nano::bucketing & bucketing;
+	nano::backlog_scan & backlog_scan;
+	nano::block_processor & block_processor;
+	nano::confirming_set & confirming_set;
+	nano::stats & stats;
+	nano::logger & logger;
+
+private:
+	void activate (nano::secure::transaction &, nano::account const &, nano::account_info const &, nano::confirmation_height_info const &);
+	void update (nano::secure::transaction const &, nano::block_hash const &);
+	bool erase (nano::secure::transaction const &, nano::account const &);
+	bool insert (nano::secure::transaction const &, nano::block const &);
+
+	bool predicate () const;
+	void run ();
+	std::deque<nano::block_hash> gather_targets (size_t max_count) const;
+	bool should_rollback (nano::block_hash const &) const;
+
+	std::deque<nano::block_hash> perform_rollbacks (std::deque<nano::block_hash> const & targets);
+
+	void run_scan ();
+
+private:
+	nano::backlog_index index;
+
+	nano::rate_limiter scan_limiter;
+
+	std::atomic<bool> stopped{ false };
+	nano::condition_variable condition;
+	mutable nano::mutex mutex;
+	std::thread thread;
+	std::thread scan_thread;
+
+	nano::thread_pool workers;
+};
+}
\ No newline at end of file
diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp
index 414c3e6c3c..e477f8414a 100644
--- a/nano/node/fwd.hpp
+++ b/nano/node/fwd.hpp
@@ -9,7 +9,9 @@ namespace nano
 {
 class account_sets_config;
 class active_elections;
+class backlog_scan;
 class block_processor;
+class bounded_backlog;
 class bucketing;
 class bootstrap_config;
 class bootstrap_server;
diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp
index b2a41a747f..31715309a7 100644
--- a/nano/node/local_block_broadcaster.cpp
+++ b/nano/node/local_block_broadcaster.cpp
@@ -103,6 +103,12 @@ void nano::local_block_broadcaster::stop ()
 	nano::join_or_pass (thread);
 }
 
+bool nano::local_block_broadcaster::contains (nano::block_hash const & hash) const
+{
+	nano::lock_guard<nano::mutex> lock{ mutex };
+	return local_blocks.get<tag_hash> ().contains (hash);
+}
+
 size_t nano::local_block_broadcaster::size () const
 {
 	nano::lock_guard<nano::mutex> lock{ mutex };
diff --git a/nano/node/local_block_broadcaster.hpp b/nano/node/local_block_broadcaster.hpp
index a88ed25f95..567244491f 100644
--- a/nano/node/local_block_broadcaster.hpp
+++ b/nano/node/local_block_broadcaster.hpp
@@ -59,6 +59,7 @@ class local_block_broadcaster final
 	void start ();
 	void stop ();
 
+	bool contains (nano::block_hash const &) const;
 	size_t size () const;
 	nano::container_info container_info () const;
 
diff --git a/nano/node/node.cpp b/nano/node/node.cpp
index 48c384b194..670f529b25 100644
--- a/nano/node/node.cpp
+++ b/nano/node/node.cpp
@@ -14,6 +14,7 @@
 #include
 #include
 #include
+#include <nano/node/bounded_backlog.hpp>
 #include
 #include
 #include
@@ -159,6 +160,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy
 	wallets (wallets_store.init_error (), *this),
 	backlog_scan_impl{ std::make_unique<nano::backlog_scan> (config.backlog_scan, ledger, stats) },
 	backlog_scan{ *backlog_scan_impl },
+	backlog_impl{ std::make_unique<nano::bounded_backlog> (config.backlog, *this, ledger, bucketing, backlog_scan, block_processor, confirming_set, stats, logger) },
+	backlog{ *backlog_impl },
 	bootstrap_server_impl{ std::make_unique<nano::bootstrap_server> (config.bootstrap_server, store, ledger, network_params.network, stats) },
 	bootstrap_server{ *bootstrap_server_impl },
 	bootstrap_impl{ std::make_unique<nano::bootstrap_service> (config, block_processor, ledger, network, stats, logger) },
@@ -651,6 +654,7 @@ void nano::node::start ()
 	scheduler.start ();
 	aggregator.start ();
 	backlog_scan.start ();
+	backlog.start ();
 	bootstrap_server.start ();
 	bootstrap.start ();
 	websocket.start ();
@@ -683,6 +687,7 @@ void nano::node::stop ()
 	distributed_work.stop ();
 	backlog_scan.stop ();
 	bootstrap.stop ();
+	backlog.stop ();
 	rep_crawler.stop ();
 	unchecked.stop ();
 	block_processor.stop ();
@@ -1211,6 +1216,7 @@ nano::container_info nano::node::container_info () const
 	info.add ("message_processor", message_processor.container_info ());
 	info.add ("bandwidth", outbound_limiter.container_info ());
 	info.add ("backlog_scan", backlog_scan.container_info ());
+	info.add ("bounded_backlog", backlog.container_info ());
 	return info;
 }
diff --git a/nano/node/node.hpp b/nano/node/node.hpp
index 3d85f7cffa..691cec138f 100644
--- a/nano/node/node.hpp
+++ b/nano/node/node.hpp
@@ -33,7 +33,6 @@
 namespace nano
 {
 class active_elections;
-class backlog_scan;
 class bandwidth_limiter;
 class confirming_set;
 class message_processor;
@@ -209,6 +208,8 @@ class node final : public std::enable_shared_from_this<node>
 	nano::wallets wallets;
 	std::unique_ptr<nano::backlog_scan> backlog_scan_impl;
 	nano::backlog_scan & backlog_scan;
+	std::unique_ptr<nano::bounded_backlog> backlog_impl;
+	nano::bounded_backlog & backlog;
 	std::unique_ptr<nano::bootstrap_server> bootstrap_server_impl;
 	nano::bootstrap_server & bootstrap_server;
 	std::unique_ptr<nano::bootstrap_service> bootstrap_impl;
diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp
index ed4ca6fa3e..5fb3e0d8d7 100644
--- a/nano/node/nodeconfig.hpp
+++ b/nano/node/nodeconfig.hpp
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include <nano/node/bounded_backlog.hpp>
 #include
 #include
 #include
@@ -147,6 +148,7 @@ class node_config
 	nano::confirming_set_config confirming_set;
 	nano::monitor_config monitor;
 	nano::backlog_scan_config backlog_scan;
+	nano::bounded_backlog_config backlog;
 
 public:
 	/** Entry is ignored if it cannot be parsed as a valid address:port */
diff --git a/nano/secure/ledger.cpp b/nano/secure/ledger.cpp
index 9dab65de67..a78ca81280 100644
--- a/nano/secure/ledger.cpp
+++ b/nano/secure/ledger.cpp
@@ -792,6 +792,11 @@ void nano::ledger::initialize (nano::generate_cache_flags const & generate_cache
 	cache.pruned_count = store.pruned.count (transaction);
 }
 
+bool nano::ledger::unconfirmed_exists (secure::transaction const & transaction, nano::block_hash const & hash)
+{
+	return any.block_exists (transaction, hash) && !confirmed.block_exists (transaction, hash);
+}
+
 nano::uint128_t nano::ledger::account_receivable (secure::transaction const & transaction_a, nano::account const & account_a, bool only_confirmed_a)
 {
 	nano::uint128_t result (0);
@@ -1524,6 +1529,13 @@ uint64_t nano::ledger::pruned_count () const
 	return cache.pruned_count;
 }
 
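+// Number of blocks that are not yet cemented, derived from the cached block and
+// cemented counters rather than a database scan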
+uint64_t nano::ledger::backlog_count () const
+{
+	auto blocks = cache.block_count.load ();
+	auto cemented = cache.cemented_count.load ();
+	return (blocks > cemented) ? blocks - cemented : 0;
+}
+
 auto nano::ledger::block_priority (nano::secure::transaction const & transaction, nano::block const & block) const -> block_priority_result
 {
 	auto const balance = block.balance ();
diff --git a/nano/secure/ledger.hpp b/nano/secure/ledger.hpp
index e1ad3b9fc2..52c3b5201a 100644
--- a/nano/secure/ledger.hpp
+++ b/nano/secure/ledger.hpp
@@ -43,6 +43,7 @@ class ledger final
 	/** Start read-only transaction */
 	secure::read_transaction tx_begin_read () const;
 
+	bool unconfirmed_exists (secure::transaction const &, nano::block_hash const &);
 	nano::uint128_t account_receivable (secure::transaction const &, nano::account const &, bool = false);
 	/**
 	 * Returns the cached vote weight for the given representative.
@@ -81,6 +82,7 @@ class ledger final
 	uint64_t block_count () const;
 	uint64_t account_count () const;
 	uint64_t pruned_count () const;
+	uint64_t backlog_count () const;
 
 	using block_priority_result = std::pair<nano::amount, nano::priority_timestamp>;
 	block_priority_result block_priority (secure::transaction const &, nano::block const &) const;
diff --git a/nano/store/write_queue.hpp b/nano/store/write_queue.hpp
index 0171685b87..0c395212c4 100644
--- a/nano/store/write_queue.hpp
+++ b/nano/store/write_queue.hpp
@@ -17,6 +17,7 @@ enum class writer
 	confirmation_height,
 	pruning,
 	voting_final,
+	bounded_backlog,
 	testing // Used in tests to emulate a write lock
 };