From 968092351d95ea25550cd84fa484db2026176604 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 27 Oct 2024 11:20:58 +0100 Subject: [PATCH 1/5] Modify `observer_set` to only accept and pass const ref arguments --- nano/lib/observer_set.hpp | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/nano/lib/observer_set.hpp b/nano/lib/observer_set.hpp index cc4e02e9ca..5630515fa6 100644 --- a/nano/lib/observer_set.hpp +++ b/nano/lib/observer_set.hpp @@ -12,21 +12,25 @@ template class observer_set final { public: - void add (std::function const & observer_a) + using observer_type = std::function; + +public: + void add (observer_type observer) { nano::lock_guard lock{ mutex }; - observers.push_back (observer_a); + observers.push_back (observer); } - void notify (T... args) const + void notify (T const &... args) const { + // Make observers copy to allow parallel notifications nano::unique_lock lock{ mutex }; auto observers_copy = observers; lock.unlock (); - for (auto & i : observers_copy) + for (auto const & observer : observers_copy) { - i (args...); + observer (args...); } } @@ -53,7 +57,7 @@ class observer_set final private: mutable nano::mutex mutex{ mutex_identifier (mutexes::observer_set) }; - std::vector> observers; + std::vector observers; }; } From dd668ad1880e5b1d90b16f0db429384ec77e4771 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 27 Oct 2024 11:35:03 +0100 Subject: [PATCH 2/5] Tests --- nano/core_test/CMakeLists.txt | 1 + nano/core_test/observer_set.cpp | 129 ++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 nano/core_test/observer_set.cpp diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 1d01f6e796..84b3ec16d8 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -37,6 +37,7 @@ add_executable( node.cpp numbers.cpp object_stream.cpp + observer_set.cpp optimistic_scheduler.cpp processing_queue.cpp processor_service.cpp diff --git a/nano/core_test/observer_set.cpp b/nano/core_test/observer_set.cpp new file mode 100644 index 0000000000..14c6d1301c --- /dev/null +++ b/nano/core_test/observer_set.cpp @@ -0,0 +1,129 @@ +#include +#include + +#include + +#include +#include + +using namespace std::chrono_literals; + +TEST (observer_set, notify_one) +{ + nano::observer_set set; + int value{ 0 }; + set.add ([&value] (int v) { + value = v; + }); + set.notify (1); + ASSERT_EQ (1, value); +} + +TEST (observer_set, notify_multiple) +{ + nano::observer_set set; + int value{ 0 }; + set.add ([&value] (int v) { + value = v; + }); + set.add ([&value] (int v) { + value += v; + }); + set.notify (1); + ASSERT_EQ (2, value); +} + +TEST (observer_set, notify_empty) +{ + nano::observer_set set; + set.notify (1); +} + +TEST (observer_set, notify_multiple_types) +{ + nano::observer_set set; + int value{ 0 }; + std::string str; + set.add ([&value, &str] (int v, std::string s) { + value = v; + str = s; + }); + set.notify (1, "test"); + ASSERT_EQ (1, value); + ASSERT_EQ ("test", str); +} + +TEST (observer_set, empty_params) +{ + nano::observer_set<> set; + set.notify (); +} + +// Make sure there are no TSAN warnings +TEST (observer_set, parallel_notify) +{ + nano::observer_set set; + std::atomic value{ 0 }; + set.add ([&value] (int v) { + std::this_thread::sleep_for (100ms); + value = v; + }); + nano::timer timer{ nano::timer_state::started }; + std::vector threads; + for (int i = 0; i < 10; ++i) + { + threads.emplace_back ([&set] { + set.notify (1); + }); + } + for (auto & thread : threads) + { + thread.join (); + } + ASSERT_EQ (1, value); + // Notification should be done in parallel + ASSERT_LT (timer.since_start (), 300ms); +} + +namespace +{ +struct move_only +{ + move_only () = default; + move_only (move_only &&) = default; + move_only & operator= (move_only &&) = default; + move_only (move_only const &) = delete; + move_only & operator= (move_only const &) = delete; +}; + +struct copy_throw +{ + copy_throw () = default; + copy_throw (copy_throw &&) = default; + copy_throw & operator= (copy_throw &&) = default; + copy_throw (copy_throw const &) + { + throw std::runtime_error ("copy_throw"); + } + copy_throw & operator= (copy_throw const &) = delete; +}; +} + +// Ensure that parameters are not unnecessarily copied, this should compile +TEST (observer_set, move_only) +{ + nano::observer_set set; + set.add ([] (move_only const &) { + }); + move_only value; + set.notify (value); +} + +TEST (observer_set, copy_throw) +{ + nano::observer_set set; + set.add ([] (copy_throw const &) { + }); + copy_throw value; + ASSERT_NO_THROW (set.notify (value)); +} \ No newline at end of file From 1fce19b3e92bc4077ca9896f5ed51aa48071ae83 Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Sun, 27 Oct 2024 11:50:03 +0000 Subject: [PATCH 3/5] Fixing leak where cursor would not be closed before being overwritten. --- nano/store/lmdb/iterator.cpp | 21 +++++++-------------- nano/store/lmdb/iterator.hpp | 4 +--- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/nano/store/lmdb/iterator.cpp b/nano/store/lmdb/iterator.cpp index 76cfc35e5b..5fc87d1d09 100644 --- a/nano/store/lmdb/iterator.cpp +++ b/nano/store/lmdb/iterator.cpp @@ -21,7 +21,7 @@ void iterator::update (int status) if (status == MDB_SUCCESS) { value_type init; - auto status = mdb_cursor_get (cursor, &init.first, &init.second, MDB_GET_CURRENT); + auto status = mdb_cursor_get (cursor.get (), &init.first, &init.second, MDB_GET_CURRENT); release_assert (status == MDB_SUCCESS); current = init; } @@ -33,8 +33,10 @@ void iterator::update (int status) iterator::iterator (MDB_txn * tx, MDB_dbi dbi) noexcept { + MDB_cursor * cursor; auto open_status = mdb_cursor_open (tx, dbi, &cursor); release_assert (open_status == MDB_SUCCESS); + this->cursor.reset (cursor); this->current = std::monostate{}; } @@ -53,7 +55,7 @@ auto iterator::end (MDB_txn * tx, MDB_dbi dbi) -> iterator auto iterator::lower_bound (MDB_txn * tx, MDB_dbi dbi, MDB_val const & lower_bound) -> iterator { iterator result{ tx, dbi }; - auto status = mdb_cursor_get (result.cursor, const_cast (&lower_bound), nullptr, MDB_SET_RANGE); + auto status = mdb_cursor_get (result.cursor.get (), const_cast (&lower_bound), nullptr, MDB_SET_RANGE); result.update (status); return std::move (result); } @@ -63,18 +65,9 @@ iterator::iterator (iterator && other) noexcept *this = std::move (other); } -iterator::~iterator () -{ - if (cursor) - { - mdb_cursor_close (cursor); - } -} - auto iterator::operator= (iterator && other) noexcept -> iterator & { - cursor = other.cursor; - other.cursor = nullptr; + cursor = std::move (other.cursor); current = other.current; other.current = std::monostate{}; return *this; @@ -83,7 +76,7 @@ auto iterator::operator= (iterator && other) noexcept -> iterator & auto iterator::operator++ () -> iterator & { auto operation = is_end () ? MDB_FIRST : MDB_NEXT; - auto status = mdb_cursor_get (cursor, nullptr, nullptr, operation); + auto status = mdb_cursor_get (cursor.get (), nullptr, nullptr, operation); release_assert (status == MDB_SUCCESS || status == MDB_NOTFOUND); update (status); return *this; @@ -92,7 +85,7 @@ auto iterator::operator++ () -> iterator & auto iterator::operator-- () -> iterator & { auto operation = is_end () ? MDB_LAST : MDB_PREV; - auto status = mdb_cursor_get (cursor, nullptr, nullptr, operation); + auto status = mdb_cursor_get (cursor.get (), nullptr, nullptr, operation); release_assert (status == MDB_SUCCESS || status == MDB_NOTFOUND); update (status); return *this; diff --git a/nano/store/lmdb/iterator.hpp b/nano/store/lmdb/iterator.hpp index ff921ca3aa..da2280adf3 100644 --- a/nano/store/lmdb/iterator.hpp +++ b/nano/store/lmdb/iterator.hpp @@ -22,7 +22,7 @@ namespace nano::store::lmdb */ class iterator { - MDB_cursor * cursor{ nullptr }; + std::unique_ptr cursor{ nullptr, mdb_cursor_close }; std::variant> current; void update (int status); iterator (MDB_txn * tx, MDB_dbi dbi) noexcept; @@ -39,8 +39,6 @@ class iterator static auto end (MDB_txn * tx, MDB_dbi dbi) -> iterator; static auto lower_bound (MDB_txn * tx, MDB_dbi dbi, MDB_val const & lower_bound) -> iterator; - ~iterator (); - iterator (iterator const &) = delete; auto operator= (iterator const &) -> iterator & = delete; From 5ca6ff70dcdc0ede4eb054f7e7f1dde366e0bf53 Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Sun, 27 Oct 2024 14:05:06 +0000 Subject: [PATCH 4/5] Making use of unique_ptr to ensure environment handle never leaks. --- nano/store/lmdb/lmdb.cpp | 8 +++----- nano/store/lmdb/lmdb_env.cpp | 9 ++++----- nano/store/lmdb/lmdb_env.hpp | 2 +- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/nano/store/lmdb/lmdb.cpp b/nano/store/lmdb/lmdb.cpp index a16d53cb88..07bbc34ded 100644 --- a/nano/store/lmdb/lmdb.cpp +++ b/nano/store/lmdb/lmdb.cpp @@ -121,9 +121,7 @@ bool nano::store::lmdb::component::vacuum_after_upgrade (std::filesystem::path c if (vacuum_success) { // Need to close the database to release the file handle - mdb_env_sync (env.environment, true); - mdb_env_close (env.environment); - env.environment = nullptr; + mdb_env_sync (env, true); // Replace the ledger file with the vacuumed one std::filesystem::rename (vacuum_path, path_a); @@ -155,7 +153,7 @@ void nano::store::lmdb::component::serialize_mdb_tracker (boost::property_tree:: void nano::store::lmdb::component::serialize_memory_stats (boost::property_tree::ptree & json) { MDB_stat stats; - auto status (mdb_env_stat (env.environment, &stats)); + auto status (mdb_env_stat (env, &stats)); release_assert (status == 0); json.put ("branch_pages", stats.ms_branch_pages); json.put ("depth", stats.ms_depth); @@ -448,7 +446,7 @@ std::string nano::store::lmdb::component::error_string (int status) const bool nano::store::lmdb::component::copy_db (std::filesystem::path const & destination_file) { - return !mdb_env_copy2 (env.environment, destination_file.string ().c_str (), MDB_CP_COMPACT); + return !mdb_env_copy2 (env, destination_file.string ().c_str (), MDB_CP_COMPACT); } void nano::store::lmdb::component::rebuild_db (store::write_transaction const & transaction_a) diff --git a/nano/store/lmdb/lmdb_env.cpp b/nano/store/lmdb/lmdb_env.cpp index cd1897c80a..b30718960f 100644 --- a/nano/store/lmdb/lmdb_env.cpp +++ b/nano/store/lmdb/lmdb_env.cpp @@ -19,8 +19,10 @@ void nano::store::lmdb::env::init (bool & error_a, std::filesystem::path const & nano::set_secure_perm_directory (path_a.parent_path (), error_chmod); if (!error_mkdir) { + MDB_env * environment; auto status1 (mdb_env_create (&environment)); release_assert (status1 == 0); + this->environment.reset (environment); auto status2 (mdb_env_set_maxdbs (environment, options_a.config.max_databases)); release_assert (status2 == 0); auto map_size = options_a.config.map_size; @@ -66,13 +68,11 @@ void nano::store::lmdb::env::init (bool & error_a, std::filesystem::path const & else { error_a = true; - environment = nullptr; } } else { error_a = true; - environment = nullptr; } } @@ -81,14 +81,13 @@ nano::store::lmdb::env::~env () if (environment != nullptr) { // Make sure the commits are flushed. This is a no-op unless MDB_NOSYNC is used. - mdb_env_sync (environment, true); - mdb_env_close (environment); + mdb_env_sync (environment.get (), true); } } nano::store::lmdb::env::operator MDB_env * () const { - return environment; + return environment.get (); } nano::store::read_transaction nano::store::lmdb::env::tx_begin_read (store::lmdb::txn_callbacks mdb_txn_callbacks) const diff --git a/nano/store/lmdb/lmdb_env.hpp b/nano/store/lmdb/lmdb_env.hpp index e912653250..07e89ffd2e 100644 --- a/nano/store/lmdb/lmdb_env.hpp +++ b/nano/store/lmdb/lmdb_env.hpp @@ -62,7 +62,7 @@ class env final store::read_transaction tx_begin_read (txn_callbacks callbacks = txn_callbacks{}) const; store::write_transaction tx_begin_write (txn_callbacks callbacks = txn_callbacks{}) const; MDB_txn * tx (store::transaction const & transaction_a) const; - MDB_env * environment; + std::unique_ptr environment{ nullptr, mdb_env_close }; nano::id_t const store_id{ nano::next_id () }; }; } // namespace nano::store::lmdb From 104b74d2e573d4b995b0d31db64991c8d77436a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20W=C3=B3jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 28 Oct 2024 12:17:45 +0100 Subject: [PATCH 5/5] Offload block processor notifications (#4763) * Issue block processor batch processed notifications on background thread * Remove `should_log` function * Move `block_processor::context` source to the bottom of the file * Remove unnecessary notify --- nano/lib/thread_roles.cpp | 3 + nano/lib/thread_roles.hpp | 1 + nano/node/blockprocessor.cpp | 111 ++++++++++++---------- nano/node/blockprocessor.hpp | 9 +- nano/node/bootstrap_ascending/service.cpp | 10 +- nano/node/confirming_set.cpp | 13 +-- nano/node/confirming_set.hpp | 4 +- 7 files changed, 84 insertions(+), 67 deletions(-) diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 76f467cd0e..ff6831b4d8 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -37,6 +37,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::block_processing: thread_role_name_string = "Blck processing"; break; + case nano::thread_role::name::block_processing_notifications: + thread_role_name_string = "Blck proc notif"; + break; case nano::thread_role::name::request_loop: thread_role_name_string = "Request loop"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 5896318c42..a82a584727 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -17,6 +17,7 @@ enum class name vote_processing, vote_cache_processing, block_processing, + block_processing_notifications, request_loop, wallet_actions, bootstrap_initiator, diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index d0e8716494..34d6956a8b 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -12,36 +12,14 @@ #include -/* - * block_processor::context - */ - -nano::block_processor::context::context (std::shared_ptr block, nano::block_source source_a, callback_t callback_a) : - block{ std::move (block) }, - source{ source_a }, - callback{ std::move (callback_a) } -{ - debug_assert (source != nano::block_source::unknown); -} - -auto nano::block_processor::context::get_future () -> std::future -{ - return promise.get_future (); -} - -void nano::block_processor::context::set_result (result_t const & result) -{ - promise.set_value (result); -} - /* * block_processor */ nano::block_processor::block_processor (nano::node & node_a) : config{ node_a.config.block_processor }, - node (node_a), - next_log (std::chrono::steady_clock::now ()) + node{ node_a }, + workers{ 1, nano::thread_role::name::block_processing_notifications } { batch_processed.add ([this] (auto const & items) { // For every batch item: notify the 'processed' observer. @@ -84,12 +62,15 @@ nano::block_processor::~block_processor () { // Thread must be stopped before destruction debug_assert (!thread.joinable ()); + debug_assert (!workers.alive ()); } void nano::block_processor::start () { debug_assert (!thread.joinable ()); + workers.start (); + thread = std::thread ([this] () { nano::thread_role::set (nano::thread_role::name::block_processing); run (); @@ -107,6 +88,7 @@ void nano::block_processor::stop () { thread.join (); } + workers.stop (); } // TODO: Remove and replace all checks with calls to size (block_source) @@ -229,13 +211,24 @@ void nano::block_processor::rollback_competitor (secure::write_transaction const void nano::block_processor::run () { + nano::interval log_interval; nano::unique_lock lock{ mutex }; while (!stopped) { if (!queue.empty ()) { - // TODO: Cleaner periodical logging - if (should_log ()) + // It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here + while (workers.queued_tasks () >= config.max_queued_notifications) + { + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::cooldown); + condition.wait_for (lock, 100ms, [this] { return stopped; }); + if (stopped) + { + return; + } + } + + if (log_interval.elapsed (15s)) { node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", queue.size (), @@ -244,41 +237,32 @@ void nano::block_processor::run () auto processed = process_batch (lock); debug_assert (!lock.owns_lock ()); + lock.lock (); - // Set results for futures when not holding the lock - for (auto & [result, context] : processed) - { - if (context.callback) + // Queue notifications to be dispatched in the background + workers.post ([this, processed = std::move (processed)] () mutable { + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::notify); + // Set results for futures when not holding the lock + for (auto & [result, context] : processed) { - context.callback (result); + if (context.callback) + { + context.callback (result); + } + context.set_result (result); } - context.set_result (result); - } - - batch_processed.notify (processed); - - lock.lock (); + batch_processed.notify (processed); + }); } else { - condition.notify_one (); - condition.wait (lock); + condition.wait (lock, [this] { + return stopped || !queue.empty (); + }); } } } -bool nano::block_processor::should_log () -{ - auto result (false); - auto now (std::chrono::steady_clock::now ()); - if (next_log < now) - { - next_log = now + std::chrono::seconds (15); - result = true; - } - return result; -} - auto nano::block_processor::next () -> context { debug_assert (!mutex.try_lock ()); @@ -315,7 +299,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock debug_assert (!mutex.try_lock ()); debug_assert (!queue.empty ()); - auto batch = next_batch (256); + auto batch = next_batch (config.batch_size); lock.unlock (); @@ -466,9 +450,32 @@ nano::container_info nano::block_processor::container_info () const info.put ("blocks", queue.size ()); info.put ("forced", queue.size ({ nano::block_source::forced })); info.add ("queue", queue.container_info ()); + info.add ("workers", workers.container_info ()); return info; } +/* + * block_processor::context + */ + +nano::block_processor::context::context (std::shared_ptr block, nano::block_source source_a, callback_t callback_a) : + block{ std::move (block) }, + source{ source_a }, + callback{ std::move (callback_a) } +{ + debug_assert (source != nano::block_source::unknown); +} + +auto nano::block_processor::context::get_future () -> std::future +{ + return promise.get_future (); +} + +void nano::block_processor::context::set_result (result_t const & result) +{ + promise.set_value (result); +} + /* * block_processor_config */ diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 837631b0ba..cb54711b0e 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -46,6 +47,9 @@ class block_processor_config final size_t priority_live{ 1 }; size_t priority_bootstrap{ 8 }; size_t priority_local{ 16 }; + + size_t batch_size{ 256 }; + size_t max_queued_notifications{ 8 }; }; /** @@ -89,7 +93,6 @@ class block_processor final bool add (std::shared_ptr const &, nano::block_source = nano::block_source::live, std::shared_ptr const & channel = nullptr, std::function callback = {}); std::optional add_blocking (std::shared_ptr const & block, nano::block_source); void force (std::shared_ptr const &); - bool should_log (); nano::container_info container_info () const; @@ -122,11 +125,11 @@ class block_processor final private: nano::fair_queue queue; - std::chrono::steady_clock::time_point next_log; - bool stopped{ false }; nano::condition_variable condition; mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) }; std::thread thread; + + nano::thread_pool workers; }; } diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index 661072b448..bb6f4183e1 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -33,7 +33,6 @@ nano::bootstrap_ascending::service::service (nano::node_config const & node_conf scoring{ config, node_config_a.network_params.network }, database_limiter{ config.database_rate_limit, 1.0 } { - // TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread block_processor.batch_processed.add ([this] (auto const & batch) { { nano::lock_guard lock{ mutex }; @@ -217,11 +216,14 @@ void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx { if (source == nano::block_source::bootstrap) { - const auto account = block.previous ().is_zero () ? block.account_field ().value () : ledger.any.block_account (tx, block.previous ()).value (); + const auto account = block.previous ().is_zero () ? block.account_field ().value () : ledger.any.block_account (tx, block.previous ()).value_or (0); const auto source_hash = block.source_field ().value_or (block.link_field ().value_or (0).as_block_hash ()); - // Mark account as blocked because it is missing the source block - accounts.block (account, source_hash); + if (!account.is_zero () && !source_hash.is_zero ()) + { + // Mark account as blocked because it is missing the source block + accounts.block (account, source_hash); + } } } break; diff --git a/nano/node/confirming_set.cpp b/nano/node/confirming_set.cpp index e504b0acae..2921881b52 100644 --- a/nano/node/confirming_set.cpp +++ b/nano/node/confirming_set.cpp @@ -12,7 +12,7 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na ledger{ ledger_a }, stats{ stats_a }, logger{ logger_a }, - notification_workers{ 1, nano::thread_role::name::confirmation_height_notifications } + workers{ 1, nano::thread_role::name::confirmation_height_notifications } { batch_cemented.add ([this] (auto const & cemented) { for (auto const & context : cemented) @@ -55,7 +55,7 @@ void nano::confirming_set::start () return; } - notification_workers.start (); + workers.start (); thread = std::thread{ [this] () { nano::thread_role::set (nano::thread_role::name::confirmation_height); @@ -74,7 +74,7 @@ void nano::confirming_set::stop () { thread.join (); } - notification_workers.stop (); + workers.stop (); } bool nano::confirming_set::contains (nano::block_hash const & hash) const @@ -150,7 +150,7 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) std::unique_lock lock{ mutex }; // It's possible that ledger cementing happens faster than the notifications can be processed by other components, cooldown here - while (notification_workers.queued_tasks () >= config.max_queued_notifications) + while (workers.queued_tasks () >= config.max_queued_notifications) { stats.inc (nano::stat::type::confirming_set, nano::stat::detail::cooldown); condition.wait_for (lock, 100ms, [this] { return stopped.load (); }); @@ -160,7 +160,7 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) } } - notification_workers.post ([this, batch = std::move (batch)] () { + workers.post ([this, batch = std::move (batch)] () { stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify); batch_cemented.notify (batch); }); @@ -255,6 +255,7 @@ nano::container_info nano::confirming_set::container_info () const nano::container_info info; info.put ("set", set); - info.add ("notification_workers", notification_workers.container_info ()); + info.put ("notifications", workers.queued_tasks ()); + info.add ("workers", workers.container_info ()); return info; } diff --git a/nano/node/confirming_set.hpp b/nano/node/confirming_set.hpp index 99569ce1e6..644b241c92 100644 --- a/nano/node/confirming_set.hpp +++ b/nano/node/confirming_set.hpp @@ -105,11 +105,11 @@ class confirming_set final ordered_entries set; std::unordered_set current; - nano::thread_pool notification_workers; - std::atomic stopped{ false }; mutable std::mutex mutex; std::condition_variable condition; std::thread thread; + + nano::thread_pool workers; }; }