From 2a1b37996eb74ad7523a4fa87a071cfbe1686713 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 21 Oct 2024 16:04:55 +0200 Subject: [PATCH] Issue block processor batch processed notifications on background thread --- nano/lib/thread_roles.cpp | 3 ++ nano/lib/thread_roles.hpp | 1 + nano/node/blockprocessor.cpp | 40 +++++++++++++++++------ nano/node/blockprocessor.hpp | 6 ++++ nano/node/bootstrap_ascending/service.cpp | 10 +++--- nano/node/confirming_set.cpp | 13 ++++---- nano/node/confirming_set.hpp | 4 +-- 7 files changed, 55 insertions(+), 22 deletions(-) diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 76f467cd0e..ff6831b4d8 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -37,6 +37,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::block_processing: thread_role_name_string = "Blck processing"; break; + case nano::thread_role::name::block_processing_notifications: + thread_role_name_string = "Blck proc notif"; + break; case nano::thread_role::name::request_loop: thread_role_name_string = "Request loop"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 5896318c42..a82a584727 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -17,6 +17,7 @@ enum class name vote_processing, vote_cache_processing, block_processing, + block_processing_notifications, request_loop, wallet_actions, bootstrap_initiator, diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index d0e8716494..7ccc879a9d 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -41,7 +41,8 @@ void nano::block_processor::context::set_result (result_t const & result) nano::block_processor::block_processor (nano::node & node_a) : config{ node_a.config.block_processor }, node (node_a), - next_log (std::chrono::steady_clock::now ()) + next_log (std::chrono::steady_clock::now ()), + workers{ 1, nano::thread_role::name::block_processing_notifications } { batch_processed.add ([this] (auto const & items) { // For every batch item: notify the 'processed' observer. @@ -84,12 +85,15 @@ nano::block_processor::~block_processor () { // Thread must be stopped before destruction debug_assert (!thread.joinable ()); + debug_assert (!workers.alive ()); } void nano::block_processor::start () { debug_assert (!thread.joinable ()); + workers.start (); + thread = std::thread ([this] () { nano::thread_role::set (nano::thread_role::name::block_processing); run (); @@ -107,6 +111,7 @@ void nano::block_processor::stop () { thread.join (); } + workers.stop (); } // TODO: Remove and replace all checks with calls to size (block_source) @@ -244,20 +249,33 @@ void nano::block_processor::run () auto processed = process_batch (lock); debug_assert (!lock.owns_lock ()); + lock.lock (); - // Set results for futures when not holding the lock - for (auto & [result, context] : processed) + // It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here + while (workers.queued_tasks () >= config.max_queued_notifications) { - if (context.callback) + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::cooldown); + condition.wait_for (lock, 100ms, [this] { return stopped; }); + if (stopped) { - context.callback (result); + return; } - context.set_result (result); } - batch_processed.notify (processed); - - lock.lock (); + // Queue notifications to be dispatched in the background + workers.post ([this, processed = std::move (processed)] () mutable { + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::notify); + // Set results for futures when not holding the lock + for (auto & [result, context] : processed) + { + if (context.callback) + { + context.callback (result); + } + context.set_result (result); + } + batch_processed.notify (processed); + }); } else { @@ -315,7 +333,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock debug_assert (!mutex.try_lock ()); debug_assert (!queue.empty ()); - auto batch = next_batch (256); + auto batch = next_batch (config.batch_size); lock.unlock (); @@ -465,7 +483,9 @@ nano::container_info nano::block_processor::container_info () const nano::container_info info; info.put ("blocks", queue.size ()); info.put ("forced", queue.size ({ nano::block_source::forced })); + info.put ("notifications", workers.queued_tasks ()); info.add ("queue", queue.container_info ()); + info.add ("workers", workers.container_info ()); return info; } diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 837631b0ba..6313e9bd51 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -46,6 +47,9 @@ class block_processor_config final size_t priority_live{ 1 }; size_t priority_bootstrap{ 8 }; size_t priority_local{ 16 }; + + size_t batch_size{ 256 }; + size_t max_queued_notifications{ 8 }; }; /** @@ -128,5 +132,7 @@ class block_processor final nano::condition_variable condition; mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) }; std::thread thread; + + nano::thread_pool workers; }; } diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index 776f0b3d8e..d14f17c520 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -33,7 +33,6 @@ nano::bootstrap_ascending::service::service (nano::node_config const & node_conf scoring{ config, node_config_a.network_params.network }, database_limiter{ config.database_rate_limit, 1.0 } { - // TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread block_processor.batch_processed.add ([this] (auto const & batch) { { nano::lock_guard lock{ mutex }; @@ -217,11 +216,14 @@ void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx { if (source == nano::block_source::bootstrap) { - const auto account = block.previous ().is_zero () ? block.account_field ().value () : ledger.any.block_account (tx, block.previous ()).value (); + const auto account = block.previous ().is_zero () ? block.account_field ().value () : ledger.any.block_account (tx, block.previous ()).value_or (0); const auto source_hash = block.source_field ().value_or (block.link_field ().value_or (0).as_block_hash ()); - // Mark account as blocked because it is missing the source block - accounts.block (account, source_hash); + if (!account.is_zero () && !source_hash.is_zero ()) + { + // Mark account as blocked because it is missing the source block + accounts.block (account, source_hash); + } } } break; diff --git a/nano/node/confirming_set.cpp b/nano/node/confirming_set.cpp index e504b0acae..2921881b52 100644 --- a/nano/node/confirming_set.cpp +++ b/nano/node/confirming_set.cpp @@ -12,7 +12,7 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na ledger{ ledger_a }, stats{ stats_a }, logger{ logger_a }, - notification_workers{ 1, nano::thread_role::name::confirmation_height_notifications } + workers{ 1, nano::thread_role::name::confirmation_height_notifications } { batch_cemented.add ([this] (auto const & cemented) { for (auto const & context : cemented) @@ -55,7 +55,7 @@ void nano::confirming_set::start () return; } - notification_workers.start (); + workers.start (); thread = std::thread{ [this] () { nano::thread_role::set (nano::thread_role::name::confirmation_height); @@ -74,7 +74,7 @@ void nano::confirming_set::stop () { thread.join (); } - notification_workers.stop (); + workers.stop (); } bool nano::confirming_set::contains (nano::block_hash const & hash) const @@ -150,7 +150,7 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) std::unique_lock lock{ mutex }; // It's possible that ledger cementing happens faster than the notifications can be processed by other components, cooldown here - while (notification_workers.queued_tasks () >= config.max_queued_notifications) + while (workers.queued_tasks () >= config.max_queued_notifications) { stats.inc (nano::stat::type::confirming_set, nano::stat::detail::cooldown); condition.wait_for (lock, 100ms, [this] { return stopped.load (); }); @@ -160,7 +160,7 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) } } - notification_workers.post ([this, batch = std::move (batch)] () { + workers.post ([this, batch = std::move (batch)] () { stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify); batch_cemented.notify (batch); }); @@ -255,6 +255,7 @@ nano::container_info nano::confirming_set::container_info () const nano::container_info info; info.put ("set", set); - info.add ("notification_workers", notification_workers.container_info ()); + info.put ("notifications", workers.queued_tasks ()); + info.add ("workers", workers.container_info ()); return info; } diff --git a/nano/node/confirming_set.hpp b/nano/node/confirming_set.hpp index 99569ce1e6..644b241c92 100644 --- a/nano/node/confirming_set.hpp +++ b/nano/node/confirming_set.hpp @@ -105,11 +105,11 @@ class confirming_set final ordered_entries set; std::unordered_set current; - nano::thread_pool notification_workers; - std::atomic stopped{ false }; mutable std::mutex mutex; std::condition_variable condition; std::thread thread; + + nano::thread_pool workers; }; }