WIP
pwojcikdev committed Mar 4, 2024
1 parent fc59674 commit 9d2f6ee
Showing 5 changed files with 180 additions and 42 deletions.
1 change: 1 addition & 0 deletions nano/lib/stats_enums.hpp
@@ -75,6 +75,7 @@ enum class detail : uint8_t
none,
success,
unknown,
queue_overflow,

// processing queue
queue,
71 changes: 54 additions & 17 deletions nano/node/blockprocessor.cpp
@@ -45,11 +45,45 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas
block_processed.notify (result, context);
}
});

processing_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::block_processing);
this->process_blocks ();
});

queue.max_size_query = [this] (auto const & origin) {
switch (origin.source)
{
case nano::block_source::live:
return 128;
default:
return 1024 * 16;
}
};

queue.priority_query = [this] (auto const & origin) {
switch (origin.source)
{
case nano::block_source::live:
return 1;
case nano::block_source::local:
return 16;
case nano::block_source::bootstrap:
return 8;
default:
return 1;
}
};

queue.rate_limit_query = [this] (auto const & origin) -> std::pair<size_t, double> {
switch (origin.source)
{
case nano::block_source::live:
return { 100, 3.0 };
default:
return { 0, 1.0 }; // Unlimited
}
};
}

void nano::block_processor::stop ()
@@ -85,7 +119,7 @@ bool nano::block_processor::half_full () const
return size () >= node.flags.block_processor_full_size / 2;
}

void nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source)
void nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source, std::shared_ptr<nano::transport::channel> channel)
{
if (full ())
{
@@ -99,9 +133,12 @@ void nano::block_processor::add (std::shared_ptr<nano::block> const & block, blo
}

node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source));
node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})",
block->hash ().to_string (),
to_string (source),
channel ? channel->to_string () : "<unknown>"); // TODO: Lazy eval

add_impl (context{ block, source });
add_impl (context{ block, source }, channel);
}

std::optional<nano::block_status> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
@@ -136,10 +173,18 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());

add_impl (context{ block_a, block_source::forced });
}

void nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transport::channel> channel)
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
// forced.emplace_back (context{ block_a, block_source::forced });
queue.push (context{ block_a, block_source::forced }, block_source::forced);
nano::lock_guard<nano::mutex> guard{ mutex };
bool overflow = queue.push (std::move (ctx), ctx.source, channel);
if (overflow)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::queue_overflow);
}
}
condition.notify_all ();
}
@@ -224,16 +269,6 @@ bool nano::block_processor::should_log ()
return result;
}

void nano::block_processor::add_impl (context ctx)
{
release_assert (ctx.source != nano::block_source::forced);
{
nano::lock_guard<nano::mutex> guard{ mutex };
// blocks.emplace_back (std::move (ctx));
}
condition.notify_all ();
}

auto nano::block_processor::next () -> context
{
debug_assert (!mutex.try_lock ());
@@ -259,6 +294,8 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock

lock_a.lock ();

queue.periodic_cleanup ();

timer_l.start ();

// Processing blocks
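Note on the constructor changes above: the block processor now configures its fair_queue with three per-source callbacks. max_size_query caps how many blocks a source may hold queued (128 for live traffic, 16 * 1024 otherwise), priority_query sets how many blocks are drained from a source per round-robin turn (live 1, bootstrap 8, local 16), and rate_limit_query supplies a { rate, burst ratio } pair for the per-source limiter ({ 100, 3.0 } for live, unlimited otherwise). The snippet below is an illustrative sketch only, not part of this commit; it assumes a node reference, a decoded block, and the originating channel are in scope.

// Hypothetical call site (not in this diff): enqueue a block received from a peer,
// attributing it to its originating channel. With the configuration above, the
// live-source queue for this origin holds at most 128 blocks; pushing one more
// evicts the oldest entry and increments the new blockprocessor queue_overflow stat.
node.block_processor.add (block, nano::block_source::live, channel);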
4 changes: 2 additions & 2 deletions nano/node/blockprocessor.hpp
@@ -70,7 +70,7 @@ class block_processor final
std::size_t size (block_source) const;
bool full () const;
bool half_full () const;
void add (std::shared_ptr<nano::block> const &, block_source = block_source::live);
void add (std::shared_ptr<nano::block> const &, block_source = block_source::live, std::shared_ptr<nano::transport::channel> channel = nullptr);
std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
void force (std::shared_ptr<nano::block> const &);
bool should_log ();
@@ -95,7 +95,7 @@ class block_processor final
void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &);
processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
context next ();
void add_impl (context);
void add_impl (context, std::shared_ptr<nano::transport::channel> channel = nullptr);

private: // Dependencies
nano::node & node;
145 changes: 122 additions & 23 deletions nano/node/fair_queue.hpp
@@ -12,6 +12,7 @@
#include <boost/circular_buffer.hpp>

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
@@ -28,21 +29,44 @@ class fair_queue final
private:
struct entry
{
// using queue_t = boost::circular_buffer<Request>;
using queue_t = std::deque<Request>;

queue_t requests;
nano::bandwidth_limiter limiter;
nano::bandwidth_limiter_st limiter;
size_t const priority;
size_t const max_size;

entry (size_t max_size, size_t priority, size_t max_rate, double max_burst_ratio) :
// requests{ max_size },
limiter{ max_rate, max_burst_ratio },
priority{ priority },
max_size{ max_size }
{
}

Request pop ()
{
release_assert (!requests.empty ());

auto request = std::move (requests.front ());
requests.pop_front ();
return request;
}

bool push (Request request)
{
requests.push_back (std::move (request));
if (requests.size () > max_size)
{
requests.pop_front ();
return true; // Overflow
}
return false; // No overflow
}

bool empty () const
{
return requests.empty ();
}
};

public:
@@ -58,8 +82,6 @@ class fair_queue final
using value_type = std::pair<Request, source_type>;

public:
explicit fair_queue () = default;

size_t size (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) const
{
auto it = queues.find (source_type{ source, channel });
@@ -75,8 +97,8 @@

bool empty () const
{
return std::any_of (queues.begin (), queues.end (), [] (auto const & queue) {
return !queue.second.requests.empty ();
return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) {
return queue.second.requests.empty ();
});
}

@@ -91,14 +113,18 @@
}

/// Should be called periodically to clean up stale channels
void cleanup ()
void periodic_cleanup ()
{
erase_if (queues, [] (auto const & entry) {
return !entry.first.second.alive ();
});
std::chrono::seconds const cleanup_interval{ 30 };

if (elapsed (last_cleanup, cleanup_interval))
{
last_cleanup = std::chrono::steady_clock::now ();
cleanup ();
}
}

void push (Request request, Source source, std::shared_ptr<nano::transport::channel> channel = nullptr)
bool push (Request request, Source source, std::shared_ptr<nano::transport::channel> channel = nullptr)
{
auto const source_key = source_type{ source, channel };

@@ -109,18 +135,18 @@
{
auto max_size = max_size_query (source_key);
auto priority = priority_query (source_key);
auto [max_rate, max_burst_ratio] = rate_query (source_key);
auto [max_rate, max_burst_ratio] = rate_limit_query (source_key);

entry new_entry{ max_size, priority, max_rate, max_burst_ratio };
debug_assert (max_size > 0);
debug_assert (priority > 0);

// it = queues.emplace (std::make_pair (source_key, entry{ max_size, priority, max_rate, max_burst_ratio })).first;
entry new_entry{ max_size, priority, max_rate, max_burst_ratio };
it = queues.emplace (source_type{ source, channel }, std::move (new_entry)).first;
}
release_assert (it != queues.end ());

auto & queue = it->second;

// queue.requests.push_back (std::move (request));

// queue.requests.push_back (std::move (request));
return queue.push (std::move (request));
}

public:
@@ -130,24 +156,97 @@

query_size_t max_size_query{ [] (auto const & origin) { debug_assert (false, "max_size_query callback empty"); return 0; } };
query_priority_t priority_query{ [] (auto const & origin) { debug_assert (false, "priority_query callback empty"); return 0; } };
query_rate_t rate_query{ [] (auto const & origin) { debug_assert (false, "rate_query callback empty"); return std::pair<size_t, double>{ 0, 1.0 }; } };
query_rate_t rate_limit_query{ [] (auto const & origin) { debug_assert (false, "rate_limit_query callback empty"); return std::pair<size_t, double>{ 0, 1.0 }; } };

public:
value_type next ()
{
debug_assert (!empty ()); // Should be checked before calling next

auto should_seek = [&, this] () {
if (current_queue == queues.end ())
{
return true;
}
auto & queue = current_queue->second;
if (queue.empty ())
{
return true;
}
// Allow up to `queue.priority` requests to be processed before moving to the next queue
if (current_queue_counter >= queue.priority)
{
return true;
}
return false;
};

if (should_seek ())
{
seek_next ();
}

release_assert (current_queue != queues.end ());

auto & source = current_queue->first;
auto & queue = current_queue->second;

++current_queue_counter;
auto request = queue.pop ();
return { std::move (request), source };
}

std::deque<value_type> next_batch (size_t max_count);
std::deque<value_type> next_batch (size_t max_count)
{
// TODO: Naive implementation, could be optimized
std::deque<value_type> result;
while (!empty () && result.size () < max_count)
{
result.emplace_back (next ());
}
return result;
}

private:
void seek_next ()
{
do
{
if (current_queue != queues.end ())
{
++current_queue;
}
if (current_queue == queues.end ())
{
current_queue = queues.begin ();
}
release_assert (current_queue != queues.end ());
} while (current_queue->second.empty ());
}

void cleanup ()
{
current_queue = queues.end (); // Invalidate current iterator

erase_if (queues, [] (auto const & entry) {
return !entry.first.channel->alive ();
});
}

private:
std::map<source_type, entry> queues;
decltype (queues)::iterator current_queue{ queues.end () };
size_t current_queue_counter{ 0 };

std::chrono::steady_clock::time_point last_cleanup{};

public:
std::unique_ptr<container_info_component> collect_container_info (std::string const & name)
{
auto composite = std::make_unique<container_info_composite> (name);
// composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queue", queue.size (), sizeof (typename decltype (queue)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queues", queues.size (), sizeof (typename decltype (queues)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "total_size", total_size (), sizeof (typename decltype (queues)::value_type) }));
return composite;
}
};
}
}
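How the scheduling above fits together: push () lazily creates one entry per (source, channel) origin using the three query callbacks and reports an overflow when the per-origin cap is exceeded; next () serves up to `priority` requests from the current origin before seek_next () advances round-robin to the next non-empty one; periodic_cleanup () erases entries whose channel is no longer alive, at most once every 30 seconds. The standalone program below is a simplified model of that weighted round-robin (it does not use the nano types and is not part of the commit); the priorities mirror the block processor configuration: live 1, bootstrap 8, local 16.

// Simplified model of fair_queue scheduling: each origin has a FIFO and a
// priority weight; up to `priority` items are served from the current origin
// before moving round-robin to the next non-empty one.
#include <cstddef>
#include <deque>
#include <iostream>
#include <string>
#include <vector>

struct origin_queue
{
	std::string name;
	std::size_t priority; // items served per round-robin turn
	std::deque<int> items;
};

int main ()
{
	std::vector<origin_queue> queues{
		{ "live", 1, { 1, 2, 3 } },
		{ "bootstrap", 8, { 10, 11, 12 } },
		{ "local", 16, { 100 } },
	};

	auto all_empty = [&] {
		for (auto const & q : queues)
			if (!q.items.empty ())
				return false;
		return true;
	};

	std::size_t current = 0; // index of the origin currently being served
	std::size_t counter = 0; // items served from it in the current turn

	while (!all_empty ())
	{
		// Seek when the current queue is exhausted or its priority budget is spent
		if (queues[current].items.empty () || counter >= queues[current].priority)
		{
			counter = 0;
			do
			{
				current = (current + 1) % queues.size ();
			} while (queues[current].items.empty ());
		}
		++counter;
		std::cout << queues[current].name << ": " << queues[current].items.front () << '\n';
		queues[current].items.pop_front ();
	}
}

With the sample data this prints live: 1, then bootstrap: 10 through 12, then local: 100, and finally live: 2 and live: 3, illustrating that a single origin cannot monopolize the consumer.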
1 change: 1 addition & 0 deletions nano/node/network.cpp
@@ -255,6 +255,7 @@ class network_message_visitor : public nano::message_visitor
{
if (!node.block_processor.full ())
{
// TODO: Pass channel source
node.process_active (message_a.block);
}
else
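The TODO above marks where the originating channel would be forwarded once publish handling is wired to the new queue. A possible follow-up is sketched below, assuming the visitor keeps a reference to the originating channel (named `channel` here for illustration); it is not part of this commit, and the exact wiring (for example, whether process_active itself learns about channels) is left open.

// Hypothetical follow-up for the TODO (not in this diff): hand the block and its
// originating channel to the block processor so the fair queue can attribute it.
node.block_processor.add (message_a.block, nano::block_source::live, channel);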
