From bdf129b96b477c6e96061a05fddbefed12813c3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 26 Oct 2024 00:05:47 +0200 Subject: [PATCH 01/11] Virtual transaction refresh_if_needed --- nano/secure/transaction.hpp | 9 ++++++--- nano/store/transaction.cpp | 4 +++- nano/store/transaction.hpp | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/nano/secure/transaction.hpp b/nano/secure/transaction.hpp index 8a3ebd9c1d..0a1bd90a41 100644 --- a/nano/secure/transaction.hpp +++ b/nano/secure/transaction.hpp @@ -27,6 +27,9 @@ class transaction // Conversion operator to const nano::store::transaction& virtual operator const nano::store::transaction & () const = 0; + + // Certain transactions may need to be refreshed if they are held for a long time + virtual bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) = 0; }; class write_transaction final : public transaction @@ -69,7 +72,7 @@ class write_transaction final : public transaction renew (); } - bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) + bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) override { auto now = std::chrono::steady_clock::now (); if (now - start > max_age) @@ -119,9 +122,9 @@ class read_transaction final : public transaction txn.refresh (); } - void refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) + bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) override { - txn.refresh_if_needed (max_age); + return txn.refresh_if_needed (max_age); } auto timestamp () const diff --git a/nano/store/transaction.cpp b/nano/store/transaction.cpp index 0d1d6a9008..6f475bf06c 100644 --- a/nano/store/transaction.cpp +++ b/nano/store/transaction.cpp @@ -83,13 +83,15 @@ void nano::store::read_transaction::refresh () renew (); } -void nano::store::read_transaction::refresh_if_needed (std::chrono::milliseconds max_age) +bool nano::store::read_transaction::refresh_if_needed (std::chrono::milliseconds max_age) { auto now = std::chrono::steady_clock::now (); if (now - start > max_age) { refresh (); + return true; } + return false; } /* diff --git a/nano/store/transaction.hpp b/nano/store/transaction.hpp index 23459a9258..0697e1b1e8 100644 --- a/nano/store/transaction.hpp +++ b/nano/store/transaction.hpp @@ -66,7 +66,7 @@ class read_transaction final : public transaction void reset (); void renew (); void refresh (); - void refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }); + bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }); private: std::unique_ptr impl; From 6a4068ce9874127296fbe1e3b7497ffe41c1b760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 26 Oct 2024 00:15:27 +0200 Subject: [PATCH 02/11] Short circuit hash zero lookup --- nano/secure/ledger_set_any.cpp | 12 ++++++++++++ nano/secure/ledger_set_confirmed.cpp | 12 ++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/nano/secure/ledger_set_any.cpp b/nano/secure/ledger_set_any.cpp index 67b14fda19..4eed6292b0 100644 --- a/nano/secure/ledger_set_any.cpp +++ b/nano/secure/ledger_set_any.cpp @@ -123,11 +123,19 @@ std::optional nano::ledger_set_any::block_balance (secure::transac bool nano::ledger_set_any::block_exists (secure::transaction const & transaction, nano::block_hash const & hash) const { + if (hash.is_zero ()) + { + return false; + } return ledger.store.block.exists (transaction, hash); } bool nano::ledger_set_any::block_exists_or_pruned (secure::transaction const & transaction, nano::block_hash const & hash) const { + if (hash.is_zero ()) + { + return false; + } if (ledger.store.pruned.exists (transaction, hash)) { return true; @@ -137,6 +145,10 @@ bool nano::ledger_set_any::block_exists_or_pruned (secure::transaction const & t std::shared_ptr nano::ledger_set_any::block_get (secure::transaction const & transaction, nano::block_hash const & hash) const { + if (hash.is_zero ()) + { + return nullptr; + } return ledger.store.block.get (transaction, hash); } diff --git a/nano/secure/ledger_set_confirmed.cpp b/nano/secure/ledger_set_confirmed.cpp index 8702d61b4f..655f04edf8 100644 --- a/nano/secure/ledger_set_confirmed.cpp +++ b/nano/secure/ledger_set_confirmed.cpp @@ -45,10 +45,6 @@ uint64_t nano::ledger_set_confirmed::account_height (secure::transaction const & std::optional nano::ledger_set_confirmed::block_balance (secure::transaction const & transaction, nano::block_hash const & hash) const { - if (hash.is_zero ()) - { - return std::nullopt; - } auto block = block_get (transaction, hash); if (!block) { @@ -64,6 +60,10 @@ bool nano::ledger_set_confirmed::block_exists (secure::transaction const & trans bool nano::ledger_set_confirmed::block_exists_or_pruned (secure::transaction const & transaction, nano::block_hash const & hash) const { + if (hash.is_zero ()) + { + return false; + } if (ledger.store.pruned.exists (transaction, hash)) { return true; @@ -73,6 +73,10 @@ bool nano::ledger_set_confirmed::block_exists_or_pruned (secure::transaction con std::shared_ptr nano::ledger_set_confirmed::block_get (secure::transaction const & transaction, nano::block_hash const & hash) const { + if (hash.is_zero ()) + { + return nullptr; + } auto block = ledger.store.block.get (transaction, hash); if (!block) { From 52c1d23ec69849d671193d7c6769e3b6c5d882b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 27 Oct 2024 00:19:30 +0200 Subject: [PATCH 03/11] Reprocess election winner blocks --- nano/lib/stats_enums.hpp | 1 + nano/node/blockprocessor.cpp | 2 +- nano/node/blockprocessor.hpp | 2 ++ nano/node/election.cpp | 4 +++- 4 files changed, 7 insertions(+), 2 deletions(-) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 8281754014..200143ea20 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -203,6 +203,7 @@ enum class detail unchecked, local, forced, + election, // message specific not_a_type, diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index de314b20e9..544de7973c 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -51,7 +51,7 @@ nano::block_processor::block_processor (nano::node_config const & node_config, n case nano::block_source::local: return config.priority_local; default: - return 1; + return config.priority_system; } }; diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 50ae78c2ef..1adec9ca62 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -24,6 +24,7 @@ enum class block_source unchecked, local, forced, + election, }; std::string_view to_string (block_source); @@ -47,6 +48,7 @@ class block_processor_config final size_t priority_live{ 1 }; size_t priority_bootstrap{ 8 }; size_t priority_local{ 16 }; + size_t priority_system{ 32 }; size_t batch_size{ 256 }; size_t max_queued_notifications{ 8 }; diff --git a/nano/node/election.cpp b/nano/node/election.cpp index 1e760bb938..ee64237059 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -399,7 +399,7 @@ void nano::election::confirm_if_quorum (nano::unique_lock & lock_a) { debug_assert (lock_a.owns_lock ()); auto tally_l (tally_impl ()); - debug_assert (!tally_l.empty ()); + release_assert (!tally_l.empty ()); auto winner (tally_l.begin ()); auto block_l (winner->second); auto const & winner_hash_l (block_l->hash ()); @@ -425,6 +425,8 @@ void nano::election::confirm_if_quorum (nano::unique_lock & lock_a) } if (final_weight >= node.online_reps.delta ()) { + // In some edge cases block might get rolled back while the election is confirming, reprocess it to ensure it's present in the ledger + node.block_processor.add (block_l, nano::block_source::election); confirm_once (lock_a); debug_assert (!lock_a.owns_lock ()); } From cefc1ee1237cba337352d785060ac90d9b681ccd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 31 Oct 2024 12:30:26 +0100 Subject: [PATCH 04/11] Bounded backlog --- nano/core_test/node.cpp | 21 + nano/lib/logging_enums.hpp | 1 + nano/lib/stats_enums.hpp | 9 + nano/lib/thread_roles.cpp | 9 + nano/lib/thread_roles.hpp | 3 + nano/node/CMakeLists.txt | 2 + nano/node/bootstrap/bootstrap_service.cpp | 10 + nano/node/bounded_backlog.cpp | 531 ++++++++++++++++++++++ nano/node/bounded_backlog.hpp | 155 +++++++ nano/node/fwd.hpp | 2 + nano/node/local_block_broadcaster.cpp | 6 + nano/node/local_block_broadcaster.hpp | 1 + nano/node/node.cpp | 6 + nano/node/node.hpp | 3 +- nano/node/nodeconfig.hpp | 2 + nano/secure/ledger.cpp | 12 + nano/secure/ledger.hpp | 2 + nano/store/write_queue.hpp | 1 + 18 files changed, 775 insertions(+), 1 deletion(-) create mode 100644 nano/node/bounded_backlog.cpp create mode 100644 nano/node/bounded_backlog.hpp diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 1d9118ec2c..593c5ee6c1 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -3698,3 +3699,23 @@ TEST (node, container_info) ASSERT_NO_THROW (node1.container_info ()); ASSERT_NO_THROW (node2.container_info ()); } + +TEST (node, bounded_backlog) +{ + nano::test::system system; + + nano::node_config node_config; + node_config.backlog.max_backlog = 10; + node_config.backlog.bucket_threshold = 2; + node_config.backlog_scan.enable = false; + auto & node = *system.add_node (node_config); + + const int howmany_blocks = 64; + const int howmany_chains = 16; + + auto chains = nano::test::setup_chains (system, node, howmany_chains, howmany_blocks, nano::dev::genesis_key, /* do not confirm */ false); + + node.backlog_scan.trigger (); + + ASSERT_TIMELY_EQ (20s, node.ledger.block_count (), 11); // 10 + genesis +} diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index 85136b98b7..b3f5312577 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -83,6 +83,7 @@ enum class type local_block_broadcaster, monitor, confirming_set, + bounded_backlog, // bootstrap bulk_pull_client, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 200143ea20..2022b29083 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -85,6 +85,7 @@ enum class type active_elections_cancelled, active_elections_cemented, backlog_scan, + bounded_backlog, backlog, unchecked, election_scheduler, @@ -570,6 +571,14 @@ enum class detail blocks_by_account, account_info_by_hash, + // bounded backlog, + gathered_targets, + performing_rollbacks, + no_targets, + rollback_missing_block, + rollback_skipped, + loop_scan, + _last // Must be the last enum }; diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 10b6c0d333..503b0281f8 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -100,6 +100,15 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::backlog_scan: thread_role_name_string = "Backlog scan"; break; + case nano::thread_role::name::bounded_backlog: + thread_role_name_string = "Bounded backlog"; + break; + case nano::thread_role::name::bounded_backlog_scan: + thread_role_name_string = "Bounded b scan"; + break; + case nano::thread_role::name::bounded_backlog_notifications: + thread_role_name_string = "Bounded b notif"; + break; case nano::thread_role::name::vote_generator_queue: thread_role_name_string = "Voting que"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index c98248ceae..420aa0d8ea 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -38,6 +38,9 @@ enum class name db_parallel_traversal, unchecked, backlog_scan, + bounded_backlog, + bounded_backlog_scan, + bounded_backlog_notifications, vote_generator_queue, telemetry, bootstrap, diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 7fa1263972..13a182adff 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -24,6 +24,8 @@ add_library( blockprocessor.cpp bucketing.hpp bucketing.cpp + bounded_backlog.hpp + bounded_backlog.cpp bootstrap_weights_beta.hpp bootstrap_weights_live.hpp bootstrap/account_sets.hpp diff --git a/nano/node/bootstrap/bootstrap_service.cpp b/nano/node/bootstrap/bootstrap_service.cpp index 2a71ffb08e..e6a2295cb8 100644 --- a/nano/node/bootstrap/bootstrap_service.cpp +++ b/nano/node/bootstrap/bootstrap_service.cpp @@ -50,6 +50,16 @@ nano::bootstrap_service::bootstrap_service (nano::node_config const & node_confi condition.notify_all (); }); + // Unblock rolled back accounts as the dependency is no longer valid + block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { + nano::lock_guard lock{ mutex }; + for (auto const & block : blocks) + { + debug_assert (block != nullptr); + accounts.unblock (block->account ()); + } + }); + accounts.priority_set (node_config_a.network_params.ledger.genesis->account_field ().value ()); } diff --git a/nano/node/bounded_backlog.cpp b/nano/node/bounded_backlog.cpp new file mode 100644 index 0000000000..094cf219a9 --- /dev/null +++ b/nano/node/bounded_backlog.cpp @@ -0,0 +1,531 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +nano::bounded_backlog::bounded_backlog (nano::bounded_backlog_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::bucketing & bucketing_a, nano::backlog_scan & backlog_scan_a, nano::block_processor & block_processor_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) : + config{ config_a }, + node{ node_a }, + ledger{ ledger_a }, + bucketing{ bucketing_a }, + backlog_scan{ backlog_scan_a }, + block_processor{ block_processor_a }, + confirming_set{ confirming_set_a }, + stats{ stats_a }, + logger{ logger_a }, + scan_limiter{ config.batch_size }, + workers{ 1, nano::thread_role::name::bounded_backlog_notifications } +{ + // Activate accounts with unconfirmed blocks + backlog_scan.batch_activated.add ([this] (auto const & batch) { + auto transaction = ledger.tx_begin_read (); + for (auto const & info : batch) + { + activate (transaction, info.account, info.account_info, info.conf_info); + } + }); + + // Erase accounts with all confirmed blocks + backlog_scan.batch_scanned.add ([this] (auto const & batch) { + nano::lock_guard guard{ mutex }; + for (auto const & info : batch) + { + if (info.conf_info.height == info.account_info.block_count) + { + index.erase (info.account); + } + } + }); + + // Track unconfirmed blocks + block_processor.batch_processed.add ([this] (auto const & batch) { + auto transaction = ledger.tx_begin_read (); + for (auto const & [result, context] : batch) + { + if (result == nano::block_status::progress) + { + auto const & block = context.block; + insert (transaction, *block); + } + } + }); + + // Remove rolled back blocks from the backlog + block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { + nano::lock_guard guard{ mutex }; + for (auto const & block : blocks) + { + index.erase (block->hash ()); + } + }); + + // Remove cemented blocks from the backlog + confirming_set.batch_cemented.add ([this] (auto const & batch) { + nano::lock_guard guard{ mutex }; + for (auto const & context : batch) + { + index.erase (context.block->hash ()); + } + }); +} + +nano::bounded_backlog::~bounded_backlog () +{ + // Thread must be stopped before destruction + debug_assert (!thread.joinable ()); + debug_assert (!scan_thread.joinable ()); + debug_assert (!workers.alive ()); +} + +void nano::bounded_backlog::start () +{ + debug_assert (!thread.joinable ()); + + workers.start (); + + thread = std::thread{ [this] () { + nano::thread_role::set (nano::thread_role::name::bounded_backlog); + run (); + } }; + + scan_thread = std::thread{ [this] () { + nano::thread_role::set (nano::thread_role::name::bounded_backlog_scan); + run_scan (); + } }; +} + +void nano::bounded_backlog::stop () +{ + { + nano::lock_guard lock{ mutex }; + stopped = true; + } + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); + } + if (scan_thread.joinable ()) + { + scan_thread.join (); + } + workers.stop (); +} + +size_t nano::bounded_backlog::index_size () const +{ + nano::lock_guard guard{ mutex }; + return index.size (); +} + +bool nano::bounded_backlog::erase (nano::secure::transaction const & transaction, nano::account const & account) +{ + nano::lock_guard guard{ mutex }; + return index.erase (account); +} + +void nano::bounded_backlog::activate (nano::secure::transaction & transaction, nano::account const & account, nano::account_info const & account_info, nano::confirmation_height_info const & conf_info) +{ + debug_assert (conf_info.frontier != account_info.head); + + auto contains = [this] (nano::block_hash const & hash) { + nano::lock_guard guard{ mutex }; + return index.contains (hash); + }; + + // Insert blocks into the index starting from the account head block + auto block = ledger.any.block_get (transaction, account_info.head); + while (block) + { + // We reached the confirmed frontier, no need to track more blocks + if (block->hash () == conf_info.frontier) + { + break; + } + // Check if the block is already in the backlog, avoids unnecessary ledger lookups + if (contains (block->hash ())) + { + break; + } + + bool inserted = insert (transaction, *block); + + // If the block was not inserted, we already have it in the backlog + if (!inserted) + { + break; + } + + transaction.refresh_if_needed (); + + block = ledger.any.block_get (transaction, block->previous ()); + } +} + +void nano::bounded_backlog::update (nano::secure::transaction const & transaction, nano::block_hash const & hash) +{ + // Erase if the block is either confirmed or missing + if (!ledger.unconfirmed_exists (transaction, hash)) + { + nano::lock_guard guard{ mutex }; + index.erase (hash); + } +} + +bool nano::bounded_backlog::insert (nano::secure::transaction const & transaction, nano::block const & block) +{ + auto const [priority_balance, priority_timestamp] = ledger.block_priority (transaction, block); + auto const bucket_index = bucketing.bucket_index (priority_balance); + + nano::lock_guard guard{ mutex }; + + return index.insert (block, bucket_index, priority_timestamp); +} + +bool nano::bounded_backlog::predicate () const +{ + debug_assert (!mutex.try_lock ()); + // Both ledger and tracked backlog must be over the threshold + return ledger.backlog_count () > config.max_backlog && index.size () > config.max_backlog; +} + +void nano::bounded_backlog::run () +{ + std::unique_lock lock{ mutex }; + while (!stopped) + { + if (predicate ()) + { + // Wait until all notification about the previous rollbacks are processed + while (workers.queued_tasks () >= config.max_queued_notifications) + { + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::cooldown); + condition.wait_for (lock, 100ms, [this] { return stopped.load (); }); + if (stopped) + { + return; + } + } + + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop); + + // Calculate the number of targets to rollback + uint64_t const backlog = ledger.backlog_count (); + uint64_t const target_count = backlog > config.max_backlog ? backlog - config.max_backlog : 0; + + auto targets = gather_targets (std::min (target_count, static_cast (config.batch_size))); + if (!targets.empty ()) + { + lock.unlock (); + + stats.add (nano::stat::type::bounded_backlog, nano::stat::detail::gathered_targets, targets.size ()); + auto processed = perform_rollbacks (targets); + + lock.lock (); + + // Erase rolled back blocks from the index + for (auto const & hash : processed) + { + index.erase (hash); + } + } + else + { + // Cooldown, this should not happen in normal operation + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::no_targets); + condition.wait_for (lock, 100ms, [this] { + return stopped.load (); + }); + } + } + else + { + condition.wait_for (lock, 1s, [this] { + return stopped || predicate (); + }); + } + } +} + +bool nano::bounded_backlog::should_rollback (nano::block_hash const & hash) const +{ + if (node.vote_cache.contains (hash)) + { + return false; + } + if (node.vote_router.contains (hash)) + { + return false; + } + if (node.active.recently_confirmed.exists (hash)) + { + return false; + } + if (node.scheduler.contains (hash)) + { + return false; + } + if (node.confirming_set.contains (hash)) + { + return false; + } + if (node.local_block_broadcaster.contains (hash)) + { + return false; + } + return true; +} + +std::deque nano::bounded_backlog::perform_rollbacks (std::deque const & targets) +{ + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::performing_rollbacks); + + auto transaction = ledger.tx_begin_write (nano::store::writer::bounded_backlog); + + std::deque processed; + for (auto const & hash : targets) + { + // Skip the rollback if the block is being used by the node, this should be race free as it's checked while holding the ledger write lock + if (!should_rollback (hash)) + { + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::rollback_skipped); + continue; + } + + // Here we check that the block is still OK to rollback, there could be a delay between gathering the targets and performing the rollbacks + if (auto block = ledger.any.block_get (transaction, hash)) + { + logger.debug (nano::log::type::bounded_backlog, "Rolling back: {}, account: {}", hash.to_string (), block->account ().to_account ()); + + std::deque> rollback_list; + bool error = ledger.rollback (transaction, hash, rollback_list); + stats.inc (nano::stat::type::bounded_backlog, error ? nano::stat::detail::rollback_failed : nano::stat::detail::rollback); + + for (auto const & rollback : rollback_list) + { + processed.push_back (rollback->hash ()); + } + + // Notify observers of the rolled back blocks on a background thread, avoid dispatching notifications when holding ledger write transaction + workers.post ([this, rollback_list = std::move (rollback_list), root = block->qualified_root ()] { + // TODO: Calling block_processor's event here is not ideal, but duplicating these events is even worse + block_processor.rolled_back.notify (rollback_list, root); + }); + } + else + { + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::rollback_missing_block); + processed.push_back (hash); + } + } + + return processed; +} + +std::deque nano::bounded_backlog::gather_targets (size_t max_count) const +{ + debug_assert (!mutex.try_lock ()); + + std::deque targets; + + // Start rolling back from lowest index buckets first + for (auto bucket : bucketing.bucket_indices ()) + { + // Only start rolling back if the bucket is over the threshold of unconfirmed blocks + if (index.size (bucket) > config.bucket_threshold) + { + auto const count = std::min (max_count, config.batch_size); + + auto const top = index.top (bucket, count, [this] (auto const & hash) { + // Only rollback if the block is not being used by the node + return should_rollback (hash); + }); + + for (auto const & entry : top) + { + targets.push_back (entry); + } + } + } + + return targets; +} + +void nano::bounded_backlog::run_scan () +{ + std::unique_lock lock{ mutex }; + while (!stopped) + { + auto wait = [&] (auto count) { + while (!scan_limiter.should_pass (count)) + { + condition.wait_for (lock, 100ms); + if (stopped) + { + return; + } + } + }; + + nano::block_hash last = 0; + while (!stopped) + { + wait (config.batch_size); + + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop_scan); + + auto batch = index.next (last, config.batch_size); + if (batch.empty ()) // If batch is empty, we iterated over all accounts in the index + { + break; + } + + lock.unlock (); + { + auto transaction = ledger.tx_begin_read (); + for (auto const & hash : batch) + { + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::scanned); + update (transaction, hash); + last = hash; + } + } + lock.lock (); + } + } +} + +nano::container_info nano::bounded_backlog::container_info () const +{ + nano::lock_guard guard{ mutex }; + nano::container_info info; + info.put ("backlog", index.size ()); + info.put ("notifications", workers.queued_tasks ()); + info.add ("index", index.container_info ()); + return info; +} + +/* + * backlog_index + */ + +bool nano::backlog_index::insert (nano::block const & block, nano::bucket_index bucket, nano::priority_timestamp priority) +{ + auto const hash = block.hash (); + auto const account = block.account (); + + entry new_entry{ + .hash = hash, + .account = account, + .bucket = bucket, + .priority = priority, + }; + + auto [it, inserted] = blocks.emplace (new_entry); + if (inserted) + { + size_by_bucket[bucket]++; + return true; + } + return false; +} + +bool nano::backlog_index::erase (nano::account const & account) +{ + auto const [begin, end] = blocks.get ().equal_range (account); + for (auto it = begin; it != end;) + { + size_by_bucket[it->bucket]--; + it = blocks.get ().erase (it); + } + return begin != end; +} + +bool nano::backlog_index::erase (nano::block_hash const & hash) +{ + if (auto existing = blocks.get ().find (hash); existing != blocks.get ().end ()) + { + size_by_bucket[existing->bucket]--; + blocks.get ().erase (existing); + return true; + } + return false; +} + +std::deque nano::backlog_index::top (nano::bucket_index bucket, size_t count, filter_callback const & filter) const +{ + // Highest timestamp, lowest priority, iterate in descending order + priority_key const starting_key{ bucket, std::numeric_limits::max () }; + + std::deque results; + auto begin = blocks.get ().lower_bound (starting_key); + for (auto it = begin; it != blocks.get ().end () && it->bucket == bucket && results.size () < count; ++it) + { + if (filter (it->hash)) + { + results.push_back (it->hash); + } + } + return results; +} + +std::deque nano::backlog_index::next (nano::block_hash last, size_t count) const +{ + std::deque results; + + auto it = blocks.get ().upper_bound (last); + auto end = blocks.get ().end (); + + while (it != end && results.size () < count) + { + results.push_back (it->hash); + last = it->hash; + it = blocks.get ().upper_bound (last); + } + return results; +} + +bool nano::backlog_index::contains (nano::block_hash const & hash) const +{ + return blocks.get ().contains (hash); +} + +size_t nano::backlog_index::size () const +{ + return blocks.size (); +} + +size_t nano::backlog_index::size (nano::bucket_index bucket) const +{ + if (auto it = size_by_bucket.find (bucket); it != size_by_bucket.end ()) + { + return it->second; + } + return 0; +} + +nano::container_info nano::backlog_index::container_info () const +{ + auto collect_bucket_sizes = [&] () { + nano::container_info info; + for (auto [bucket, count] : size_by_bucket) + { + info.put (std::to_string (bucket), count); + } + return info; + }; + + nano::container_info info; + info.put ("blocks", blocks); + info.add ("sizes", collect_bucket_sizes ()); + return info; +} \ No newline at end of file diff --git a/nano/node/bounded_backlog.hpp b/nano/node/bounded_backlog.hpp new file mode 100644 index 0000000000..179c086622 --- /dev/null +++ b/nano/node/bounded_backlog.hpp @@ -0,0 +1,155 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace mi = boost::multi_index; + +namespace nano +{ +class backlog_index +{ +public: + struct priority_key + { + nano::bucket_index bucket; + nano::priority_timestamp priority; + + auto operator<=> (priority_key const &) const = default; + }; + + struct entry + { + nano::block_hash hash; + nano::account account; + nano::bucket_index bucket; + nano::priority_timestamp priority; + + backlog_index::priority_key priority_key () const + { + return { bucket, priority }; + } + }; + +public: + backlog_index () = default; + + bool insert (nano::block const & block, nano::bucket_index, nano::priority_timestamp); + + bool erase (nano::account const & account); + bool erase (nano::block_hash const & hash); + + using filter_callback = std::function; + std::deque top (nano::bucket_index, size_t count, filter_callback const &) const; + + std::deque next (nano::block_hash last, size_t count) const; + + bool contains (nano::block_hash const & hash) const; + size_t size () const; + size_t size (nano::bucket_index) const; + + nano::container_info container_info () const; + +private: + // clang-format off + class tag_hash {}; + class tag_hash_ordered {}; + class tag_account {}; + class tag_priority {}; + + using ordered_blocks = boost::multi_index_container, // Allows for fast lookup + mi::member>, + mi::ordered_unique, // Allows for sequential scan + mi::member>, + mi::hashed_non_unique, + mi::member>, + mi::ordered_non_unique, + mi::const_mem_fun, std::greater<>> // DESC order + >>; + // clang-format on + + ordered_blocks blocks; + + // Keep track of the size of the backlog in number of unconfirmed blocks per bucket + std::map size_by_bucket; +}; + +class bounded_backlog_config +{ +public: + size_t max_backlog{ 100000 }; + size_t bucket_threshold{ 1000 }; + double overfill_factor{ 1.5 }; + size_t batch_size{ 32 }; + size_t max_queued_notifications{ 128 }; +}; + +class bounded_backlog +{ +public: + bounded_backlog (bounded_backlog_config const &, nano::node &, nano::ledger &, nano::bucketing &, nano::backlog_scan &, nano::block_processor &, nano::confirming_set &, nano::stats &, nano::logger &); + ~bounded_backlog (); + + void start (); + void stop (); + + size_t index_size () const; + + nano::container_info container_info () const; + +private: // Dependencies + bounded_backlog_config const & config; + nano::node & node; + nano::ledger & ledger; + nano::bucketing & bucketing; + nano::backlog_scan & backlog_scan; + nano::block_processor & block_processor; + nano::confirming_set & confirming_set; + nano::stats & stats; + nano::logger & logger; + +private: + void activate (nano::secure::transaction &, nano::account const &, nano::account_info const &, nano::confirmation_height_info const &); + void update (nano::secure::transaction const &, nano::block_hash const &); + bool erase (nano::secure::transaction const &, nano::account const &); + bool insert (nano::secure::transaction const &, nano::block const &); + + bool predicate () const; + void run (); + std::deque gather_targets (size_t max_count) const; + bool should_rollback (nano::block_hash const &) const; + + std::deque perform_rollbacks (std::deque const & targets); + + void run_scan (); + +private: + nano::backlog_index index; + + nano::rate_limiter scan_limiter; + + std::atomic stopped{ false }; + nano::condition_variable condition; + mutable nano::mutex mutex; + std::thread thread; + std::thread scan_thread; + + nano::thread_pool workers; +}; +} \ No newline at end of file diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 414c3e6c3c..e477f8414a 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -9,7 +9,9 @@ namespace nano { class account_sets_config; class active_elections; +class backlog_scan; class block_processor; +class bounded_backlog; class bucketing; class bootstrap_config; class bootstrap_server; diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index b2a41a747f..31715309a7 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -103,6 +103,12 @@ void nano::local_block_broadcaster::stop () nano::join_or_pass (thread); } +bool nano::local_block_broadcaster::contains (nano::block_hash const & hash) const +{ + nano::lock_guard lock{ mutex }; + return local_blocks.get ().contains (hash); +} + size_t nano::local_block_broadcaster::size () const { nano::lock_guard lock{ mutex }; diff --git a/nano/node/local_block_broadcaster.hpp b/nano/node/local_block_broadcaster.hpp index a88ed25f95..567244491f 100644 --- a/nano/node/local_block_broadcaster.hpp +++ b/nano/node/local_block_broadcaster.hpp @@ -59,6 +59,7 @@ class local_block_broadcaster final void start (); void stop (); + bool contains (nano::block_hash const &) const; size_t size () const; nano::container_info container_info () const; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 22ae40f352..15b41c794d 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -159,6 +160,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy wallets (wallets_store.init_error (), *this), backlog_scan_impl{ std::make_unique (config.backlog_scan, ledger, stats) }, backlog_scan{ *backlog_scan_impl }, + backlog_impl{ std::make_unique (config.backlog, *this, ledger, bucketing, backlog_scan, block_processor, confirming_set, stats, logger) }, + backlog{ *backlog_impl }, bootstrap_server_impl{ std::make_unique (config.bootstrap_server, store, ledger, network_params.network, stats) }, bootstrap_server{ *bootstrap_server_impl }, bootstrap_impl{ std::make_unique (config, block_processor, ledger, network, stats, logger) }, @@ -651,6 +654,7 @@ void nano::node::start () scheduler.start (); aggregator.start (); backlog_scan.start (); + backlog.start (); bootstrap_server.start (); bootstrap.start (); websocket.start (); @@ -683,6 +687,7 @@ void nano::node::stop () distributed_work.stop (); backlog_scan.stop (); bootstrap.stop (); + backlog.stop (); rep_crawler.stop (); unchecked.stop (); block_processor.stop (); @@ -1211,6 +1216,7 @@ nano::container_info nano::node::container_info () const info.add ("message_processor", message_processor.container_info ()); info.add ("bandwidth", outbound_limiter.container_info ()); info.add ("backlog_scan", backlog_scan.container_info ()); + info.add ("bounded_backlog", backlog.container_info ()); return info; } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 8ade202c4c..56c6727802 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -33,7 +33,6 @@ namespace nano { class active_elections; -class backlog_scan; class bandwidth_limiter; class confirming_set; class message_processor; @@ -203,6 +202,8 @@ class node final : public std::enable_shared_from_this nano::wallets wallets; std::unique_ptr backlog_scan_impl; nano::backlog_scan & backlog_scan; + std::unique_ptr backlog_impl; + nano::bounded_backlog & backlog; std::unique_ptr bootstrap_server_impl; nano::bootstrap_server & bootstrap_server; std::unique_ptr bootstrap_impl; diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index ed4ca6fa3e..5fb3e0d8d7 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -147,6 +148,7 @@ class node_config nano::confirming_set_config confirming_set; nano::monitor_config monitor; nano::backlog_scan_config backlog_scan; + nano::bounded_backlog_config backlog; public: /** Entry is ignored if it cannot be parsed as a valid address:port */ diff --git a/nano/secure/ledger.cpp b/nano/secure/ledger.cpp index 6d1bbc9b9b..60d39ab1c3 100644 --- a/nano/secure/ledger.cpp +++ b/nano/secure/ledger.cpp @@ -792,6 +792,11 @@ void nano::ledger::initialize (nano::generate_cache_flags const & generate_cache cache.pruned_count = store.pruned.count (transaction); } +bool nano::ledger::unconfirmed_exists (secure::transaction const & transaction, nano::block_hash const & hash) +{ + return any.block_exists (transaction, hash) && !confirmed.block_exists (transaction, hash); +} + nano::uint128_t nano::ledger::account_receivable (secure::transaction const & transaction_a, nano::account const & account_a, bool only_confirmed_a) { nano::uint128_t result (0); @@ -1537,6 +1542,13 @@ uint64_t nano::ledger::pruned_count () const return cache.pruned_count; } +uint64_t nano::ledger::backlog_count () const +{ + auto blocks = cache.block_count.load (); + auto cemented = cache.cemented_count.load (); + return (blocks > cemented) ? blocks - cemented : 0; +} + nano::container_info nano::ledger::container_info () const { nano::container_info info; diff --git a/nano/secure/ledger.hpp b/nano/secure/ledger.hpp index c9a987be72..91e1b8155d 100644 --- a/nano/secure/ledger.hpp +++ b/nano/secure/ledger.hpp @@ -43,6 +43,7 @@ class ledger final /** Start read-only transaction */ secure::read_transaction tx_begin_read () const; + bool unconfirmed_exists (secure::transaction const &, nano::block_hash const &); nano::uint128_t account_receivable (secure::transaction const &, nano::account const &, bool = false); /** * Returns the cached vote weight for the given representative. @@ -83,6 +84,7 @@ class ledger final uint64_t block_count () const; uint64_t account_count () const; uint64_t pruned_count () const; + uint64_t backlog_count () const; // Returned priority balance is maximum of block balance and previous block balance to handle full account balance send cases // Returned timestamp is the previous block timestamp or the current timestamp if there's no previous block diff --git a/nano/store/write_queue.hpp b/nano/store/write_queue.hpp index 0171685b87..0c395212c4 100644 --- a/nano/store/write_queue.hpp +++ b/nano/store/write_queue.hpp @@ -17,6 +17,7 @@ enum class writer confirmation_height, pruning, voting_final, + bounded_backlog, testing // Used in tests to emulate a write lock }; From 1c6594aa2779bb76909eaa285b4ce2c204471450 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 26 Nov 2024 19:52:32 +0100 Subject: [PATCH 05/11] Improve `backlog_index::next ()` --- nano/node/bounded_backlog.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/nano/node/bounded_backlog.cpp b/nano/node/bounded_backlog.cpp index 094cf219a9..71f6536749 100644 --- a/nano/node/bounded_backlog.cpp +++ b/nano/node/bounded_backlog.cpp @@ -485,11 +485,9 @@ std::deque nano::backlog_index::next (nano::block_hash last, s auto it = blocks.get ().upper_bound (last); auto end = blocks.get ().end (); - while (it != end && results.size () < count) + for (; it != end && results.size () < count; ++it) { results.push_back (it->hash); - last = it->hash; - it = blocks.get ().upper_bound (last); } return results; } From 560f5e8124274f2106573b19b68acec5341e3302 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 26 Nov 2024 19:57:28 +0100 Subject: [PATCH 06/11] Early return if target rollback count reached --- nano/node/bounded_backlog.cpp | 10 ++++++++-- nano/node/bounded_backlog.hpp | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/nano/node/bounded_backlog.cpp b/nano/node/bounded_backlog.cpp index 71f6536749..970a91896c 100644 --- a/nano/node/bounded_backlog.cpp +++ b/nano/node/bounded_backlog.cpp @@ -228,7 +228,7 @@ void nano::bounded_backlog::run () lock.unlock (); stats.add (nano::stat::type::bounded_backlog, nano::stat::detail::gathered_targets, targets.size ()); - auto processed = perform_rollbacks (targets); + auto processed = perform_rollbacks (targets, target_count); lock.lock (); @@ -285,7 +285,7 @@ bool nano::bounded_backlog::should_rollback (nano::block_hash const & hash) cons return true; } -std::deque nano::bounded_backlog::perform_rollbacks (std::deque const & targets) +std::deque nano::bounded_backlog::perform_rollbacks (std::deque const & targets, size_t max_rollbacks) { stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::performing_rollbacks); @@ -320,6 +320,12 @@ std::deque nano::bounded_backlog::perform_rollbacks (std::dequ // TODO: Calling block_processor's event here is not ideal, but duplicating these events is even worse block_processor.rolled_back.notify (rollback_list, root); }); + + // Return early if we reached the maximum number of rollbacks + if (processed.size () >= max_rollbacks) + { + break; + } } else { diff --git a/nano/node/bounded_backlog.hpp b/nano/node/bounded_backlog.hpp index 179c086622..2d6981c190 100644 --- a/nano/node/bounded_backlog.hpp +++ b/nano/node/bounded_backlog.hpp @@ -135,7 +135,7 @@ class bounded_backlog std::deque gather_targets (size_t max_count) const; bool should_rollback (nano::block_hash const &) const; - std::deque perform_rollbacks (std::deque const & targets); + std::deque perform_rollbacks (std::deque const & targets, size_t max_rollbacks); void run_scan (); From 71f958a5ad93fdc60acfedc59505b34a3e3ab738 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 26 Nov 2024 19:59:49 +0100 Subject: [PATCH 07/11] Use unordered_map --- nano/node/bounded_backlog.hpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nano/node/bounded_backlog.hpp b/nano/node/bounded_backlog.hpp index 2d6981c190..4eb0cc16bf 100644 --- a/nano/node/bounded_backlog.hpp +++ b/nano/node/bounded_backlog.hpp @@ -17,6 +17,8 @@ #include #include +#include + namespace mi = boost::multi_index; namespace nano @@ -87,7 +89,7 @@ class backlog_index ordered_blocks blocks; // Keep track of the size of the backlog in number of unconfirmed blocks per bucket - std::map size_by_bucket; + std::unordered_map size_by_bucket; }; class bounded_backlog_config From 57c0b8e9f7736842dcd3a736608ea434c961e5d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 28 Nov 2024 21:11:57 +0100 Subject: [PATCH 08/11] Simplify main loop --- nano/node/bounded_backlog.cpp | 68 +++++++++++++++++------------------ 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/nano/node/bounded_backlog.cpp b/nano/node/bounded_backlog.cpp index 970a91896c..9aa39dee45 100644 --- a/nano/node/bounded_backlog.cpp +++ b/nano/node/bounded_backlog.cpp @@ -203,54 +203,54 @@ void nano::bounded_backlog::run () std::unique_lock lock{ mutex }; while (!stopped) { - if (predicate ()) + condition.wait_for (lock, 1s, [this] { + return stopped || predicate (); + }); + + if (stopped) + { + return; + } + + // Wait until all notification about the previous rollbacks are processed + while (workers.queued_tasks () >= config.max_queued_notifications) { - // Wait until all notification about the previous rollbacks are processed - while (workers.queued_tasks () >= config.max_queued_notifications) + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::cooldown); + condition.wait_for (lock, 100ms, [this] { return stopped.load (); }); + if (stopped) { - stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::cooldown); - condition.wait_for (lock, 100ms, [this] { return stopped.load (); }); - if (stopped) - { - return; - } + return; } + } - stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop); + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop); - // Calculate the number of targets to rollback - uint64_t const backlog = ledger.backlog_count (); - uint64_t const target_count = backlog > config.max_backlog ? backlog - config.max_backlog : 0; + // Calculate the number of targets to rollback + uint64_t const backlog = ledger.backlog_count (); + uint64_t const target_count = backlog > config.max_backlog ? backlog - config.max_backlog : 0; - auto targets = gather_targets (std::min (target_count, static_cast (config.batch_size))); - if (!targets.empty ()) - { - lock.unlock (); + auto targets = gather_targets (std::min (target_count, static_cast (config.batch_size))); + if (!targets.empty ()) + { + lock.unlock (); - stats.add (nano::stat::type::bounded_backlog, nano::stat::detail::gathered_targets, targets.size ()); - auto processed = perform_rollbacks (targets, target_count); + stats.add (nano::stat::type::bounded_backlog, nano::stat::detail::gathered_targets, targets.size ()); + auto processed = perform_rollbacks (targets, target_count); - lock.lock (); + lock.lock (); - // Erase rolled back blocks from the index - for (auto const & hash : processed) - { - index.erase (hash); - } - } - else + // Erase rolled back blocks from the index + for (auto const & hash : processed) { - // Cooldown, this should not happen in normal operation - stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::no_targets); - condition.wait_for (lock, 100ms, [this] { - return stopped.load (); - }); + index.erase (hash); } } else { - condition.wait_for (lock, 1s, [this] { - return stopped || predicate (); + // Cooldown, this should not happen in normal operation + stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::no_targets); + condition.wait_for (lock, 100ms, [this] { + return stopped.load (); }); } } From a1d7cdc47c29a89c71e228522c6a12c2ae3d5a66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 28 Nov 2024 21:15:16 +0100 Subject: [PATCH 09/11] Dynamic bucket threshold --- nano/core_test/node.cpp | 1 - nano/node/bounded_backlog.cpp | 7 ++++++- nano/node/bounded_backlog.hpp | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 593c5ee6c1..297c1d2f1c 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3706,7 +3706,6 @@ TEST (node, bounded_backlog) nano::node_config node_config; node_config.backlog.max_backlog = 10; - node_config.backlog.bucket_threshold = 2; node_config.backlog_scan.enable = false; auto & node = *system.add_node (node_config); diff --git a/nano/node/bounded_backlog.cpp b/nano/node/bounded_backlog.cpp index 9aa39dee45..62b604ecdd 100644 --- a/nano/node/bounded_backlog.cpp +++ b/nano/node/bounded_backlog.cpp @@ -337,6 +337,11 @@ std::deque nano::bounded_backlog::perform_rollbacks (std::dequ return processed; } +size_t nano::bounded_backlog::bucket_threshold () const +{ + return config.max_backlog / bucketing.size (); +} + std::deque nano::bounded_backlog::gather_targets (size_t max_count) const { debug_assert (!mutex.try_lock ()); @@ -347,7 +352,7 @@ std::deque nano::bounded_backlog::gather_targets (size_t max_c for (auto bucket : bucketing.bucket_indices ()) { // Only start rolling back if the bucket is over the threshold of unconfirmed blocks - if (index.size (bucket) > config.bucket_threshold) + if (index.size (bucket) > bucket_threshold ()) { auto const count = std::min (max_count, config.batch_size); diff --git a/nano/node/bounded_backlog.hpp b/nano/node/bounded_backlog.hpp index 4eb0cc16bf..5935e7a59f 100644 --- a/nano/node/bounded_backlog.hpp +++ b/nano/node/bounded_backlog.hpp @@ -96,7 +96,6 @@ class bounded_backlog_config { public: size_t max_backlog{ 100000 }; - size_t bucket_threshold{ 1000 }; double overfill_factor{ 1.5 }; size_t batch_size{ 32 }; size_t max_queued_notifications{ 128 }; @@ -112,6 +111,7 @@ class bounded_backlog void stop (); size_t index_size () const; + size_t bucket_threshold () const; nano::container_info container_info () const; From 94ef4613c616cad59e3fb3e0a7b6c82c61da5ba0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 28 Nov 2024 21:52:03 +0100 Subject: [PATCH 10/11] Config --- nano/core_test/node.cpp | 2 +- nano/core_test/toml.cpp | 288 ++++++++++++++++++---------------- nano/node/bounded_backlog.cpp | 43 ++++- nano/node/bounded_backlog.hpp | 12 +- nano/node/node.cpp | 2 +- nano/node/nodeconfig.cpp | 12 ++ nano/node/nodeconfig.hpp | 5 +- 7 files changed, 217 insertions(+), 147 deletions(-) diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 297c1d2f1c..c260d5dbe2 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3705,7 +3705,7 @@ TEST (node, bounded_backlog) nano::test::system system; nano::node_config node_config; - node_config.backlog.max_backlog = 10; + node_config.max_backlog = 10; node_config.backlog_scan.enable = false; auto & node = *system.add_node (node_config); diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index 9d6e2124ab..cb635aecd1 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -67,7 +67,125 @@ TEST (toml, diff_equal) ASSERT_TRUE (other.empty ()); } -TEST (toml, daemon_config_update_array) +TEST (toml, optional_child) +{ + std::stringstream ss; + ss << R"toml( + [child] + val=1 + )toml"; + + nano::tomlconfig t; + t.read (ss); + auto c1 = t.get_required_child ("child"); + int val = 0; + c1.get_required ("val", val); + ASSERT_EQ (val, 1); + auto c2 = t.get_optional_child ("child2"); + ASSERT_FALSE (c2); +} + +/** Config settings passed via CLI overrides the config file settings. This is solved +using an override stream. */ +TEST (toml, dot_child_syntax) +{ + std::stringstream ss_override; + ss_override << R"toml( + node.a = 1 + node.b = 2 + )toml"; + + std::stringstream ss; + ss << R"toml( + [node] + b=5 + c=3 + )toml"; + + nano::tomlconfig t; + t.read (ss_override, ss); + + auto node = t.get_required_child ("node"); + uint16_t a, b, c; + node.get ("a", a); + ASSERT_EQ (a, 1); + node.get ("b", b); + ASSERT_EQ (b, 2); + node.get ("c", c); + ASSERT_EQ (c, 3); +} + +TEST (toml, base_override) +{ + std::stringstream ss_base; + ss_base << R"toml( + node.peering_port=7075 + )toml"; + + std::stringstream ss_override; + ss_override << R"toml( + node.peering_port=8075 + node.too_big=70000 + )toml"; + + nano::tomlconfig t; + t.read (ss_override, ss_base); + + // Query optional existent value + uint16_t port = 0; + t.get_optional ("node.peering_port", port); + ASSERT_EQ (port, 8075); + ASSERT_FALSE (t.get_error ()); + + // Query optional non-existent value, make sure we get default and no errors + port = 65535; + t.get_optional ("node.peering_port_non_existent", port); + ASSERT_EQ (port, 65535); + ASSERT_FALSE (t.get_error ()); + t.get_error ().clear (); + + // Query required non-existent value, make sure it errors + t.get_required ("node.peering_port_not_existent", port); + ASSERT_EQ (port, 65535); + ASSERT_TRUE (t.get_error ()); + ASSERT_EQ (t.get_error (), nano::error_config::missing_value); + t.get_error ().clear (); + + // Query uint16 that's too big, make sure we have an error + t.get_required ("node.too_big", port); + ASSERT_TRUE (t.get_error ()); + ASSERT_EQ (t.get_error (), nano::error_config::invalid_value); +} + +TEST (toml, put) +{ + nano::tomlconfig config; + nano::tomlconfig config_node; + // Overwrite value and add to child node + config_node.put ("port", "7074"); + config_node.put ("port", "7075"); + config.put_child ("node", config_node); + uint16_t port; + config.get_required ("node.port", port); + ASSERT_EQ (port, 7075); + ASSERT_FALSE (config.get_error ()); +} + +TEST (toml, array) +{ + nano::tomlconfig config; + nano::tomlconfig config_node; + config.put_child ("node", config_node); + config_node.push ("items", "item 1"); + config_node.push ("items", "item 2"); + int i = 1; + config_node.array_entries_required ("items", [&i] (std::string item) { + ASSERT_EQ (item, std::string ("item ") + std::to_string (i)); + i++; + }); +} + +TEST (toml_config, daemon_config_update_array) { nano::tomlconfig t; std::filesystem::path data_path ("."); @@ -79,7 +197,7 @@ TEST (toml, daemon_config_update_array) } /** Empty rpc config file should match a default config object */ -TEST (toml, rpc_config_deserialize_defaults) +TEST (toml_config, rpc_config_deserialize_defaults) { std::stringstream ss; @@ -111,12 +229,13 @@ TEST (toml, rpc_config_deserialize_defaults) } /** Empty config file should match a default config object */ -TEST (toml, daemon_config_deserialize_defaults) +TEST (toml_config, daemon_config_deserialize_defaults) { std::stringstream ss; ss << R"toml( [node] [node.backlog_scan] + [node.bounded_backlog] [node.bootstrap] [node.bootstrap_server] [node.block_processor] @@ -197,10 +316,17 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.max_queued_requests, defaults.node.max_queued_requests); ASSERT_EQ (conf.node.request_aggregator_threads, defaults.node.request_aggregator_threads); ASSERT_EQ (conf.node.max_unchecked_blocks, defaults.node.max_unchecked_blocks); + ASSERT_EQ (conf.node.max_backlog, defaults.node.max_backlog); + ASSERT_EQ (conf.node.enable_upnp, defaults.node.enable_upnp); + ASSERT_EQ (conf.node.backlog_scan.enable, defaults.node.backlog_scan.enable); ASSERT_EQ (conf.node.backlog_scan.batch_size, defaults.node.backlog_scan.batch_size); ASSERT_EQ (conf.node.backlog_scan.rate_limit, defaults.node.backlog_scan.rate_limit); - ASSERT_EQ (conf.node.enable_upnp, defaults.node.enable_upnp); + + ASSERT_EQ (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable); + ASSERT_EQ (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size); + ASSERT_EQ (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications); + ASSERT_EQ (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate); ASSERT_EQ (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled); ASSERT_EQ (conf.node.websocket_config.address, defaults.node.websocket_config.address); @@ -294,126 +420,8 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.message_processor.max_queue, defaults.node.message_processor.max_queue); } -TEST (toml, optional_child) -{ - std::stringstream ss; - ss << R"toml( - [child] - val=1 - )toml"; - - nano::tomlconfig t; - t.read (ss); - auto c1 = t.get_required_child ("child"); - int val = 0; - c1.get_required ("val", val); - ASSERT_EQ (val, 1); - auto c2 = t.get_optional_child ("child2"); - ASSERT_FALSE (c2); -} - -/** Config settings passed via CLI overrides the config file settings. This is solved -using an override stream. */ -TEST (toml, dot_child_syntax) -{ - std::stringstream ss_override; - ss_override << R"toml( - node.a = 1 - node.b = 2 - )toml"; - - std::stringstream ss; - ss << R"toml( - [node] - b=5 - c=3 - )toml"; - - nano::tomlconfig t; - t.read (ss_override, ss); - - auto node = t.get_required_child ("node"); - uint16_t a, b, c; - node.get ("a", a); - ASSERT_EQ (a, 1); - node.get ("b", b); - ASSERT_EQ (b, 2); - node.get ("c", c); - ASSERT_EQ (c, 3); -} - -TEST (toml, base_override) -{ - std::stringstream ss_base; - ss_base << R"toml( - node.peering_port=7075 - )toml"; - - std::stringstream ss_override; - ss_override << R"toml( - node.peering_port=8075 - node.too_big=70000 - )toml"; - - nano::tomlconfig t; - t.read (ss_override, ss_base); - - // Query optional existent value - uint16_t port = 0; - t.get_optional ("node.peering_port", port); - ASSERT_EQ (port, 8075); - ASSERT_FALSE (t.get_error ()); - - // Query optional non-existent value, make sure we get default and no errors - port = 65535; - t.get_optional ("node.peering_port_non_existent", port); - ASSERT_EQ (port, 65535); - ASSERT_FALSE (t.get_error ()); - t.get_error ().clear (); - - // Query required non-existent value, make sure it errors - t.get_required ("node.peering_port_not_existent", port); - ASSERT_EQ (port, 65535); - ASSERT_TRUE (t.get_error ()); - ASSERT_EQ (t.get_error (), nano::error_config::missing_value); - t.get_error ().clear (); - - // Query uint16 that's too big, make sure we have an error - t.get_required ("node.too_big", port); - ASSERT_TRUE (t.get_error ()); - ASSERT_EQ (t.get_error (), nano::error_config::invalid_value); -} - -TEST (toml, put) -{ - nano::tomlconfig config; - nano::tomlconfig config_node; - // Overwrite value and add to child node - config_node.put ("port", "7074"); - config_node.put ("port", "7075"); - config.put_child ("node", config_node); - uint16_t port; - config.get_required ("node.port", port); - ASSERT_EQ (port, 7075); - ASSERT_FALSE (config.get_error ()); -} - -TEST (toml, array) -{ - nano::tomlconfig config; - nano::tomlconfig config_node; - config.put_child ("node", config_node); - config_node.push ("items", "item 1"); - config_node.push ("items", "item 2"); - int i = 1; - config_node.array_entries_required ("items", [&i] (std::string item) { - ASSERT_EQ (item, std::string ("item ") + std::to_string (i)); - i++; - }); -} - /** Deserialize a node config with non-default values */ -TEST (toml, daemon_config_deserialize_no_defaults) +TEST (toml_config, daemon_config_deserialize_no_defaults) { std::stringstream ss; @@ -462,6 +470,7 @@ TEST (toml, daemon_config_deserialize_no_defaults) max_queued_requests = 999 request_aggregator_threads = 999 max_unchecked_blocks = 999 + max_backlog = 999 frontiers_confirmation = "always" enable_upnp = false @@ -470,6 +479,12 @@ TEST (toml, daemon_config_deserialize_no_defaults) batch_size = 999 rate_limit = 999 + [node.bounded_backlog] + enable = false + batch_size = 999 + max_queued_notifications = 999 + scan_rate = 999 + [node.block_processor] max_peer_queue = 999 max_system_queue = 999 @@ -679,6 +694,7 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.io_threads, defaults.node.io_threads); ASSERT_NE (conf.node.max_work_generate_multiplier, defaults.node.max_work_generate_multiplier); ASSERT_NE (conf.node.max_unchecked_blocks, defaults.node.max_unchecked_blocks); + ASSERT_NE (conf.node.max_backlog, defaults.node.max_backlog); ASSERT_NE (conf.node.network_threads, defaults.node.network_threads); ASSERT_NE (conf.node.background_threads, defaults.node.background_threads); ASSERT_NE (conf.node.secondary_work_peers, defaults.node.secondary_work_peers); @@ -704,10 +720,16 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.work_threads, defaults.node.work_threads); ASSERT_NE (conf.node.max_queued_requests, defaults.node.max_queued_requests); ASSERT_NE (conf.node.request_aggregator_threads, defaults.node.request_aggregator_threads); + ASSERT_NE (conf.node.enable_upnp, defaults.node.enable_upnp); + ASSERT_NE (conf.node.backlog_scan.enable, defaults.node.backlog_scan.enable); ASSERT_NE (conf.node.backlog_scan.batch_size, defaults.node.backlog_scan.batch_size); ASSERT_NE (conf.node.backlog_scan.rate_limit, defaults.node.backlog_scan.rate_limit); - ASSERT_NE (conf.node.enable_upnp, defaults.node.enable_upnp); + + ASSERT_NE (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable); + ASSERT_NE (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size); + ASSERT_NE (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications); + ASSERT_NE (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate); ASSERT_NE (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled); ASSERT_NE (conf.node.websocket_config.address, defaults.node.websocket_config.address); @@ -804,7 +826,7 @@ TEST (toml, daemon_config_deserialize_no_defaults) } /** There should be no required values **/ -TEST (toml, daemon_config_no_required) +TEST (toml_config, daemon_config_no_required) { std::stringstream ss; @@ -835,7 +857,7 @@ TEST (toml, daemon_config_no_required) } /** Deserialize an rpc config with non-default values */ -TEST (toml, rpc_config_deserialize_no_defaults) +TEST (toml_config, rpc_config_deserialize_no_defaults) { std::stringstream ss; @@ -878,7 +900,7 @@ TEST (toml, rpc_config_deserialize_no_defaults) } /** There should be no required values **/ -TEST (toml, rpc_config_no_required) +TEST (toml_config, rpc_config_no_required) { std::stringstream ss; @@ -900,7 +922,7 @@ TEST (toml, rpc_config_no_required) } /** Deserialize a node config with incorrect values */ -TEST (toml, daemon_config_deserialize_errors) +TEST (toml_config, daemon_config_deserialize_errors) { { std::stringstream ss; @@ -932,7 +954,7 @@ TEST (toml, daemon_config_deserialize_errors) } } -TEST (toml, daemon_read_config) +TEST (toml_config, daemon_read_config) { auto path (nano::unique_path ()); std::filesystem::create_directories (path); @@ -976,7 +998,7 @@ TEST (toml, daemon_read_config) } } -TEST (toml, log_config_defaults) +TEST (toml_config, log_config_defaults) { std::stringstream ss; @@ -1002,7 +1024,7 @@ TEST (toml, log_config_defaults) ASSERT_EQ (confg.file.rotation_count, defaults.file.rotation_count); } -TEST (toml, log_config_no_defaults) +TEST (toml_config, log_config_no_defaults) { std::stringstream ss; @@ -1044,7 +1066,7 @@ TEST (toml, log_config_no_defaults) ASSERT_NE (confg.file.rotation_count, defaults.file.rotation_count); } -TEST (toml, log_config_no_required) +TEST (toml_config, log_config_no_required) { std::stringstream ss; @@ -1065,7 +1087,7 @@ TEST (toml, log_config_no_required) ASSERT_FALSE (toml.get_error ()) << toml.get_error ().get_message (); } -TEST (toml, merge_config_files) +TEST (toml_config, merge_config_files) { nano::network_params network_params{ nano::network_constants::active_network }; nano::tomlconfig default_toml; diff --git a/nano/node/bounded_backlog.cpp b/nano/node/bounded_backlog.cpp index 62b604ecdd..ce2f6dc153 100644 --- a/nano/node/bounded_backlog.cpp +++ b/nano/node/bounded_backlog.cpp @@ -12,7 +12,7 @@ #include #include -nano::bounded_backlog::bounded_backlog (nano::bounded_backlog_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::bucketing & bucketing_a, nano::backlog_scan & backlog_scan_a, nano::block_processor & block_processor_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) : +nano::bounded_backlog::bounded_backlog (nano::node_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::bucketing & bucketing_a, nano::backlog_scan & backlog_scan_a, nano::block_processor & block_processor_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) : config{ config_a }, node{ node_a }, ledger{ ledger_a }, @@ -22,7 +22,7 @@ nano::bounded_backlog::bounded_backlog (nano::bounded_backlog_config const & con confirming_set{ confirming_set_a }, stats{ stats_a }, logger{ logger_a }, - scan_limiter{ config.batch_size }, + scan_limiter{ config.bounded_backlog.scan_rate }, workers{ 1, nano::thread_role::name::bounded_backlog_notifications } { // Activate accounts with unconfirmed blocks @@ -90,6 +90,11 @@ void nano::bounded_backlog::start () { debug_assert (!thread.joinable ()); + if (!config.bounded_backlog.enable) + { + return; + } + workers.start (); thread = std::thread{ [this] () { @@ -213,7 +218,7 @@ void nano::bounded_backlog::run () } // Wait until all notification about the previous rollbacks are processed - while (workers.queued_tasks () >= config.max_queued_notifications) + while (workers.queued_tasks () >= config.bounded_backlog.max_queued_notifications) { stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::cooldown); condition.wait_for (lock, 100ms, [this] { return stopped.load (); }); @@ -229,7 +234,7 @@ void nano::bounded_backlog::run () uint64_t const backlog = ledger.backlog_count (); uint64_t const target_count = backlog > config.max_backlog ? backlog - config.max_backlog : 0; - auto targets = gather_targets (std::min (target_count, static_cast (config.batch_size))); + auto targets = gather_targets (std::min (target_count, static_cast (config.bounded_backlog.batch_size))); if (!targets.empty ()) { lock.unlock (); @@ -354,7 +359,7 @@ std::deque nano::bounded_backlog::gather_targets (size_t max_c // Only start rolling back if the bucket is over the threshold of unconfirmed blocks if (index.size (bucket) > bucket_threshold ()) { - auto const count = std::min (max_count, config.batch_size); + auto const count = std::min (max_count, config.bounded_backlog.batch_size); auto const top = index.top (bucket, count, [this] (auto const & hash) { // Only rollback if the block is not being used by the node @@ -390,11 +395,11 @@ void nano::bounded_backlog::run_scan () nano::block_hash last = 0; while (!stopped) { - wait (config.batch_size); + wait (config.bounded_backlog.batch_size); stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop_scan); - auto batch = index.next (last, config.batch_size); + auto batch = index.next (last, config.bounded_backlog.batch_size); if (batch.empty ()) // If batch is empty, we iterated over all accounts in the index { break; @@ -537,4 +542,28 @@ nano::container_info nano::backlog_index::container_info () const info.put ("blocks", blocks); info.add ("sizes", collect_bucket_sizes ()); return info; +} + +/* + * bounded_backlog_config + */ + +nano::error nano::bounded_backlog_config::serialize (nano::tomlconfig & toml) const +{ + toml.put ("enable", enable, "Enable the bounded backlog. \ntype:bool"); + toml.put ("batch_size", batch_size, "Maximum number of blocks to rollback per iteration. \ntype:uint64"); + toml.put ("max_queued_notifications", max_queued_notifications, "Maximum number of queued background tasks before cooldown. \ntype:uint64"); + toml.put ("scan_rate", scan_rate, "Rate limit for refreshing the backlog index. \ntype:uint64"); + + return toml.get_error (); +} + +nano::error nano::bounded_backlog_config::deserialize (nano::tomlconfig & toml) +{ + toml.get ("enable", enable); + toml.get ("batch_size", batch_size); + toml.get ("max_queued_notifications", max_queued_notifications); + toml.get ("scan_rate", scan_rate); + + return toml.get_error (); } \ No newline at end of file diff --git a/nano/node/bounded_backlog.hpp b/nano/node/bounded_backlog.hpp index 5935e7a59f..b75e68377f 100644 --- a/nano/node/bounded_backlog.hpp +++ b/nano/node/bounded_backlog.hpp @@ -95,16 +95,20 @@ class backlog_index class bounded_backlog_config { public: - size_t max_backlog{ 100000 }; - double overfill_factor{ 1.5 }; + nano::error deserialize (nano::tomlconfig &); + nano::error serialize (nano::tomlconfig &) const; + +public: + bool enable{ true }; size_t batch_size{ 32 }; size_t max_queued_notifications{ 128 }; + size_t scan_rate{ 64 }; }; class bounded_backlog { public: - bounded_backlog (bounded_backlog_config const &, nano::node &, nano::ledger &, nano::bucketing &, nano::backlog_scan &, nano::block_processor &, nano::confirming_set &, nano::stats &, nano::logger &); + bounded_backlog (nano::node_config const &, nano::node &, nano::ledger &, nano::bucketing &, nano::backlog_scan &, nano::block_processor &, nano::confirming_set &, nano::stats &, nano::logger &); ~bounded_backlog (); void start (); @@ -116,7 +120,7 @@ class bounded_backlog nano::container_info container_info () const; private: // Dependencies - bounded_backlog_config const & config; + nano::node_config const & config; nano::node & node; nano::ledger & ledger; nano::bucketing & bucketing; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 15b41c794d..5e87808671 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -160,7 +160,7 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy wallets (wallets_store.init_error (), *this), backlog_scan_impl{ std::make_unique (config.backlog_scan, ledger, stats) }, backlog_scan{ *backlog_scan_impl }, - backlog_impl{ std::make_unique (config.backlog, *this, ledger, bucketing, backlog_scan, block_processor, confirming_set, stats, logger) }, + backlog_impl{ std::make_unique (config, *this, ledger, bucketing, backlog_scan, block_processor, confirming_set, stats, logger) }, backlog{ *backlog_impl }, bootstrap_server_impl{ std::make_unique (config.bootstrap_server, store, ledger, network_params.network, stats) }, bootstrap_server{ *bootstrap_server_impl }, diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 8fb095afd8..f6ccdf563f 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -140,6 +140,7 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const toml.put ("max_queued_requests", max_queued_requests, "Limit for number of queued confirmation requests for one channel, after which new requests are dropped until the queue drops below this value.\ntype:uint32"); toml.put ("request_aggregator_threads", request_aggregator_threads, "Number of threads to dedicate to request aggregator. Defaults to using all cpu threads, up to a maximum of 4"); toml.put ("max_unchecked_blocks", max_unchecked_blocks, "Maximum number of unchecked blocks to store in memory. Defaults to 65536. \ntype:uint64,[0..]"); + toml.put ("max_backlog", max_backlog, "Maximum number of unconfirmed blocks to keep in the ledger. If this limit is exceeded, the node will start dropping low-priority unconfirmed blocks.\ntype:uint64"); toml.put ("rep_crawler_weight_minimum", rep_crawler_weight_minimum.to_string_dec (), "Rep crawler minimum weight, if this is less than minimum principal weight then this is taken as the minimum weight a rep must have to be tracked. If you want to track all reps set this to 0. If you do not want this to influence anything then set it to max value. This is only useful for debugging or for people who really know what they are doing.\ntype:string,amount,raw"); toml.put ("enable_upnp", enable_upnp, "Enable or disable automatic UPnP port forwarding. This feature only works if the node is directly connected to a router (not inside a docker container, etc.).\ntype:bool"); @@ -262,6 +263,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const backlog_scan.serialize (backlog_scan_l); toml.put_child ("backlog_scan", backlog_scan_l); + nano::tomlconfig bounded_backlog_l; + bounded_backlog.serialize (bounded_backlog_l); + toml.put_child ("bounded_backlog", bounded_backlog_l); + return toml.get_error (); } @@ -401,6 +406,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) backlog_scan.deserialize (config_l); } + if (toml.has_key ("bounded_backlog")) + { + auto config_l = toml.get_required_child ("bounded_backlog"); + bounded_backlog.deserialize (config_l); + } + /* * Values */ @@ -552,6 +563,7 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) toml.get ("request_aggregator_threads", request_aggregator_threads); toml.get ("max_unchecked_blocks", max_unchecked_blocks); + toml.get ("max_backlog", max_backlog); auto rep_crawler_weight_minimum_l (rep_crawler_weight_minimum.to_string_dec ()); if (toml.has_key ("rep_crawler_weight_minimum")) diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 5fb3e0d8d7..4871b98711 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -129,11 +129,14 @@ class node_config uint32_t max_queued_requests{ 512 }; unsigned request_aggregator_threads{ std::min (nano::hardware_concurrency (), 4u) }; // Max 4 threads if available unsigned max_unchecked_blocks{ 65536 }; + std::size_t max_backlog{ 100000 }; std::chrono::seconds max_pruning_age{ !network_params.network.is_beta_network () ? std::chrono::seconds (24 * 60 * 60) : std::chrono::seconds (5 * 60) }; // 1 day; 5 minutes for beta network uint64_t max_pruning_depth{ 0 }; nano::rocksdb_config rocksdb_config; nano::lmdb_config lmdb_config; bool enable_upnp{ true }; + +public: nano::vote_cache_config vote_cache; nano::rep_crawler_config rep_crawler; nano::block_processor_config block_processor; @@ -148,7 +151,7 @@ class node_config nano::confirming_set_config confirming_set; nano::monitor_config monitor; nano::backlog_scan_config backlog_scan; - nano::bounded_backlog_config backlog; + nano::bounded_backlog_config bounded_backlog; public: /** Entry is ignored if it cannot be parsed as a valid address:port */ From 88465ac96934ab6ed3d94710851d97e19b3f4fe6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 28 Nov 2024 21:55:36 +0100 Subject: [PATCH 11/11] Cleanup --- nano/node/bounded_backlog.cpp | 17 ++++++----------- nano/node/bounded_backlog.hpp | 2 +- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/nano/node/bounded_backlog.cpp b/nano/node/bounded_backlog.cpp index ce2f6dc153..4a800a60d3 100644 --- a/nano/node/bounded_backlog.cpp +++ b/nano/node/bounded_backlog.cpp @@ -132,21 +132,10 @@ size_t nano::bounded_backlog::index_size () const return index.size (); } -bool nano::bounded_backlog::erase (nano::secure::transaction const & transaction, nano::account const & account) -{ - nano::lock_guard guard{ mutex }; - return index.erase (account); -} - void nano::bounded_backlog::activate (nano::secure::transaction & transaction, nano::account const & account, nano::account_info const & account_info, nano::confirmation_height_info const & conf_info) { debug_assert (conf_info.frontier != account_info.head); - auto contains = [this] (nano::block_hash const & hash) { - nano::lock_guard guard{ mutex }; - return index.contains (hash); - }; - // Insert blocks into the index starting from the account head block auto block = ledger.any.block_get (transaction, account_info.head); while (block) @@ -420,6 +409,12 @@ void nano::bounded_backlog::run_scan () } } +bool nano::bounded_backlog::contains (nano::block_hash const & hash) const +{ + nano::lock_guard guard{ mutex }; + return index.contains (hash); +} + nano::container_info nano::bounded_backlog::container_info () const { nano::lock_guard guard{ mutex }; diff --git a/nano/node/bounded_backlog.hpp b/nano/node/bounded_backlog.hpp index b75e68377f..807bfe69d6 100644 --- a/nano/node/bounded_backlog.hpp +++ b/nano/node/bounded_backlog.hpp @@ -116,6 +116,7 @@ class bounded_backlog size_t index_size () const; size_t bucket_threshold () const; + bool contains (nano::block_hash const &) const; nano::container_info container_info () const; @@ -133,7 +134,6 @@ class bounded_backlog private: void activate (nano::secure::transaction &, nano::account const &, nano::account_info const &, nano::confirmation_height_info const &); void update (nano::secure::transaction const &, nano::block_hash const &); - bool erase (nano::secure::transaction const &, nano::account const &); bool insert (nano::secure::transaction const &, nano::block const &); bool predicate () const;