Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offload block processor notifications #4763

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::block_processing:
thread_role_name_string = "Blck processing";
break;
case nano::thread_role::name::block_processing_notifications:
thread_role_name_string = "Blck proc notif";
break;
case nano::thread_role::name::request_loop:
thread_role_name_string = "Request loop";
break;
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ enum class name
vote_processing,
vote_cache_processing,
block_processing,
block_processing_notifications,
request_loop,
wallet_actions,
bootstrap_initiator,
Expand Down
111 changes: 59 additions & 52 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,14 @@

#include <utility>

/*
* block_processor::context
*/

// Builds the processing context for a single queued block.
// Takes ownership of the block pointer and of the optional completion callback.
// The source must identify a known origin; `unknown` is rejected via debug_assert.
nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a, callback_t callback_a) :
block{ std::move (block) },
source{ source_a },
callback{ std::move (callback_a) }
{
debug_assert (source != nano::block_source::unknown);
}

// Hands out the future paired with this context's promise; the value is
// published later via set_result () once the block has been processed.
auto nano::block_processor::context::get_future () -> std::future<result_t>
{
	auto future = promise.get_future ();
	return future;
}

// Fulfills the promise associated with this context, waking any waiter
// blocked on the future returned by get_future ().
// NOTE(review): std::promise::set_value throws std::future_error if called
// twice — callers appear to invoke this at most once per context; confirm.
void nano::block_processor::context::set_result (result_t const & result)
{
promise.set_value (result);
}

/*
* block_processor
*/

nano::block_processor::block_processor (nano::node & node_a) :
config{ node_a.config.block_processor },
node (node_a),
next_log (std::chrono::steady_clock::now ())
node{ node_a },
workers{ 1, nano::thread_role::name::block_processing_notifications }
{
batch_processed.add ([this] (auto const & items) {
// For every batch item: notify the 'processed' observer.
Expand Down Expand Up @@ -84,12 +62,15 @@ nano::block_processor::~block_processor ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
debug_assert (!workers.alive ());
}

void nano::block_processor::start ()
{
debug_assert (!thread.joinable ());

workers.start ();

thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::block_processing);
run ();
Expand All @@ -107,6 +88,7 @@ void nano::block_processor::stop ()
{
thread.join ();
}
workers.stop ();
}

// TODO: Remove and replace all checks with calls to size (block_source)
Expand Down Expand Up @@ -229,13 +211,24 @@ void nano::block_processor::rollback_competitor (secure::write_transaction const

void nano::block_processor::run ()
{
nano::interval log_interval;
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (!queue.empty ())
{
// TODO: Cleaner periodical logging
if (should_log ())
// It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here
while (workers.queued_tasks () >= config.max_queued_notifications)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::cooldown);
condition.wait_for (lock, 100ms, [this] { return stopped; });
if (stopped)
{
return;
}
}

if (log_interval.elapsed (15s))
{
node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
queue.size (),
Expand All @@ -244,41 +237,32 @@ void nano::block_processor::run ()

auto processed = process_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();

// Set results for futures when not holding the lock
for (auto & [result, context] : processed)
{
if (context.callback)
// Queue notifications to be dispatched in the background
workers.post ([this, processed = std::move (processed)] () mutable {
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::notify);
// Set results for futures when not holding the lock
for (auto & [result, context] : processed)
{
context.callback (result);
if (context.callback)
{
context.callback (result);
}
context.set_result (result);
}
context.set_result (result);
}

batch_processed.notify (processed);

lock.lock ();
batch_processed.notify (processed);
});
}
else
{
condition.notify_one ();
condition.wait (lock);
condition.wait (lock, [this] {
return stopped || !queue.empty ();
});
}
}
}

// Rate limiter for periodic log output: returns true at most once every
// 15 seconds, advancing the next_log deadline as a side effect.
bool nano::block_processor::should_log ()
{
	auto const now = std::chrono::steady_clock::now ();
	if (now <= next_log)
	{
		return false;
	}
	next_log = now + std::chrono::seconds{ 15 };
	return true;
}

auto nano::block_processor::next () -> context
{
debug_assert (!mutex.try_lock ());
Expand Down Expand Up @@ -315,7 +299,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ());

auto batch = next_batch (256);
auto batch = next_batch (config.batch_size);

lock.unlock ();

Expand Down Expand Up @@ -466,9 +450,32 @@ nano::container_info nano::block_processor::container_info () const
info.put ("blocks", queue.size ());
info.put ("forced", queue.size ({ nano::block_source::forced }));
info.add ("queue", queue.container_info ());
info.add ("workers", workers.container_info ());
return info;
}

/*
* block_processor::context
*/

// Constructs the per-block processing context.
// Moves in the block and the optional callback that is invoked with the
// processing result; a source of `unknown` is a programming error (asserted).
nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a, callback_t callback_a) :
block{ std::move (block) },
source{ source_a },
callback{ std::move (callback_a) }
{
debug_assert (source != nano::block_source::unknown);
}

