WIP
pwojcikdev committed Mar 5, 2024
1 parent 3e35996 commit 0ddc5db
Showing 8 changed files with 345 additions and 87 deletions.
2 changes: 2 additions & 0 deletions nano/lib/stats_enums.hpp
@@ -38,6 +38,7 @@ enum class type : uint8_t
blockprocessor,
blockprocessor_source,
blockprocessor_result,
blockprocessor_overfill,
bootstrap_server,
active,
active_started,
@@ -75,6 +76,7 @@ enum class detail : uint8_t
none,
success,
unknown,
queue_overflow,

// processing queue
queue,
1 change: 1 addition & 0 deletions nano/node/CMakeLists.txt
@@ -78,6 +78,7 @@ add_library(
election_insertion_result.hpp
epoch_upgrader.hpp
epoch_upgrader.cpp
fair_queue.hpp
inactive_cache_information.hpp
inactive_cache_information.cpp
inactive_cache_status.hpp
142 changes: 75 additions & 67 deletions nano/node/blockprocessor.cpp
@@ -45,6 +45,32 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas
block_processed.notify (result, context);
}
});

// TODO: Make these configurable
queue.max_size_query = [this] (auto const & origin) {
switch (origin.source)
{
case nano::block_source::live:
return 128;
default:
return 1024 * 16;
}
};

// TODO: Make these configurable
queue.priority_query = [this] (auto const & origin) {
switch (origin.source)
{
case nano::block_source::live:
return 1;
case nano::block_source::local:
return 16;
case nano::block_source::bootstrap:
return 8;
default:
return 1;
}
};
}
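
The two callbacks above give each block source its own bound and weight: live traffic is capped at a tight 128 entries while every other source falls back to a 16384-entry cap, and local submissions carry the highest dequeue priority (16) versus bootstrap (8) and live (1). A rough standalone sketch of the bounded-push side of that idea, assuming nothing beyond a plain per-source deque (the function and names below are illustrative, not the actual nano::fair_queue):

// Illustrative only: a per-source bounded push, not the real nano::fair_queue.
#include <cstddef>
#include <deque>
#include <utility>

template <typename Request>
bool bounded_push (std::deque<Request> & source_queue, Request request, std::size_t max_size)
{
	if (source_queue.size () >= max_size)
	{
		return false; // Over the per-source cap; the caller counts this as an overfill drop
	}
	source_queue.push_back (std::move (request));
	return true;
}
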

nano::block_processor::~block_processor ()
@@ -76,39 +102,44 @@ void nano::block_processor::stop ()
}
}

std::size_t nano::block_processor::size ()
// TODO: Remove and replace all checks with calls to size (block_source)
std::size_t nano::block_processor::size () const
{
nano::unique_lock<nano::mutex> lock{ mutex };
return blocks.size () + forced.size ();
return queue.total_size ();
}

bool nano::block_processor::full ()
std::size_t nano::block_processor::size (nano::block_source source) const
{
nano::unique_lock<nano::mutex> lock{ mutex };
return queue.size (source);
}

bool nano::block_processor::full () const
{
return size () >= node.flags.block_processor_full_size;
}

bool nano::block_processor::half_full ()
bool nano::block_processor::half_full () const
{
return size () >= node.flags.block_processor_full_size / 2;
}

void nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source)
bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source, std::shared_ptr<nano::transport::channel> channel)
{
if (full ())
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
return;
}
if (node.network_params.work.validate_entry (*block)) // true => error
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work);
return;
return false; // Not added
}

node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source));
node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})",
block->hash ().to_string (),
to_string (source),
channel ? channel->to_string () : "<unknown>"); // TODO: Lazy eval

add_impl (context{ block, source });
return add_impl (context{ block, source }, channel);
}

std::optional<nano::block_status> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
@@ -143,11 +174,26 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());

add_impl (context{ block_a, block_source::forced });
}

bool nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transport::channel> channel)
{
bool added = false;
{
nano::lock_guard<nano::mutex> lock{ mutex };
forced.emplace_back (context{ block_a, block_source::forced });
nano::lock_guard<nano::mutex> guard{ mutex };
added = queue.push (std::move (ctx), ctx.source, channel);
}
condition.notify_all ();
if (added)
{
condition.notify_all ();
}
else
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
node.stats.inc (nano::stat::type::blockprocessor_overfill, to_stat_detail (ctx.source));
}
return added;
}

void nano::block_processor::rollback_competitor (store::write_transaction const & transaction, nano::block const & block)
@@ -191,7 +237,7 @@ void nano::block_processor::run ()
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (have_blocks_ready ())
if (!queue.empty ())
{
active = true;
lock.unlock ();
@@ -230,47 +276,16 @@ bool nano::block_processor::should_log ()
return result;
}

bool nano::block_processor::have_blocks_ready ()
{
debug_assert (!mutex.try_lock ());
return !blocks.empty () || !forced.empty ();
}

bool nano::block_processor::have_blocks ()
{
debug_assert (!mutex.try_lock ());
return have_blocks_ready ();
}

void nano::block_processor::add_impl (context ctx)
{
release_assert (ctx.source != nano::block_source::forced);
{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks.emplace_back (std::move (ctx));
}
condition.notify_all ();
}

auto nano::block_processor::next () -> context
{
debug_assert (!mutex.try_lock ());
debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next

if (!forced.empty ())
{
auto entry = std::move (forced.front ());
release_assert (entry.source == nano::block_source::forced);
forced.pop_front ();
return entry;
}
debug_assert (!queue.empty ()); // This should be checked before calling next

if (!blocks.empty ())
if (!queue.empty ())
{
auto entry = std::move (blocks.front ());
release_assert (entry.source != nano::block_source::forced);
blocks.pop_front ();
return entry;
auto [request, origin] = queue.next ();
release_assert (origin.source != nano::block_source::forced || request.source == nano::block_source::forced);
return std::move (request);
}

release_assert (false, "next() called when no blocks are ready");
@@ -286,19 +301,22 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock

lock_a.lock ();

queue.periodic_cleanup ();

timer_l.start ();

// Processing blocks
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };

while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
{
// TODO: Cleaner periodical logging
if ((blocks.size () + forced.size () > 64) && should_log ())
if (should_log ())
{
node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ());
node.logger.debug (nano::log::type::blockprocessor, "{} blocks in processing queue", queue.total_size ());
}

auto ctx = next ();
@@ -433,18 +451,8 @@ void nano::block_processor::queue_unchecked (store::write_transaction const & tr

std::unique_ptr<nano::container_info_component> nano::block_processor::collect_container_info (std::string const & name)
{
std::size_t blocks_count;
std::size_t forced_count;

{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks_count = blocks.size ();
forced_count = forced.size ();
}

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (forced)::value_type) }));
composite->add_component (queue.collect_container_info ("queue"));
return composite;
}
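
On the dequeue side, the priority values suggest the queue drains sources roughly in proportion to their weight rather than strictly FIFO across the whole processor. The actual selection logic lives in fair_queue.hpp, which is not part of this excerpt; the following is a self-contained sketch of a plain weighted round-robin, reusing the priority_query weights from above (live = 1, local = 16, bootstrap = 8) purely for illustration:

// Illustrative weighted round-robin over three sources; the weights mirror the
// priority_query values installed in the block_processor constructor above.
#include <cstddef>
#include <deque>
#include <iostream>
#include <string>
#include <vector>

struct source_queue
{
	std::string name;
	std::size_t priority; // Higher value => larger share of each round
	std::deque<int> requests;
};

int main ()
{
	std::vector<source_queue> queues{
		{ "live", 1, { 1, 2, 3 } },
		{ "local", 16, { 4, 5, 6 } },
		{ "bootstrap", 8, { 7, 8, 9 } },
	};

	bool work_remaining = true;
	while (work_remaining)
	{
		work_remaining = false;
		for (auto & queue : queues)
		{
			// Each round a source may dequeue up to `priority` requests
			for (std::size_t n = 0; n < queue.priority && !queue.requests.empty (); ++n)
			{
				std::cout << queue.name << ": " << queue.requests.front () << '\n';
				queue.requests.pop_front ();
			}
			work_remaining = work_remaining || !queue.requests.empty ();
		}
	}
	return 0;
}

In a round where all three sources have work queued, this scheme would hand local submissions up to sixteen dequeues for every one live dequeue, which matches the apparent intent of prioritising locally submitted blocks over relayed live traffic.
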

20 changes: 10 additions & 10 deletions nano/node/blockprocessor.hpp
@@ -2,6 +2,7 @@

#include <nano/lib/blocks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/node/fair_queue.hpp>
#include <nano/secure/common.hpp>

#include <chrono>
@@ -68,15 +69,14 @@ class block_processor final
void start ();
void stop ();

std::size_t size ();
bool full ();
bool half_full ();
void add (std::shared_ptr<nano::block> const &, block_source = block_source::live);
std::size_t size () const;
std::size_t size (block_source) const;
bool full () const;
bool half_full () const;
bool add (std::shared_ptr<nano::block> const &, block_source = block_source::live, std::shared_ptr<nano::transport::channel> channel = nullptr);
std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
void force (std::shared_ptr<nano::block> const &);
bool should_log ();
bool have_blocks_ready ();
bool have_blocks ();

std::unique_ptr<container_info_component> collect_container_info (std::string const & name);

@@ -99,7 +99,7 @@ class block_processor final
void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &);
processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
context next ();
void add_impl (context);
bool add_impl (context, std::shared_ptr<nano::transport::channel> channel = nullptr);

private: // Dependencies
nano::node & node;
@@ -109,12 +109,12 @@
bool stopped{ false };
bool active{ false };

std::deque<context> blocks;
std::deque<context> forced;
nano::fair_queue<block_source, context> queue;

std::chrono::steady_clock::time_point next_log;

nano::condition_variable condition;
nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
std::thread thread;
};
}
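
The new queue member is a nano::fair_queue<block_source, context>, but fair_queue.hpp itself is not shown in this excerpt. Inferring only from the call sites in this commit (push, next, size, total_size, empty, periodic_cleanup, max_size_query, priority_query), a hypothetical minimal shape could look like the sketch below; the channel argument to push and the collect_container_info helper are omitted, the selection policy is deliberately naive, and every name here is an assumption rather than the real interface:

// Hypothetical sketch of a fair_queue-like container, inferred only from the
// call sites in this commit; the real nano/node/fair_queue.hpp is not shown here.
#include <cstddef>
#include <deque>
#include <functional>
#include <map>
#include <utility>

template <typename Source, typename Request>
class fair_queue_sketch
{
public:
	struct origin
	{
		Source source;
	};

	// Callbacks installed by the owner (see the block_processor constructor above)
	std::function<std::size_t (origin const &)> max_size_query;
	std::function<std::size_t (origin const &)> priority_query;

	bool push (Request request, Source source)
	{
		auto & requests = queues[source];
		if (requests.size () >= max_size_query (origin{ source }))
		{
			return false; // Overfill: the request is dropped
		}
		requests.push_back (std::move (request));
		return true;
	}

	// Precondition: !empty (). Deliberately naive: always drains the
	// highest-priority non-empty source; a real fair queue would rotate
	// between sources in proportion to their priority.
	std::pair<Request, origin> next ()
	{
		auto best = queues.end ();
		std::size_t best_priority = 0;
		for (auto it = queues.begin (); it != queues.end (); ++it)
		{
			if (!it->second.empty () && priority_query (origin{ it->first }) >= best_priority)
			{
				best = it;
				best_priority = priority_query (origin{ it->first });
			}
		}
		auto request = std::move (best->second.front ());
		best->second.pop_front ();
		return { std::move (request), origin{ best->first } };
	}

	std::size_t size (Source source) const
	{
		auto existing = queues.find (source);
		return existing == queues.end () ? 0 : existing->second.size ();
	}

	std::size_t total_size () const
	{
		std::size_t result{ 0 };
		for (auto const & [source, requests] : queues)
		{
			result += requests.size ();
		}
		return result;
	}

	bool empty () const
	{
		return total_size () == 0;
	}

	void periodic_cleanup ()
	{
		// No-op in this sketch; the real queue presumably prunes entries for dead channels
	}

private:
	std::map<Source, std::deque<Request>> queues;
};

The real implementation presumably keys its internal queues on a (source, channel) origin pair so that periodic_cleanup can drop queues belonging to dead channels; the sketch keys on source alone to stay short.
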
4 changes: 2 additions & 2 deletions nano/node/bootstrap/bootstrap_legacy.cpp
@@ -225,9 +225,9 @@ void nano::bootstrap_attempt_legacy::run ()

// TODO: This check / wait is a heuristic and should be improved.
auto wait_start = std::chrono::steady_clock::now ();
while (!stopped && node->block_processor.size () != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
while (!stopped && node->block_processor.size (nano::block_source::bootstrap_legacy) != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
{
condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size () == 0; });
condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size (nano::block_source::bootstrap_legacy) == 0; });
}

if (start_account.number () != std::numeric_limits<nano::uint256_t>::max ())
2 changes: 1 addition & 1 deletion nano/node/bootstrap_ascending/service.cpp
@@ -193,7 +193,7 @@ void nano::bootstrap_ascending::service::inspect (store::transaction const & tx,
void nano::bootstrap_ascending::service::wait_blockprocessor ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && block_processor.size () > config.bootstrap_ascending.block_wait_count)
while (!stopped && block_processor.size (nano::block_source::bootstrap) > config.bootstrap_ascending.block_wait_count)
{
condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; }); // Blockprocessor is relatively slow, sleeping here instead of using conditions
}
