Skip to content

Commit

Permalink
WIP: REMOVE PROCESSING QUEUE
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Jun 13, 2024
1 parent 0c83920 commit fa49b85
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 52 deletions.
2 changes: 1 addition & 1 deletion nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum class name
bootstrap_initiator,
bootstrap_connections,
voting,
voting_verification,
signature_checking,
rpc_request_processor,
rpc_process_container,
Expand All @@ -34,7 +35,6 @@ enum class name
db_parallel_traversal,
unchecked,
backlog_population,
vote_generator_queue,
bootstrap_server,
telemetry,
ascending_bootstrap,
Expand Down
86 changes: 47 additions & 39 deletions nano/node/vote_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,47 @@ nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::
stats (stats_a),
logger (logger_a),
is_final (is_final_a),
vote_generation_queue{ stats, nano::stat::type::vote_generator, nano::thread_role::name::vote_generator_queue, /* single threaded */ 1, /* max queue size */ 1024 * 32, /* max batch size */ 256 },
inproc_channel{ std::make_shared<nano::transport::inproc::channel> (node, node) }
{
vote_generation_queue.process_batch = [this] (auto & batch) {
process_batch (batch);
};
}

nano::vote_generator::~vote_generator ()
{
debug_assert (stopped);
debug_assert (!thread.joinable ());
debug_assert (!verification_thread.joinable ());
}

void nano::vote_generator::start ()
{
debug_assert (!thread.joinable ());

thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::voting);
run ();
});

verification_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::voting_verification);
run_verification ();
});
}

void nano::vote_generator::stop ()
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();

join_or_pass (thread);
join_or_pass (verification_thread);
}

void nano::vote_generator::add (const root & root, const block_hash & hash)
{
// TODO: ...
}

bool nano::vote_generator::should_vote (transaction_variant_t const & transaction_variant, nano::root const & root_a, nano::block_hash const & hash_a) const
Expand Down Expand Up @@ -72,40 +101,11 @@ bool nano::vote_generator::should_vote (transaction_variant_t const & transactio
return should_vote;
}

void nano::vote_generator::start ()
{
debug_assert (!thread.joinable ());
thread = std::thread ([this] () { run (); });

vote_generation_queue.start ();
}

void nano::vote_generator::stop ()
{
vote_generation_queue.stop ();

nano::unique_lock<nano::mutex> lock{ mutex };
stopped = true;

lock.unlock ();
condition.notify_all ();

if (thread.joinable ())
{
thread.join ();
}
}

void nano::vote_generator::add (const root & root, const block_hash & hash)
{
vote_generation_queue.add (std::make_pair (root, hash));
}

void nano::vote_generator::process_batch (std::deque<queue_entry_t> & batch)
void nano::vote_generator::verify_batch (std::deque<candidate_t> & batch)
{
std::deque<candidate_t> verified;

auto verify_batch = [this, &verified] (auto && transaction_variant, auto && batch) {
auto verify_batch_impl = [this, &verified] (auto && transaction_variant, auto && batch) {
for (auto & [root, hash] : batch)
{
if (should_vote (transaction_variant, root, hash))
Expand All @@ -120,15 +120,15 @@ void nano::vote_generator::process_batch (std::deque<queue_entry_t> & batch)
auto guard = ledger.store.write_queue.wait (nano::store::writer::voting_final);
transaction_variant_t transaction_variant{ ledger.tx_begin_write ({ tables::final_votes }) };

verify_batch (transaction_variant, batch);
verify_batch_impl (transaction_variant, batch);

// Commit write transaction
}
else
{
transaction_variant_t transaction_variant{ ledger.tx_begin_read () };

verify_batch (transaction_variant, batch);
verify_batch_impl (transaction_variant, batch);
}

// Submit verified candidates to the main processing thread
Expand All @@ -144,6 +144,15 @@ void nano::vote_generator::process_batch (std::deque<queue_entry_t> & batch)
}
}

void nano::vote_generator::run_verification ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
// TODO: ...
}
}

