WIP
pwojcikdev committed Mar 4, 2024
1 parent a1f3d7e commit ab42634
Showing 6 changed files with 198 additions and 57 deletions.
1 change: 1 addition & 0 deletions nano/node/CMakeLists.txt
@@ -78,6 +78,7 @@ add_library(
election_insertion_result.hpp
epoch_upgrader.hpp
epoch_upgrader.cpp
fair_queue.hpp
inactive_cache_information.hpp
inactive_cache_information.cpp
inactive_cache_status.hpp
79 changes: 33 additions & 46 deletions nano/node/blockprocessor.cpp
@@ -45,6 +45,7 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas
block_processed.notify (result, context);
}
});

processing_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::block_processing);
this->process_blocks ();
@@ -61,18 +62,25 @@ void nano::block_processor::stop ()
nano::join_or_pass (processing_thread);
}

std::size_t nano::block_processor::size ()
// TODO: Remove and replace all checks with calls to size (block_source)
std::size_t nano::block_processor::size () const
{
nano::unique_lock<nano::mutex> lock{ mutex };
return blocks.size () + forced.size ();
return queue.total_size ();
}

bool nano::block_processor::full ()
std::size_t nano::block_processor::size (nano::block_source source) const
{
nano::unique_lock<nano::mutex> lock{ mutex };
return queue.size (source);
}

bool nano::block_processor::full () const
{
return size () >= node.flags.block_processor_full_size;
}

bool nano::block_processor::half_full ()
bool nano::block_processor::half_full () const
{
return size () >= node.flags.block_processor_full_size / 2;
}
@@ -130,7 +138,8 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)

{
nano::lock_guard<nano::mutex> lock{ mutex };
forced.emplace_back (context{ block_a, block_source::forced });
queue.push (context{ block_a, block_source::forced }, block_source::forced);
}
condition.notify_all ();
}
@@ -176,7 +185,7 @@ void nano::block_processor::process_blocks ()
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (have_blocks_ready ())
if (!queue.empty ())
{
active = true;
lock.unlock ();
@@ -215,47 +224,26 @@ bool nano::block_processor::should_log ()
return result;
}

bool nano::block_processor::have_blocks_ready ()
{
debug_assert (!mutex.try_lock ());
return !blocks.empty () || !forced.empty ();
}

bool nano::block_processor::have_blocks ()
{
debug_assert (!mutex.try_lock ());
return have_blocks_ready ();
}

void nano::block_processor::add_impl (context ctx)
{
release_assert (ctx.source != nano::block_source::forced);
{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks.emplace_back (std::move (ctx));
auto const source = ctx.source;
queue.push (std::move (ctx), source);
}
condition.notify_all ();
}

auto nano::block_processor::next () -> context
{
debug_assert (!mutex.try_lock ());
debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next

if (!forced.empty ())
{
auto entry = std::move (forced.front ());
release_assert (entry.source == nano::block_source::forced);
forced.pop_front ();
return entry;
}
debug_assert (!queue.empty ()); // This should be checked before calling next

if (!blocks.empty ())
if (!queue.empty ())
{
auto entry = std::move (blocks.front ());
release_assert (entry.source != nano::block_source::forced);
blocks.pop_front ();
return entry;
auto [request, origin] = queue.next ();
release_assert (origin.source != nano::block_source::forced || request.source == nano::block_source::forced);
return std::move (request);
}

release_assert (false, "next() called when no blocks are ready");
@@ -272,18 +260,19 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
lock_a.lock ();

timer_l.start ();

// Processing blocks
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };

while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
{
// TODO: Cleaner periodical logging
if ((blocks.size () + forced.size () > 64) && should_log ())
if (should_log ())
{
node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ());
node.logger.debug (nano::log::type::blockprocessor, "{} blocks in processing queue", queue.total_size ());
}

auto ctx = next ();
@@ -418,18 +407,16 @@ void nano::block_processor::queue_unchecked (store::write_transaction const & tr

std::unique_ptr<nano::container_info_component> nano::block_processor::collect_container_info (std::string const & name)
{
std::size_t blocks_count;
std::size_t forced_count;

{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks_count = blocks.size ();
forced_count = forced.size ();
}

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (forced)::value_type) }));
composite->add_component (queue.collect_container_info ("queue"));
return composite;
}

16 changes: 8 additions & 8 deletions nano/node/blockprocessor.hpp
@@ -2,6 +2,7 @@

#include <nano/lib/blocks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/node/fair_queue.hpp>
#include <nano/secure/common.hpp>

#include <chrono>
@@ -65,15 +66,14 @@ class block_processor final
block_processor (nano::node &, nano::write_database_queue &);

void stop ();
std::size_t size ();
bool full ();
bool half_full ();
std::size_t size () const;
std::size_t size (block_source) const;
bool full () const;
bool half_full () const;
void add (std::shared_ptr<nano::block> const &, block_source = block_source::live);
std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
void force (std::shared_ptr<nano::block> const &);
bool should_log ();
bool have_blocks_ready ();
bool have_blocks ();
void process_blocks ();
std::unique_ptr<container_info_component> collect_container_info (std::string const & name);

@@ -105,12 +105,12 @@ class block_processor final
bool stopped{ false };
bool active{ false };

std::deque<context> blocks;
std::deque<context> forced;
nano::fair_queue<block_source, context> queue;

std::chrono::steady_clock::time_point next_log;

nano::condition_variable condition;
nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
std::thread processing_thread;
};
}
4 changes: 2 additions & 2 deletions nano/node/bootstrap/bootstrap_legacy.cpp
@@ -225,9 +225,9 @@ void nano::bootstrap_attempt_legacy::run ()

// TODO: This check / wait is a heuristic and should be improved.
auto wait_start = std::chrono::steady_clock::now ();
while (!stopped && node->block_processor.size () != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
while (!stopped && node->block_processor.size (nano::block_source::bootstrap_legacy) != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
{
condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size () == 0; });
condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size (nano::block_source::bootstrap_legacy) == 0; });
}

if (start_account.number () != std::numeric_limits<nano::uint256_t>::max ())
2 changes: 1 addition & 1 deletion nano/node/bootstrap_ascending/service.cpp
@@ -193,7 +193,7 @@ void nano::bootstrap_ascending::service::inspect (store::transaction const & tx,
void nano::bootstrap_ascending::service::wait_blockprocessor ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && block_processor.size () > config.bootstrap_ascending.block_wait_count)
while (!stopped && block_processor.size (nano::block_source::bootstrap) > config.bootstrap_ascending.block_wait_count)
{
condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; }); // Blockprocessor is relatively slow, sleeping here instead of using conditions
}
153 changes: 153 additions & 0 deletions nano/node/fair_queue.hpp
@@ -0,0 +1,153 @@
#pragma once

#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/thread_roles.hpp>
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/bandwidth_limiter.hpp>
#include <nano/node/transport/channel.hpp>

#include <boost/circular_buffer.hpp>

#include <algorithm>
#include <condition_variable>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <thread>
#include <utility>
#include <vector>

namespace nano
{
template <typename Source, typename Request>
class fair_queue final
{
private:
struct entry
{
// using queue_t = boost::circular_buffer<Request>;
using queue_t = std::deque<Request>;

queue_t requests;
nano::bandwidth_limiter limiter;
size_t const priority;
size_t const max_size;

entry (size_t max_size, size_t priority, size_t max_rate, double max_burst_ratio) :
// requests{ max_size },
limiter{ max_rate, max_burst_ratio },
priority{ priority },
max_size{ max_size }
{
}
};

public:
// using source_type = std::pair<Source, std::shared_ptr<nano::transport::channel>>;
struct source_type
{
Source source;
std::shared_ptr<nano::transport::channel> channel;

auto operator<=> (source_type const &) const = default;
};

using value_type = std::pair<Request, source_type>;

public:
explicit fair_queue () = default;

size_t size (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) const
{
auto it = queues.find (source_type{ source, channel });
return it == queues.end () ? 0 : it->second.requests.size ();
}

size_t total_size () const
{
return std::accumulate (queues.begin (), queues.end (), std::size_t{ 0 }, [] (std::size_t total, auto const & queue) {
return total + queue.second.requests.size ();
});
}

bool empty () const
{
return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) {
return queue.second.requests.empty ();
});
}

size_t queues_size () const
{
return queues.size ();
}

void clear ()
{
queues.clear ();
}

/// Should be called periodically to clean up stale channels
void cleanup ()
{
erase_if (queues, [] (auto const & entry) {
// Only entries associated with a network channel can become stale
return entry.first.channel != nullptr && !entry.first.channel->alive ();
});
}

void push (Request request, Source source, std::shared_ptr<nano::transport::channel> channel = nullptr)
{
auto const source_key = source_type{ source, channel };

auto it = queues.find (source_key);

// Create a new queue if it doesn't exist
if (it == queues.end ())
{
auto max_size = max_size_query (source_key);
auto priority = priority_query (source_key);
auto [max_rate, max_burst_ratio] = rate_query (source_key);

// Construct the entry in place (avoids requiring entry to be copyable or movable)
it = queues.try_emplace (source_key, max_size, priority, max_rate, max_burst_ratio).first;
}

auto & queue = it->second;

// Simple overflow policy: drop the new request once this source's queue is at capacity
if (queue.requests.size () < queue.max_size)
{
queue.requests.push_back (std::move (request));
}
}

public:
using query_size_t = std::function<size_t (source_type const &)>;
using query_priority_t = std::function<size_t (source_type const &)>;
using query_rate_t = std::function<std::pair<size_t, double> (source_type const &)>;

query_size_t max_size_query{ [] (auto const & origin) { debug_assert (false, "max_size_query callback empty"); return 0; } };
query_priority_t priority_query{ [] (auto const & origin) { debug_assert (false, "priority_query callback empty"); return 0; } };
query_rate_t rate_query{ [] (auto const & origin) { debug_assert (false, "rate_query callback empty"); return std::pair<size_t, double>{ 0, 1.0 }; } };

public:
value_type next ()
{
debug_assert (!empty ()); // Should be checked before calling next
// WIP: pops from the first non-empty queue; per-source priorities and rate limits are not applied yet
auto it = std::find_if (queues.begin (), queues.end (), [] (auto const & queue) {
return !queue.second.requests.empty ();
});
release_assert (it != queues.end ());
auto request = std::move (it->second.requests.front ());
it->second.requests.pop_front ();
return { std::move (request), it->first };
}

std::deque<value_type> next_batch (size_t max_count);

private:
std::map<source_type, entry> queues;

public:
std::unique_ptr<container_info_component> collect_container_info (std::string const & name)
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queues", queues_size (), sizeof (typename decltype (queues)::value_type) }));
return composite;
}
};
}
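
For reference, a minimal usage sketch of the new fair_queue template. This is illustrative only and not part of the diff; the capacity, priority and rate values below are placeholder assumptions, and 'block' stands for a previously obtained std::shared_ptr<nano::block>.

// Sketch: a queue of blocks keyed by block_source, with per-source limits configured via callbacks
nano::fair_queue<nano::block_source, std::shared_ptr<nano::block>> queue;
queue.max_size_query = [] (auto const & origin) { return 1024; }; // assumed per-source capacity
queue.priority_query = [] (auto const & origin) { return origin.source == nano::block_source::live ? 2 : 1; }; // assumed priorities
queue.rate_query = [] (auto const & origin) { return std::pair<size_t, double>{ 0, 1.0 }; }; // rate limiting not yet applied in this WIP version

queue.push (block, nano::block_source::live);
while (!queue.empty ())
{
auto [request, origin] = queue.next ();
// process 'request', with 'origin.source' / 'origin.channel' identifying where it came from
}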