// Returns a std::future tied to this context's promise, allowing a caller
// (e.g. a blocking add) to wait for the block's processing result.
auto nano::block_processor::context::get_future () -> std::future<result_t>
{
return promise.get_future ();
}

// Publishes the processing outcome through the promise, unblocking the
// matching future obtained from get_future ().
// NOTE(review): must be called at most once per context — a repeat call to
// std::promise::set_value throws std::future_error.
void nano::block_processor::context::set_result (result_t const & result)
{
promise.set_value (result);
}

/*
* block_processor_config
*/
Expand Down
9 changes: 6 additions & 3 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <nano/lib/logging.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/node/fair_queue.hpp>
#include <nano/node/fwd.hpp>
#include <nano/secure/common.hpp>
Expand Down Expand Up @@ -46,6 +47,9 @@ class block_processor_config final
size_t priority_live{ 1 };
size_t priority_bootstrap{ 8 };
size_t priority_local{ 16 };

size_t batch_size{ 256 };
size_t max_queued_notifications{ 8 };
};

/**
Expand Down Expand Up @@ -89,7 +93,6 @@ class block_processor final
bool add (std::shared_ptr<nano::block> const &, nano::block_source = nano::block_source::live, std::shared_ptr<nano::transport::channel> const & channel = nullptr, std::function<void (nano::block_status)> callback = {});
std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, nano::block_source);
void force (std::shared_ptr<nano::block> const &);
bool should_log ();

nano::container_info container_info () const;

Expand Down Expand Up @@ -122,11 +125,11 @@ class block_processor final
private:
nano::fair_queue<context, nano::block_source> queue;

std::chrono::steady_clock::time_point next_log;

bool stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
std::thread thread;

nano::thread_pool workers;
};
}
10 changes: 6 additions & 4 deletions nano/node/bootstrap_ascending/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ nano::bootstrap_ascending::service::service (nano::node_config const & node_conf
scoring{ config, node_config_a.network_params.network },
database_limiter{ config.database_rate_limit, 1.0 }
{
// TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread
block_processor.batch_processed.add ([this] (auto const & batch) {
{
nano::lock_guard<nano::mutex> lock{ mutex };
Expand Down Expand Up @@ -217,11 +216,14 @@ void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx
{
if (source == nano::block_source::bootstrap)
{
const auto account = block.previous ().is_zero () ? block.account_field ().value () : ledger.any.block_account (tx, block.previous ()).value ();
const auto account = block.previous ().is_zero () ? block.account_field ().value () : ledger.any.block_account (tx, block.previous ()).value_or (0);
const auto source_hash = block.source_field ().value_or (block.link_field ().value_or (0).as_block_hash ());

// Mark account as blocked because it is missing the source block
accounts.block (account, source_hash);
if (!account.is_zero () && !source_hash.is_zero ())
{
// Mark account as blocked because it is missing the source block
accounts.block (account, source_hash);
}
}
}
break;
Expand Down
13 changes: 7 additions & 6 deletions nano/node/confirming_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na
ledger{ ledger_a },
stats{ stats_a },
logger{ logger_a },
notification_workers{ 1, nano::thread_role::name::confirmation_height_notifications }
workers{ 1, nano::thread_role::name::confirmation_height_notifications }
{
batch_cemented.add ([this] (auto const & cemented) {
for (auto const & context : cemented)
Expand Down Expand Up @@ -55,7 +55,7 @@ void nano::confirming_set::start ()
return;
}

notification_workers.start ();
workers.start ();

thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::confirmation_height);
Expand All @@ -74,7 +74,7 @@ void nano::confirming_set::stop ()
{
thread.join ();
}
notification_workers.stop ();
workers.stop ();
}

bool nano::confirming_set::contains (nano::block_hash const & hash) const
Expand Down Expand Up @@ -150,7 +150,7 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
std::unique_lock lock{ mutex };

// It's possible that ledger cementing happens faster than the notifications can be processed by other components, cooldown here
while (notification_workers.queued_tasks () >= config.max_queued_notifications)
while (workers.queued_tasks () >= config.max_queued_notifications)
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::cooldown);
condition.wait_for (lock, 100ms, [this] { return stopped.load (); });
Expand All @@ -160,7 +160,7 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
}
}

notification_workers.post ([this, batch = std::move (batch)] () {
workers.post ([this, batch = std::move (batch)] () {
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify);
batch_cemented.notify (batch);
});
Expand Down Expand Up @@ -255,6 +255,7 @@ nano::container_info nano::confirming_set::container_info () const

nano::container_info info;
info.put ("set", set);
info.add ("notification_workers", notification_workers.container_info ());
info.put ("notifications", workers.queued_tasks ());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this already added with workers.container_info() below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be redundant, right?

info.add ("workers", workers.container_info ());
return info;
}
4 changes: 2 additions & 2 deletions nano/node/confirming_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ class confirming_set final
ordered_entries set;
std::unordered_set<nano::block_hash> current;

nano::thread_pool notification_workers;

std::atomic<bool> stopped{ false };
mutable std::mutex mutex;
std::condition_variable condition;
std::thread thread;

nano::thread_pool workers;
};
}
Loading