Skip to content

Commit

Permalink
Vote processed event
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 21, 2024
1 parent 582cf21 commit 00f7bfe
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 41 deletions.
6 changes: 3 additions & 3 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ TEST (inactive_votes_cache, existing_vote)
ASSERT_EQ (send->hash (), last_vote1.hash);
ASSERT_EQ (nano::vote::timestamp_min * 1, last_vote1.timestamp);
// Attempt to change vote with inactive_votes_cache
node.vote_cache.vote (vote1);
node.vote_cache.insert (vote1);
auto cached = node.vote_cache.find (send->hash ());
ASSERT_EQ (1, cached.size ());
for (auto const & cached_vote : cached)
Expand Down Expand Up @@ -1535,7 +1535,7 @@ TEST (active_transactions, allow_limited_overflow)
{
// Non-final vote, so it stays in the AEC without getting confirmed
auto vote = nano::test::make_vote (nano::dev::genesis_key, { block });
node.vote_cache.vote (vote);
node.vote_cache.insert (vote);
}

// Ensure active elections overfill AEC only up to normal + hinted limit
Expand Down Expand Up @@ -1573,7 +1573,7 @@ TEST (active_transactions, allow_limited_overflow_adapt)
{
// Non-final vote, so it stays in the AEC without getting confirmed
auto vote = nano::test::make_vote (nano::dev::genesis_key, { block });
node.vote_cache.vote (vote);
node.vote_cache.insert (vote);
}

// Ensure hinted election amount is bounded by hinted limit
Expand Down
40 changes: 20 additions & 20 deletions nano/core_test/vote_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ TEST (vote_cache, insert_one_hash)
auto rep1 = create_rep (7);
auto hash1 = nano::test::random_hash ();
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.insert (vote1);
ASSERT_EQ (1, vote_cache.size ());

auto peek1 = vote_cache.find (hash1);
Expand Down Expand Up @@ -88,9 +88,9 @@ TEST (vote_cache, insert_one_hash_many_votes)
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1 * 1024 * 1024);
auto vote2 = nano::test::make_vote (rep2, { hash1 }, 2 * 1024 * 1024);
auto vote3 = nano::test::make_vote (rep3, { hash1 }, 3 * 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.vote (vote2);
vote_cache.vote (vote3);
vote_cache.insert (vote1);
vote_cache.insert (vote2);
vote_cache.insert (vote3);

ASSERT_EQ (1, vote_cache.size ());
auto peek1 = vote_cache.find (hash1);
Expand Down Expand Up @@ -132,9 +132,9 @@ TEST (vote_cache, insert_many_hashes_many_votes)
auto vote3 = nano::test::make_vote (rep3, { hash3 }, 1024 * 1024);
auto vote4 = nano::test::make_vote (rep4, { hash1 }, 1024 * 1024);
// Insert first 3 votes in cache
vote_cache.vote (vote1);
vote_cache.vote (vote2);
vote_cache.vote (vote3);
vote_cache.insert (vote1);
vote_cache.insert (vote2);
vote_cache.insert (vote3);
// Ensure all of those are properly inserted
ASSERT_EQ (3, vote_cache.size ());
ASSERT_EQ (2, vote_cache.find (hash1).size ());
Expand All @@ -152,7 +152,7 @@ TEST (vote_cache, insert_many_hashes_many_votes)
ASSERT_EQ (peek1.front (), vote3);

// Now add a vote from rep4 with the highest voting weight
vote_cache.vote (vote4);
vote_cache.insert (vote4);

// Ensure that the first entry in queue is now the one for hash1 (rep1 + rep4 tally weight)
auto tops2 = vote_cache.top (0);
Expand Down Expand Up @@ -195,8 +195,8 @@ TEST (vote_cache, insert_duplicate)
auto rep1 = create_rep (9);
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1 * 1024 * 1024);
auto vote2 = nano::test::make_vote (rep1, { hash1 }, 1 * 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.vote (vote2);
vote_cache.insert (vote1);
vote_cache.insert (vote2);
ASSERT_EQ (1, vote_cache.size ());
}

