Skip to content

Commit

Permalink
Issue block processor batch processed notifications on background thread
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Oct 22, 2024
1 parent f53f1f8 commit 2a1b379
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 22 deletions.
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::block_processing:
thread_role_name_string = "Blck processing";
break;
case nano::thread_role::name::block_processing_notifications:
thread_role_name_string = "Blck proc notif";
break;
case nano::thread_role::name::request_loop:
thread_role_name_string = "Request loop";
break;
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ enum class name
vote_processing,
vote_cache_processing,
block_processing,
block_processing_notifications,
request_loop,
wallet_actions,
bootstrap_initiator,
Expand Down
40 changes: 30 additions & 10 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ void nano::block_processor::context::set_result (result_t const & result)
nano::block_processor::block_processor (nano::node & node_a) :
config{ node_a.config.block_processor },
node (node_a),
next_log (std::chrono::steady_clock::now ())
next_log (std::chrono::steady_clock::now ()),
workers{ 1, nano::thread_role::name::block_processing_notifications }
{
batch_processed.add ([this] (auto const & items) {
// For every batch item: notify the 'processed' observer.
Expand Down Expand Up @@ -84,12 +85,15 @@ nano::block_processor::~block_processor ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
debug_assert (!workers.alive ());
}

void nano::block_processor::start ()
{
debug_assert (!thread.joinable ());

workers.start ();

thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::block_processing);
run ();
Expand All @@ -107,6 +111,7 @@ void nano::block_processor::stop ()
{
thread.join ();
}
workers.stop ();
}

// TODO: Remove and replace all checks with calls to size (block_source)
Expand Down Expand Up @@ -244,20 +249,33 @@ void nano::block_processor::run ()

auto processed = process_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();

// Set results for futures when not holding the lock
for (auto & [result, context] : processed)
// It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here
while (workers.queued_tasks () >= config.max_queued_notifications)
{
if (context.callback)
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::cooldown);
condition.wait_for (lock, 100ms, [this] { return stopped; });
if (stopped)
{
context.callback (result);
return;
}
context.set_result (result);
}

batch_processed.notify (processed);

lock.lock ();
// Queue notifications to be dispatched in the background
workers.post ([this, processed = std::move (processed)] () mutable {
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::notify);
// Set results for futures when not holding the lock
for (auto & [result, context] : processed)
{
if (context.callback)
{
context.callback (result);
}
context.set_result (result);
}
batch_processed.notify (processed);
});
}
else
{
Expand Down Expand Up @@ -315,7 +333,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ());

auto batch = next_batch (256);
auto batch = next_batch (config.batch_size);

lock.unlock ();

Expand Down Expand Up @@ -465,7 +483,9 @@ nano::container_info nano::block_processor::container_info () const
nano::container_info info;
info.put ("blocks", queue.size ());
info.put ("forced", queue.size ({ nano::block_source::forced }));
info.put ("notifications", workers.queued_tasks ());
info.add ("queue", queue.container_info ());
info.add ("workers", workers.container_info ());
return info;
}

Expand Down
6 changes: 6 additions & 0 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <nano/lib/logging.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/node/fair_queue.hpp>
#include <nano/node/fwd.hpp>
#include <nano/secure/common.hpp>
Expand Down Expand Up @@ -46,6 +47,9 @@ class block_processor_config final
size_t priority_live{ 1 };
size_t priority_bootstrap{ 8 };
size_t priority_local{ 16 };

size_t batch_size{ 256 };
size_t max_queued_notifications{ 8 };
};

/**
Expand Down Expand Up @@ -128,5 +132,7 @@ class block_processor final
nano::condition_variable condition;
mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
std::thread thread;

nano::thread_pool workers;
};
}
10 changes: 6 additions & 4 deletions nano/node/bootstrap_ascending/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ nano::bootstrap_ascending::service::service (nano::node_config const & node_conf
scoring{ config, node_config_a.network_params.network },
database_limiter{ config.database_rate_limit, 1.0 }
{
// TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread
block_processor.batch_processed.add ([this] (auto const & batch) {
{
nano::lock_guard<nano::mutex> lock{ mutex };
Expand Down Expand Up @@ -217,11 +216,14 @@ void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx
{
if (source == nano::block_source::bootstrap)
{
const auto account = block.previous ().is_zero () ? block.account_field ().value () : ledger.any.block_account (tx, block.previous ()).value ();
const auto account = block.previous ().is_zero () ? block.account_field ().value () : ledger.any.block_account (tx, block.previous ()).value_or (0);
const auto source_hash = block.source_field ().value_or (block.link_field ().value_or (0).as_block_hash ());

// Mark account as blocked because it is missing the source block
accounts.block (account, source_hash);
if (!account.is_zero () && !source_hash.is_zero ())
{
// Mark account as blocked because it is missing the source block
accounts.block (account, source_hash);
}
}
}
break;
Expand Down
13 changes: 7 additions & 6 deletions nano/node/confirming_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na
ledger{ ledger_a },
stats{ stats_a },
logger{ logger_a },
notification_workers{ 1, nano::thread_role::name::confirmation_height_notifications }
workers{ 1, nano::thread_role::name::confirmation_height_notifications }
{
batch_cemented.add ([this] (auto const & cemented) {
for (auto const & context : cemented)
Expand Down Expand Up @@ -55,7 +55,7 @@ void nano::confirming_set::start ()
return;
}

notification_workers.start ();
workers.start ();

thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::confirmation_height);
Expand All @@ -74,7 +74,7 @@ void nano::confirming_set::stop ()
{
thread.join ();
}
notification_workers.stop ();
workers.stop ();
}

bool nano::confirming_set::contains (nano::block_hash const & hash) const
Expand Down Expand Up @@ -150,7 +150,7 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
std::unique_lock lock{ mutex };

// It's possible that ledger cementing happens faster than the notifications can be processed by other components, cooldown here
while (notification_workers.queued_tasks () >= config.max_queued_notifications)
while (workers.queued_tasks () >= config.max_queued_notifications)
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::cooldown);
condition.wait_for (lock, 100ms, [this] { return stopped.load (); });
Expand All @@ -160,7 +160,7 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
}
}

notification_workers.post ([this, batch = std::move (batch)] () {
workers.post ([this, batch = std::move (batch)] () {
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify);
batch_cemented.notify (batch);
});
Expand Down Expand Up @@ -255,6 +255,7 @@ nano::container_info nano::confirming_set::container_info () const

nano::container_info info;
info.put ("set", set);
info.add ("notification_workers", notification_workers.container_info ());
info.put ("notifications", workers.queued_tasks ());
info.add ("workers", workers.container_info ());
return info;
}
4 changes: 2 additions & 2 deletions nano/node/confirming_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ class confirming_set final
ordered_entries set;
std::unordered_set<nano::block_hash> current;

nano::thread_pool notification_workers;

std::atomic<bool> stopped{ false };
mutable std::mutex mutex;
std::condition_variable condition;
std::thread thread;

nano::thread_pool workers;
};
}

0 comments on commit 2a1b379

Please sign in to comment.