std::size_t nano::vote_generator::generate (std::vector<std::shared_ptr<nano::block>> const & blocks_a, std::shared_ptr<nano::transport::channel> const & channel_a)
{
request_t::first_type req_candidates;
Expand Down Expand Up @@ -280,7 +289,6 @@ void nano::vote_generator::broadcast_action (std::shared_ptr<nano::vote> const &

void nano::vote_generator::run ()
{
nano::thread_role::set (nano::thread_role::name::voting);
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
Expand Down Expand Up @@ -323,6 +331,6 @@ std::unique_ptr<nano::container_info_component> nano::vote_generator::collect_co
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "candidates", candidates_count, sizeof_candidate_element }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "requests", requests_count, sizeof_request_element }));
composite->add_component (vote_generation_queue.collect_container_info ("vote_generation_queue"));
// composite->add_component (vote_generation_queue.collect_container_info ("vote_generation_queue"));
return composite;
}
30 changes: 18 additions & 12 deletions nano/node/vote_generator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,36 @@ class vote_generator final
private:
using candidate_t = std::pair<nano::root, nano::block_hash>;
using request_t = std::pair<std::vector<candidate_t>, std::shared_ptr<nano::transport::channel>>;
using queue_entry_t = std::pair<nano::root, nano::block_hash>;

public:
vote_generator (nano::node_config const &, nano::node &, nano::ledger &, nano::wallets &, nano::vote_processor &, nano::local_vote_history &, nano::network &, nano::stats &, nano::logger &, bool is_final);
~vote_generator ();

void start ();
void stop ();

/** Queue items for vote generation, or broadcast votes already in cache */
void add (nano::root const &, nano::block_hash const &);

/** Queue blocks for vote generation, returning the number of successful candidates.*/
std::size_t generate (std::vector<std::shared_ptr<nano::block>> const & blocks_a, std::shared_ptr<nano::transport::channel> const & channel_a);
void set_reply_action (std::function<void (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &)>);
std::size_t generate (std::vector<std::shared_ptr<nano::block>> const & blocks, std::shared_ptr<nano::transport::channel> const &);

void start ();
void stop ();
// TODO: This is unnecessary
void set_reply_action (std::function<void (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &)>);

std::unique_ptr<container_info_component> collect_container_info (std::string const & name) const;

private:
using transaction_variant_t = std::variant<nano::secure::read_transaction, nano::secure::write_transaction>;

void run ();
void run_verification ();
void broadcast (nano::unique_lock<nano::mutex> &);
void reply (nano::unique_lock<nano::mutex> &, request_t &&);
void vote (std::vector<nano::block_hash> const &, std::vector<nano::root> const &, std::function<void (std::shared_ptr<nano::vote> const &)> const &);
void broadcast_action (std::shared_ptr<nano::vote> const &) const;
void process_batch (std::deque<queue_entry_t> & batch);
std::deque<candidate_t> next_batch (size_t max_count);
void verify_batch (std::deque<candidate_t> & batch);
bool should_vote (transaction_variant_t const &, nano::root const &, nano::block_hash const &) const;

private:
Expand All @@ -76,18 +80,20 @@ class vote_generator final
std::unique_ptr<nano::vote_spacing> spacing_impl;
nano::vote_spacing & spacing;

private:
nano::processing_queue<queue_entry_t> vote_generation_queue;

private:
const bool is_final;
mutable nano::mutex mutex;
nano::condition_variable condition;
static std::size_t constexpr max_requests{ 2048 };

std::shared_ptr<nano::transport::channel> inproc_channel;

std::unordered_set<candidate_t> queued;
std::deque<request_t> requests;
std::deque<candidate_t> candidates;

mutable nano::mutex mutex;
nano::condition_variable condition;
std::atomic<bool> stopped{ false };
std::thread thread;
std::shared_ptr<nano::transport::channel> inproc_channel;
std::thread verification_thread;
};
}

0 comments on commit fa49b85

Please sign in to comment.