Merge branch 'blockprocessor-batch-size' into performance-integration
pwojcikdev committed Jun 2, 2024
2 parents b9e3ffa + 704e2ab commit 5385d25
Showing 2 changed files with 37 additions and 27 deletions.
63 changes: 36 additions & 27 deletions nano/node/blockprocessor.cpp
@@ -241,7 +241,13 @@ void nano::block_processor::run ()
	{
		if (!queue.empty ())
		{
			lock.unlock ();
			// TODO: Cleaner periodical logging
			if (should_log ())
			{
				node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
				queue.size (),
				queue.size ({ nano::block_source::forced }));
			}

			auto processed = process_batch (lock);
			debug_assert (!lock.owns_lock ());
@@ -291,42 +297,49 @@ auto nano::block_processor::next () -> context
	release_assert (false, "next() called when no blocks are ready");
}

auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock_a) -> processed_batch_t
auto nano::block_processor::next_batch (size_t max_count) -> std::deque<context>
{
	debug_assert (!mutex.try_lock ());
	debug_assert (!queue.empty ());

	queue.periodic_update ();

	std::deque<context> results;
	while (!queue.empty () && results.size () < max_count)
	{
		results.push_back (next ());
	}
	return results;
}

auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock) -> processed_batch_t
{
	debug_assert (lock.owns_lock ());
	debug_assert (!mutex.try_lock ());
	debug_assert (!queue.empty ());

	auto batch = next_batch (256);

	lock.unlock ();

	processed_batch_t processed;

	// TODO: Properly limiting batch times requires this <guard, transaction> combo to be wrapped in a single object that provides refresh functionality
	auto scoped_write_guard = node.store.write_queue.wait (nano::store::writer::process_batch);
	auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights });
	nano::timer<std::chrono::milliseconds> timer_l;

	lock_a.lock ();

	queue.periodic_update ();

	nano::timer<std::chrono::milliseconds> timer_l;
	timer_l.start ();

	// Processing blocks
	unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
	auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
	auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
	auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
	size_t number_of_blocks_processed = 0;
	size_t number_of_forced_processed = 0;

	while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
	for (auto & ctx : batch)
	{
		// TODO: Cleaner periodical logging
		if (should_log ())
		{
			node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
			queue.size (),
			queue.size ({ nano::block_source::forced }));
		}

		auto ctx = next ();
		auto const hash = ctx.block->hash ();
		bool const force = ctx.source == nano::block_source::forced;

		lock_a.unlock ();

		if (force)
		{
			number_of_forced_processed++;
@@ -337,12 +350,8 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock

		auto result = process_one (transaction, ctx, force);
		processed.emplace_back (result, std::move (ctx));

		lock_a.lock ();
	}

	lock_a.unlock ();

	if (number_of_blocks_processed != 0 && timer_l.stop () > std::chrono::milliseconds (100))
	{
		node.logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer_l.value ().count (), timer_l.unit ());
1 change: 1 addition & 0 deletions nano/node/blockprocessor.hpp
@@ -120,6 +120,7 @@ class block_processor final
	nano::block_status process_one (secure::write_transaction const &, context const &, bool forced = false);
	void queue_unchecked (secure::write_transaction const &, nano::hash_or_account const &);
	processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
	std::deque<context> next_batch (size_t max_count);
	context next ();
	bool add_impl (context, std::shared_ptr<nano::transport::channel> const & channel = nullptr);

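For readers skimming the diff, the following is a minimal, self-contained sketch of the control flow this commit introduces: drain up to a fixed number of queued contexts while holding the mutex (next_batch), release the lock, then process the whole batch. It is an illustration, not the project's actual code; the context struct, block_processor_sketch class, run_once and max_batch_size names are stand-ins, and the real process_batch additionally acquires a store write guard and a ledger write transaction, distinguishes forced blocks, and reports timing.

// Sketch only: simplified stand-ins for the real nano classes.
#include <cstddef>
#include <deque>
#include <iostream>
#include <mutex>

struct context
{
	int block; // stand-in for std::shared_ptr<nano::block>
};

class block_processor_sketch
{
public:
	void add (context ctx)
	{
		std::lock_guard<std::mutex> guard{ mutex };
		queue.push_back (ctx);
	}

	void run_once ()
	{
		std::unique_lock<std::mutex> lock{ mutex };
		if (queue.empty ())
		{
			return;
		}

		// Drain up to `max_batch_size` entries while still holding the lock,
		// mirroring next_batch (256) in the commit...
		auto batch = next_batch (max_batch_size);

		// ...then release it so producers can keep enqueueing while the batch
		// is processed (in the real code: under a single write transaction).
		lock.unlock ();

		for (auto & ctx : batch)
		{
			process_one (ctx);
		}
	}

private:
	// Caller must hold `mutex`, matching the debug_assert (!mutex.try_lock ())
	// precondition in the commit.
	std::deque<context> next_batch (std::size_t max_count)
	{
		std::deque<context> results;
		while (!queue.empty () && results.size () < max_count)
		{
			results.push_back (queue.front ());
			queue.pop_front ();
		}
		return results;
	}

	void process_one (context const & ctx)
	{
		std::cout << "processed block " << ctx.block << '\n';
	}

	static constexpr std::size_t max_batch_size = 256;

	std::mutex mutex;
	std::deque<context> queue;
};

int main ()
{
	block_processor_sketch processor;
	for (int i = 0; i < 10; ++i)
	{
		processor.add (context{ i });
	}
	processor.run_once ();
}

Compared to the previous implementation visible in the diff, the fixed 256-entry batch replaces the deadline_reached / processor_batch_reached / store_batch_reached limits, and the per-block lock_a.unlock () / lock_a.lock () cycling inside the processing loop is no longer needed because the batch is extracted up front.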
