diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index a3133a725f..4fb8c69cdc 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2647,53 +2647,6 @@ TEST (node, block_processor_full) ASSERT_TIMELY (5s, node.block_processor.full ()); } -TEST (node, block_processor_half_full) -{ - nano::test::system system; - nano::node_flags node_flags; - node_flags.block_processor_full_size = 6; - node_flags.force_use_write_queue = true; - auto & node = *system.add_node (nano::node_config (system.get_available_port ()), node_flags); - nano::state_block_builder builder; - auto send1 = builder.make_block () - .account (nano::dev::genesis_key.pub) - .previous (nano::dev::genesis->hash ()) - .representative (nano::dev::genesis_key.pub) - .balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio) - .link (nano::dev::genesis_key.pub) - .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) - .work (*node.work_generate_blocking (nano::dev::genesis->hash ())) - .build (); - auto send2 = builder.make_block () - .account (nano::dev::genesis_key.pub) - .previous (send1->hash ()) - .representative (nano::dev::genesis_key.pub) - .balance (nano::dev::constants.genesis_amount - 2 * nano::Gxrb_ratio) - .link (nano::dev::genesis_key.pub) - .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) - .work (*node.work_generate_blocking (send1->hash ())) - .build (); - auto send3 = builder.make_block () - .account (nano::dev::genesis_key.pub) - .previous (send2->hash ()) - .representative (nano::dev::genesis_key.pub) - .balance (nano::dev::constants.genesis_amount - 3 * nano::Gxrb_ratio) - .link (nano::dev::genesis_key.pub) - .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) - .work (*node.work_generate_blocking (send2->hash ())) - .build (); - // The write guard prevents block processor doing any writes - auto write_guard = node.store.write_queue.wait (nano::store::writer::testing); - node.block_processor.add (send1); - ASSERT_FALSE (node.block_processor.half_full ()); - node.block_processor.add (send2); - ASSERT_FALSE (node.block_processor.half_full ()); - node.block_processor.add (send3); - // Block processor may be not half_full during state blocks signatures verification - ASSERT_TIMELY (2s, node.block_processor.half_full ()); - ASSERT_FALSE (node.block_processor.full ()); -} - TEST (node, confirm_back) { nano::test::system system (1); diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index aa8e6bc49a..ade835b2e2 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -241,7 +241,13 @@ void nano::block_processor::run () { if (!queue.empty ()) { - lock.unlock (); + // TODO: Cleaner periodical logging + if (should_log ()) + { + node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", + queue.size (), + queue.size ({ nano::block_source::forced })); + } auto processed = process_batch (lock); debug_assert (!lock.owns_lock ()); @@ -291,40 +297,47 @@ auto nano::block_processor::next () -> context release_assert (false, "next() called when no blocks are ready"); } -auto nano::block_processor::process_batch (nano::unique_lock & lock_a) -> processed_batch_t +auto nano::block_processor::next_batch (size_t max_count) -> std::deque { - processed_batch_t processed; + debug_assert (!mutex.try_lock ()); + debug_assert (!queue.empty ()); - auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::blockprocessor); - nano::timer timer_l; + queue.periodic_update (); - lock_a.lock (); + std::deque results; + while (!queue.empty () && results.size () < max_count) + { + results.push_back (next ()); + } + return results; +} - queue.periodic_update (); +auto nano::block_processor::process_batch (nano::unique_lock & lock) -> processed_batch_t +{ + debug_assert (lock.owns_lock ()); + debug_assert (!mutex.try_lock ()); + debug_assert (!queue.empty ()); + + auto batch = next_batch (256); + + lock.unlock (); - timer_l.start (); + auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::blockprocessor); + + nano::timer timer; + timer.start (); // Processing blocks - unsigned number_of_blocks_processed (0), number_of_forced_processed (0); - auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); }; - auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; }; - auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; }; + size_t number_of_blocks_processed = 0; + size_t number_of_forced_processed = 0; - while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ()) + processed_batch_t processed; + for (auto & ctx : batch) { - // TODO: Cleaner periodical logging - if (should_log ()) - { - node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", - queue.size (), - queue.size ({ nano::block_source::forced })); - } - - auto ctx = next (); auto const hash = ctx.block->hash (); bool const force = ctx.source == nano::block_source::forced; - lock_a.unlock (); + transaction.refresh_if_needed (); if (force) { @@ -336,15 +349,11 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock auto result = process_one (transaction, ctx, force); processed.emplace_back (result, std::move (ctx)); - - lock_a.lock (); } - lock_a.unlock (); - - if (number_of_blocks_processed != 0 && timer_l.stop () > std::chrono::milliseconds (100)) + if (number_of_blocks_processed != 0 && timer.stop () > std::chrono::milliseconds (100)) { - node.logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer_l.value ().count (), timer_l.unit ()); + node.logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ()); } return processed; diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 7311cf6dfc..3ad1caf4fb 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -93,13 +93,15 @@ class block_processor final std::size_t size () const; std::size_t size (block_source) const; - bool full () const; - bool half_full () const; bool add (std::shared_ptr const &, block_source = block_source::live, std::shared_ptr const & channel = nullptr); std::optional add_blocking (std::shared_ptr const & block, block_source); void force (std::shared_ptr const &); bool should_log (); + // TODO: Remove, used by legacy bootstrap + bool full () const; + bool half_full () const; + std::unique_ptr collect_container_info (std::string const & name); std::atomic flushing{ false }; @@ -120,6 +122,7 @@ class block_processor final nano::block_status process_one (secure::write_transaction const &, context const &, bool forced = false); void queue_unchecked (secure::write_transaction const &, nano::hash_or_account const &); processed_batch_t process_batch (nano::unique_lock &); + std::deque next_batch (size_t max_count); context next (); bool add_impl (context, std::shared_ptr const & channel = nullptr);