Expand All @@ -212,12 +212,12 @@ TEST (vote_cache, insert_newer)
auto hash1 = nano::test::random_hash ();
auto rep1 = create_rep (9);
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1 * 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.insert (vote1);
auto peek1 = vote_cache.find (hash1);
ASSERT_EQ (peek1.size (), 1);
ASSERT_EQ (peek1.front (), vote1);
auto vote2 = nano::test::make_final_vote (rep1, { hash1 });
vote_cache.vote (vote2);
vote_cache.insert (vote2);
auto peek2 = vote_cache.find (hash1);
ASSERT_EQ (peek2.size (), 1);
ASSERT_EQ (peek2.front (), vote2); // vote2 should replace vote1 as it has a higher timestamp
Expand All @@ -235,12 +235,12 @@ TEST (vote_cache, insert_older)
auto hash1 = nano::test::random_hash ();
auto rep1 = create_rep (9);
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 2 * 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.insert (vote1);
auto peek1 = vote_cache.find (hash1);
ASSERT_EQ (peek1.size (), 1);
ASSERT_EQ (peek1.front (), vote1);
auto vote2 = nano::test::make_vote (rep1, { hash1 }, 1 * 1024 * 1024);
vote_cache.vote (vote2);
vote_cache.insert (vote2);
auto peek2 = vote_cache.find (hash1);
ASSERT_EQ (peek2.size (), 1);
ASSERT_EQ (peek2.front (), vote1); // vote1 should still be in cache as it has a higher timestamp
Expand All @@ -265,9 +265,9 @@ TEST (vote_cache, erase)
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1024 * 1024);
auto vote2 = nano::test::make_vote (rep2, { hash2 }, 1024 * 1024);
auto vote3 = nano::test::make_vote (rep3, { hash3 }, 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.vote (vote2);
vote_cache.vote (vote3);
vote_cache.insert (vote1);
vote_cache.insert (vote2);
vote_cache.insert (vote3);
ASSERT_EQ (3, vote_cache.size ());
ASSERT_FALSE (vote_cache.empty ());
ASSERT_FALSE (vote_cache.find (hash1).empty ());
Expand Down Expand Up @@ -304,7 +304,7 @@ TEST (vote_cache, overfill)
auto rep1 = create_rep (count - n);
auto hash1 = nano::test::random_hash ();
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.insert (vote1);
}
ASSERT_LT (vote_cache.size (), count);
// Check that oldest votes are dropped first
Expand All @@ -328,7 +328,7 @@ TEST (vote_cache, overfill_entry)
{
auto rep1 = create_rep (9);
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.insert (vote1);
}
ASSERT_EQ (1, vote_cache.size ());
}
Expand All @@ -344,7 +344,7 @@ TEST (vote_cache, age_cutoff)
auto hash1 = nano::test::random_hash ();
auto rep1 = create_rep (9);
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 3);
vote_cache.vote (vote1);
vote_cache.insert (vote1);
ASSERT_EQ (1, vote_cache.size ());
ASSERT_FALSE (vote_cache.find (hash1).empty ());

Expand Down
30 changes: 17 additions & 13 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,9 @@ nano::election_insertion_result nano::active_transactions::insert (std::shared_p

if (result.inserted)
{
release_assert (result.election);
debug_assert (result.election);

auto cached = node.vote_cache.find (hash);
for (auto const & cached_vote : cached)
{
vote (cached_vote);
}
trigger_vote_cache (hash);

node.observers.active_started.notify (hash);
vacancy_update ();
Expand All @@ -442,8 +438,18 @@ nano::election_insertion_result nano::active_transactions::insert (std::shared_p
return result;
}

bool nano::active_transactions::trigger_vote_cache (nano::block_hash hash)
{
auto cached = node.vote_cache.find (hash);
for (auto const & cached_vote : cached)
{
vote (cached_vote, nano::vote_source::cache);
}
return !cached.empty ();
}

// Validate a vote and apply it to the current election if one exists
std::unordered_map<nano::block_hash, nano::vote_code> nano::active_transactions::vote (std::shared_ptr<nano::vote> const & vote)
std::unordered_map<nano::block_hash, nano::vote_code> nano::active_transactions::vote (std::shared_ptr<nano::vote> const & vote, nano::vote_source source)
{
std::unordered_map<nano::block_hash, nano::vote_code> results;
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> process;
Expand Down Expand Up @@ -480,7 +486,7 @@ std::unordered_map<nano::block_hash, nano::vote_code> nano::active_transactions:

for (auto const & [block_hash, election] : process)
{
auto const vote_result = election->vote (vote->account, vote->timestamp (), block_hash);
auto const vote_result = election->vote (vote->account, vote->timestamp (), block_hash, source);
results[block_hash] = vote_result;

processed |= (vote_result == nano::vote_code::vote);
Expand All @@ -502,6 +508,8 @@ std::unordered_map<nano::block_hash, nano::vote_code> nano::active_transactions:
return results.find (hash) != results.end ();
}));

vote_processed.notify (vote, source, results);

return results;
}

