Commit 6602c7c: Use per peer fair queue in block processor

pwojcikdev committed Mar 10, 2024
1 parent 619258d commit 6602c7c
Showing 11 changed files with 180 additions and 97 deletions.
7 changes: 3 additions & 4 deletions nano/core_test/network.cpp
@@ -813,10 +813,9 @@ TEST (network, duplicate_detection)
TEST (network, duplicate_revert_publish)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 0;
auto & node (*system.add_node (node_flags));
ASSERT_TRUE (node.block_processor.full ());
nano::node_config node_config = system.default_config ();
node_config.block_processor.max_peer_queue = 0;
auto & node (*system.add_node (node_config));
nano::publish publish{ nano::dev::network_params.network, nano::dev::genesis };
std::vector<uint8_t> bytes;
{
5 changes: 3 additions & 2 deletions nano/core_test/node.cpp
@@ -528,6 +528,7 @@ TEST (node, expire)
ASSERT_TRUE (node0.expired ());
}

// This test is racy; there is no guarantee that the election won't be confirmed until all forks are fully processed
TEST (node, fork_publish)
{
nano::test::system system (1);
@@ -674,6 +675,7 @@ TEST (node, fork_keep)
ASSERT_TRUE (node2.ledger.block_exists (transaction1, send1->hash ()));
}

// This test is racy; there is no guarantee that the election won't be confirmed until all forks are fully processed
TEST (node, fork_flip)
{
nano::test::system system (2);
@@ -699,8 +701,7 @@ TEST (node, fork_flip)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build ();
nano::publish publish2{ nano::dev::network_params.network, send2 };
auto ignored_channel{ std::make_shared<nano::transport::channel_tcp> (node1, std::weak_ptr<nano::transport::socket> ()) };

auto ignored_channel = nano::test::fake_channel (node1);
node1.network.inbound (publish1, ignored_channel);
node2.network.inbound (publish2, ignored_channel);
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
2 changes: 2 additions & 0 deletions nano/lib/stats_enums.hpp
@@ -38,6 +38,7 @@ enum class type : uint8_t
blockprocessor,
blockprocessor_source,
blockprocessor_result,
blockprocessor_overfill,
bootstrap_server,
active,
active_started,
@@ -80,6 +81,7 @@ enum class detail : uint8_t
none,
success,
unknown,
queue_overflow,

// processing queue
queue,
188 changes: 120 additions & 68 deletions nano/node/blockprocessor.cpp
@@ -6,7 +6,7 @@
#include <nano/secure/ledger.hpp>
#include <nano/store/component.hpp>

#include <boost/format.hpp>
#include <utility>

#include <magic_enum.hpp>

@@ -15,7 +7,7 @@
*/

nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a) :
block{ block },
block{ std::move (block) },
source{ source_a }
{
debug_assert (source != nano::block_source::unknown);
@@ -36,6 +36,7 @@ void nano::block_processor::context::set_result (result_t const & result)
*/

nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) :
config{ node_a.config.block_processor },
node (node_a),
write_database_queue (write_database_queue_a),
next_log (std::chrono::steady_clock::now ())
@@ -47,6 +48,32 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas
block_processed.notify (result, context);
}
});

queue.max_size_query = [this] (auto const & origin) {
switch (std::get<nano::block_source> (origin.sources))
{
case nano::block_source::live:
return config.max_peer_queue;
default:
return config.max_system_queue;
}
};

queue.priority_query = [this] (auto const & origin) -> size_t {
switch (std::get<nano::block_source> (origin.sources))
{
case nano::block_source::live:
return config.priority_live;
case nano::block_source::bootstrap:
case nano::block_source::bootstrap_legacy:
case nano::block_source::unchecked:
return config.priority_bootstrap;
case nano::block_source::local:
return config.priority_local;
default:
return 1;
}
};
}

nano::block_processor::~block_processor ()
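
The two lambdas above are the entire queuing policy: max_size_query caps how many blocks a single origin may have waiting (max_peer_queue for live blocks arriving from a peer, max_system_queue for everything produced locally), and priority_query sets the relative weight with which each source is drained. The real nano::fair_queue lives in a file not shown in this commit; the sketch below is only a minimal illustration, assuming a weighted round-robin scan over per-origin buckets, of how callbacks of this shape can drive such a queue. The class name, container choice, and strategy are invented for the example.

#include <algorithm>
#include <cstddef>
#include <deque>
#include <functional>
#include <map>
#include <utility>

// Minimal sketch of a bounded, weighted round-robin queue. This is not the
// real nano::fair_queue; it only illustrates how max_size_query / priority_query
// style callbacks can enforce per-origin fairness.
template <typename Request, typename Origin>
class fair_queue_sketch
{
public:
	// Same shape as the callbacks installed in the block processor constructor
	std::function<std::size_t (Origin const &)> max_size_query;
	std::function<std::size_t (Origin const &)> priority_query;

	// Reject the request when this origin's bucket is already at its cap
	bool push (Request request, Origin const & origin)
	{
		auto & bucket = queues[origin];
		if (bucket.size () >= max_size_query (origin))
		{
			return false; // Overflow; the caller decides how to account for it
		}
		bucket.push_back (std::move (request));
		return true;
	}

	bool empty () const
	{
		return std::all_of (queues.begin (), queues.end (), [] (auto const & item) {
			return item.second.empty ();
		});
	}

	// Weighted round-robin: an origin with priority N is served at most N
	// consecutive requests before the scan moves on to the next non-empty origin.
	// Precondition: !empty ()
	std::pair<Request, Origin> next ()
	{
		while (true)
		{
			if (current == queues.end ())
			{
				current = queues.begin ();
				served = 0;
			}
			if (!current->second.empty () && served < priority_query (current->first))
			{
				++served;
				Request request = std::move (current->second.front ());
				current->second.pop_front ();
				return { std::move (request), current->first };
			}
			++current;
			served = 0;
		}
	}

private:
	std::map<Origin, std::deque<Request>> queues; // Origin must be ordered (operator<)
	typename std::map<Origin, std::deque<Request>>::iterator current{ queues.end () };
	std::size_t served{ 0 };
};

In the block processor the origin handed to the queue combines the block source with the originating channel (see queue.push (std::move (ctx), { source, channel }) further down), which is what makes the live bucket fair per peer: each peer gets its own max_peer_queue slice, so a single flooding peer can no longer crowd out everyone else.
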
@@ -78,39 +105,44 @@ void nano::block_processor::stop ()
}
}

std::size_t nano::block_processor::size ()
// TODO: Remove and replace all checks with calls to size (block_source)
std::size_t nano::block_processor::size () const
{
nano::unique_lock<nano::mutex> lock{ mutex };
return queue.total_size ();
}

std::size_t nano::block_processor::size (nano::block_source source) const
{
nano::unique_lock<nano::mutex> lock{ mutex };
return blocks.size () + forced.size ();
return queue.size ({ source });
}

bool nano::block_processor::full ()
bool nano::block_processor::full () const
{
return size () >= node.flags.block_processor_full_size;
}

bool nano::block_processor::half_full ()
bool nano::block_processor::half_full () const
{
return size () >= node.flags.block_processor_full_size / 2;
}

void nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source)
bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source, std::shared_ptr<nano::transport::channel> const & channel)
{
if (full ())
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
return;
}
if (node.network_params.work.validate_entry (*block)) // true => error
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work);
return;
return false; // Not added
}

node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source));
node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})",
block->hash ().to_string (),
to_string (source),
channel ? channel->to_string () : "<unknown>"); // TODO: Lazy eval

add_impl (context{ block, source });
return add_impl (context{ block, source }, channel);
}

std::optional<nano::block_status> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
@@ -145,11 +177,27 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());

add_impl (context{ block_a, block_source::forced });
}

bool nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transport::channel> const & channel)
{
auto const source = ctx.source;
bool added = false;
{
nano::lock_guard<nano::mutex> lock{ mutex };
forced.emplace_back (context{ block_a, block_source::forced });
nano::lock_guard<nano::mutex> guard{ mutex };
added = queue.push (std::move (ctx), { source, channel });
}
condition.notify_all ();
if (added)
{
condition.notify_all ();
}
else
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
node.stats.inc (nano::stat::type::blockprocessor_overfill, to_stat_detail (source));
}
return added;
}

void nano::block_processor::rollback_competitor (store::write_transaction const & transaction, nano::block const & block)
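
With the per-origin caps in place, add () now reports whether the block was actually queued instead of dropping it silently, and an overfill is counted both under the generic blockprocessor/overfill counter and per source under blockprocessor_overfill. A hypothetical caller (the function name and surrounding context are invented for illustration, and the snippet assumes the usual node headers) could use the return value like this:

// Illustrative only; not part of this commit. Assumes a nano::node reference
// and the channel the publish message arrived on.
bool enqueue_live_block (nano::node & node, std::shared_ptr<nano::block> const & block, std::shared_ptr<nano::transport::channel> const & channel)
{
	bool added = node.block_processor.add (block, nano::block_source::live, channel);
	if (!added)
	{
		// Either the attached work did not meet the threshold, or this peer's
		// live bucket (max_peer_queue) was full and the fair queue rejected it.
		// The rejection has already been counted by the block processor stats.
	}
	return added;
}
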
@@ -193,7 +241,7 @@ void nano::block_processor::run ()
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (have_blocks_ready ())
if (!queue.empty ())
{
lock.unlock ();

@@ -230,47 +278,16 @@ bool nano::block_processor::should_log ()
return result;
}

bool nano::block_processor::have_blocks_ready ()
{
debug_assert (!mutex.try_lock ());
return !blocks.empty () || !forced.empty ();
}

bool nano::block_processor::have_blocks ()
{
debug_assert (!mutex.try_lock ());
return have_blocks_ready ();
}

void nano::block_processor::add_impl (context ctx)
{
release_assert (ctx.source != nano::block_source::forced);
{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks.emplace_back (std::move (ctx));
}
condition.notify_all ();
}

auto nano::block_processor::next () -> context
{
debug_assert (!mutex.try_lock ());
debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next

if (!forced.empty ())
{
auto entry = std::move (forced.front ());
release_assert (entry.source == nano::block_source::forced);
forced.pop_front ();
return entry;
}
debug_assert (!queue.empty ()); // This should be checked before calling next

if (!blocks.empty ())
if (!queue.empty ())
{
auto entry = std::move (blocks.front ());
release_assert (entry.source != nano::block_source::forced);
blocks.pop_front ();
return entry;
auto [request, origin] = queue.next ();
release_assert (std::get<nano::block_source> (origin.sources) != nano::block_source::forced || request.source == nano::block_source::forced);
return std::move (request);
}

release_assert (false, "next() called when no blocks are ready");
Expand All @@ -286,19 +303,24 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock

lock_a.lock ();

queue.periodic_update ();

timer_l.start ();

// Processing blocks
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };

while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
{
// TODO: Cleaner periodical logging
if ((blocks.size () + forced.size () > 64) && should_log ())
if (should_log ())
{
node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ());
node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
queue.total_size (),
queue.size ({ nano::block_source::forced }));
}

auto ctx = next ();
@@ -339,6 +361,7 @@ nano::block_status nano::block_processor::process_one (store::write_transaction

node.stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result));
node.stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source));

node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed,
nano::log::arg{ "result", result },
nano::log::arg{ "source", context.source },
@@ -434,18 +457,12 @@ void nano::block_processor::queue_unchecked (store::write_transaction const & tr

std::unique_ptr<nano::container_info_component> nano::block_processor::collect_container_info (std::string const & name)
{
std::size_t blocks_count;
std::size_t forced_count;

{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks_count = blocks.size ();
forced_count = forced.size ();
}
nano::lock_guard<nano::mutex> guard{ mutex };

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (forced)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", queue.total_size (), 0 }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", queue.size ({ nano::block_source::forced }), 0 }));
composite->add_component (queue.collect_container_info ("queue"));
return composite;
}

Expand All @@ -460,3 +477,38 @@ nano::stat::detail nano::to_stat_detail (nano::block_source type)
debug_assert (value);
return value.value_or (nano::stat::detail{});
}

/*
* block_processor_config
*/

nano::block_processor_config::block_processor_config (const nano::network_constants & network_constants)
{
if (network_constants.is_beta_network ())
{
// Bump max queue sizes for beta network to allow for more aggressive block propagation for saturation testing
max_peer_queue = 1024;
}
}

nano::error nano::block_processor_config::serialize (nano::tomlconfig & toml) const
{
toml.put ("max_peer_queue", max_peer_queue, "Maximum number of blocks to queue from network peers. \ntype:uint64");
toml.put ("max_system_queue", max_system_queue, "Maximum number of blocks to queue from system components (local RPC, bootstrap). \ntype:uint64");
toml.put ("priority_live", priority_live, "Priority for live network blocks. Higher priority gets processed more frequently. \ntype:uint64");
toml.put ("priority_bootstrap", priority_bootstrap, "Priority for bootstrap blocks. Higher priority gets processed more frequently. \ntype:uint64");
toml.put ("priority_local", priority_local, "Priority for local RPC blocks. Higher priority gets processed more frequently. \ntype:uint64");

return toml.get_error ();
}

nano::error nano::block_processor_config::deserialize (nano::tomlconfig & toml)
{
toml.get ("max_peer_queue", max_peer_queue);
toml.get ("max_system_queue", max_system_queue);
toml.get ("priority_live", priority_live);
toml.get ("priority_bootstrap", priority_bootstrap);
toml.get ("priority_local", priority_local);

return toml.get_error ();
}
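
The new limits and weights are ordinary node configuration: the TOML keys serialized above (max_peer_queue, max_system_queue, priority_live, priority_bootstrap, priority_local) map one-to-one to fields on node_config.block_processor, which is also how the updated duplicate_revert_publish test drives them. A sketch of setting them in a test follows; the values are arbitrary examples, not the shipped defaults.

// Illustrative values only; not the defaults shipped with the node.
// Assumes a nano::test::system as used throughout core_test.
nano::test::system system;
nano::node_config node_config = system.default_config ();
node_config.block_processor.max_peer_queue = 256; // per-peer cap for live blocks
node_config.block_processor.max_system_queue = 8192; // cap for bootstrap/unchecked/local sources
node_config.block_processor.priority_live = 1; // relative round-robin weights
node_config.block_processor.priority_bootstrap = 4;
node_config.block_processor.priority_local = 2;
auto & node = *system.add_node (node_config);
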