Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 6, 2024
1 parent 9d2f6ee commit 2beefa2
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 25 deletions.
4 changes: 2 additions & 2 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ std::size_t nano::block_processor::size () const
std::size_t nano::block_processor::size (nano::block_source source) const
{
nano::unique_lock<nano::mutex> lock{ mutex };
return queue.size (source);
return queue.size ({ source });
}

bool nano::block_processor::full () const
Expand Down Expand Up @@ -180,7 +180,7 @@ void nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transpo
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
bool overflow = queue.push (std::move (ctx), ctx.source, channel);
bool overflow = queue.push (std::move (ctx), { ctx.source, channel });
if (overflow)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::queue_overflow);
Expand Down
2 changes: 1 addition & 1 deletion nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class block_processor final
bool stopped{ false };
bool active{ false };

nano::fair_queue<block_source, context> queue;
nano::per_peer_fair_queue<block_source, context> queue;

std::chrono::steady_clock::time_point next_log;

Expand Down
83 changes: 61 additions & 22 deletions nano/node/fair_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,49 @@
#include <utility>
#include <vector>

namespace nano::fair_queue_sources
{
template <typename Type>
struct source_by_type
{
Type source;

// Keep implicit for better ergonomics
source_by_type (Type source) :
source{ source }
{
}

bool alive () const
{
return true;
}

auto operator<=> (source_by_type const &) const = default;
};

template <typename Type>
struct source_by_type_and_channel
{
Type source;
std::shared_ptr<nano::transport::channel> channel;

// Keep implicit for better ergonomics
source_by_type_and_channel (Type source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
source{ source },
channel{ channel }
{
}

bool alive () const
{
return channel->alive ();
}

auto operator<=> (source_by_type_and_channel const &) const = default;
};
}

namespace nano
{
template <typename Source, typename Request>
Expand Down Expand Up @@ -70,21 +113,13 @@ class fair_queue final
};

public:
// using source_type = std::pair<Source, std::shared_ptr<nano::transport::channel>>;
struct source_type
{
Source source;
std::shared_ptr<nano::transport::channel> channel;

auto operator<=> (source_type const &) const = default;
};

using value_type = std::pair<Request, source_type>;
using source_type = Source;
using value_type = std::pair<Request, Source>;

public:
size_t size (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) const
size_t size (Source source) const
{
auto it = queues.find (source_type{ source, channel });
auto it = queues.find (source);
return it == queues.end () ? 0 : it->second.requests.size ();
}

Expand Down Expand Up @@ -112,7 +147,7 @@ class fair_queue final
queues.clear ();
}

/// Should be called periodically to clean up stale channels
/// Should be called periodically to clean up stale sources
void periodic_cleanup ()
{
std::chrono::seconds const cleanup_interval{ 30 };
Expand All @@ -124,24 +159,22 @@ class fair_queue final
}
}

bool push (Request request, Source source, std::shared_ptr<nano::transport::channel> channel = nullptr)
bool push (Request request, Source source)
{
auto const source_key = source_type{ source, channel };

auto it = queues.find (source_key);
auto it = queues.find (source);

// Create a new queue if it doesn't exist
if (it == queues.end ())
{
auto max_size = max_size_query (source_key);
auto priority = priority_query (source_key);
auto [max_rate, max_burst_ratio] = rate_limit_query (source_key);
auto max_size = max_size_query (source);
auto priority = priority_query (source);
auto [max_rate, max_burst_ratio] = rate_limit_query (source);

debug_assert (max_size > 0);
debug_assert (priority > 0);

entry new_entry{ max_size, priority, max_rate, max_burst_ratio };
it = queues.emplace (source_type{ source, channel }, std::move (new_entry)).first;
it = queues.emplace (source, std::move (new_entry)).first;
}
release_assert (it != queues.end ());

Expand Down Expand Up @@ -229,7 +262,7 @@ class fair_queue final
current_queue = queues.end (); // Invalidate current iterator

erase_if (queues, [] (auto const & entry) {
return !entry.first.channel->alive ();
return !entry.first.alive ();
});
}

Expand All @@ -249,4 +282,10 @@ class fair_queue final
return composite;
}
};

template <typename Type, typename Request>
using per_peer_fair_queue = fair_queue<fair_queue_sources::source_by_type_and_channel<Type>, Request>;

template <typename Type, typename Request>
using per_type_fair_queue = fair_queue<fair_queue_sources::source_by_type<Type>, Request>;
}

0 comments on commit 2beefa2

Please sign in to comment.