Expand Down Expand Up @@ -609,11 +617,7 @@ bool nano::active_transactions::publish (std::shared_ptr<nano::block> const & bl
blocks.emplace (block_a->hash (), election);
lock.unlock ();

auto cached = node.vote_cache.find (block_a->hash ());
for (auto const & cached_vote : cached)
{
vote (cached_vote);
}
trigger_vote_cache (block_a->hash ());

node.stats.inc (nano::stat::type::active, nano::stat::detail::election_block_conflict);
}
Expand Down
7 changes: 6 additions & 1 deletion nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class active_transactions final
*/
nano::election_insertion_result insert (std::shared_ptr<nano::block> const &, nano::election_behavior = nano::election_behavior::normal);
// Distinguishes replay votes, cannot be determined if the block is not in any election
std::unordered_map<nano::block_hash, nano::vote_code> vote (std::shared_ptr<nano::vote> const &);
std::unordered_map<nano::block_hash, nano::vote_code> vote (std::shared_ptr<nano::vote> const &, nano::vote_source = nano::vote_source::live);
// Is the root of this block in the roots container
bool active (nano::block const &) const;
bool active (nano::qualified_root const &) const;
Expand Down Expand Up @@ -188,6 +188,10 @@ class active_transactions final
void add_election_winner_details (nano::block_hash const &, std::shared_ptr<nano::election> const &);
std::shared_ptr<nano::election> remove_election_winner_details (nano::block_hash const &);

public: // Events
using vote_processed_event_t = nano::observer_set<std::shared_ptr<nano::vote> const &, nano::vote_source, std::unordered_map<nano::block_hash, nano::vote_code> const &>;
vote_processed_event_t vote_processed;

private:
// Erase elections if we're over capacity
void trim ();
Expand All @@ -201,6 +205,7 @@ class active_transactions final
std::vector<std::shared_ptr<nano::election>> list_active_impl (std::size_t) const;
void activate_successors (nano::store::read_transaction const & transaction, std::shared_ptr<nano::block> const & block);
void notify_observers (nano::store::read_transaction const & transaction, nano::election_status const & status, std::vector<nano::vote_with_weight_info> const & votes);
bool trigger_vote_cache (nano::block_hash);

private: // Dependencies
nano::node & node;
Expand Down
4 changes: 4 additions & 0 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
scheduler.optimistic.activate (account, account_info, conf_info);
});

active.vote_processed.add ([this] (std::shared_ptr<nano::vote> const & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code> const & results) {
vote_cache.observe (vote, source, results);
});

if (!init_error ())
{
// Notify election schedulers when AEC frees election slot
Expand Down
21 changes: 20 additions & 1 deletion nano/node/vote_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,25 @@ nano::vote_cache::vote_cache (vote_cache_config const & config_a, nano::stats &
{
}

void nano::vote_cache::vote (std::shared_ptr<nano::vote> const & vote, std::function<bool (nano::block_hash const &)> const & filter)
void nano::vote_cache::observe (const std::shared_ptr<nano::vote> & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code> results)
{
if (source == nano::vote_source::live)
{
insert (vote, [&results] (nano::block_hash const & hash) {
// This filters which hashes should be included in the vote cache
if (auto it = results.find (hash); it != results.end ())
{
auto result = it->second;
// Cache votes with a corresponding active election (indicated by `vote_code::vote`) in case that election gets dropped
return result == nano::vote_code::vote || result == nano::vote_code::indeterminate;
}
debug_assert (false);
return false;
});
}
}

void nano::vote_cache::insert (std::shared_ptr<nano::vote> const & vote, std::function<bool (nano::block_hash const &)> filter)
{
auto const representative = vote->account;
auto const timestamp = vote->timestamp ();
Expand All @@ -138,6 +156,7 @@ void nano::vote_cache::vote (std::shared_ptr<nano::vote> const & vote, std::func

for (auto const & hash : vote->hashes)
{
// Using filter callback here to avoid unnecessary relocking when processing large votes
if (!filter (hash))
{
continue;
Expand Down
9 changes: 7 additions & 2 deletions nano/node/vote_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,14 @@ class vote_cache final
/**
* Adds a new vote to cache
*/
void vote (
void insert (
std::shared_ptr<nano::vote> const & vote,
std::function<bool (nano::block_hash const &)> const & filter = [] (nano::block_hash const &) { return true; });
std::function<bool (nano::block_hash const &)> filter = [] (nano::block_hash const &) { return true; });

/**
* Should be called for every processed vote, filters which votes should be added to cache
*/
void observe (std::shared_ptr<nano::vote> const & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code>);

/**
* Tries to find an entry associated with block hash
Expand Down
2 changes: 1 addition & 1 deletion nano/node/vote_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr<nano::vote>
}
result = replay ? nano::vote_code::replay : (processed ? nano::vote_code::vote : nano::vote_code::indeterminate);

observers.vote.notify (vote_a, channel_a, result);
observers.vote.notify (vote, channel, result);
}

stats.inc (nano::stat::type::vote, to_stat_detail (result));
Expand Down

0 comments on commit 00f7bfe

Please sign in to comment.