Skip to content

Commit

Permalink
Merge pull request #4629 from pwojcikdev/vote-cache-fixing-2
Browse files Browse the repository at this point in the history
More vote cache optimizations
  • Loading branch information
pwojcikdev authored May 19, 2024
2 parents 7662e45 + 92d3cc4 commit a7c8b48
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 107 deletions.
5 changes: 4 additions & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
});

vote_router.vote_processed.add ([this] (std::shared_ptr<nano::vote> const & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code> const & results) {
vote_cache.observe (vote, source, results);
if (source != nano::vote_source::cache)
{
vote_cache.insert (vote, results);
}
});

// Republish vote if it is new and the node does not host a principal representative (or close to)
Expand Down
143 changes: 67 additions & 76 deletions nano/node/vote_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ bool nano::vote_cache_entry::vote (std::shared_ptr<nano::vote> const & vote, con
bool updated = vote_impl (vote, rep_weight, max_voters);
if (updated)
{
auto [tally, final_tally] = calculate_tally ();
tally_m = tally;
final_tally_m = final_tally;
last_vote_m = std::chrono::steady_clock::now ();
}
return updated;
Expand All @@ -36,15 +39,12 @@ bool nano::vote_cache_entry::vote_impl (std::shared_ptr<nano::vote> const & vote
// It is not essential to keep tally up to date if rep voting weight changes, elections do tally calculations independently, so in the worst case scenario only our queue ordering will be a bit off
if (vote->timestamp () > existing->vote->timestamp ())
{
bool was_final = existing->vote->is_final ();
voters.modify (existing, [&vote, &rep_weight] (auto & existing) {
existing.vote = vote;
existing.weight = rep_weight;
});
return true;
}
else
{
return false;
return !was_final && vote->is_final (); // Tally changed only if the vote became final
}
}
else
Expand Down Expand Up @@ -76,35 +76,24 @@ bool nano::vote_cache_entry::vote_impl (std::shared_ptr<nano::vote> const & vote

return true;
}
else
{
return false;
}
}
return false; // Tally unchanged
}

std::size_t nano::vote_cache_entry::size () const
{
return voters.size ();
}

nano::block_hash nano::vote_cache_entry::hash () const
{
return hash_m;
}

nano::uint128_t nano::vote_cache_entry::tally () const
{
return std::accumulate (voters.begin (), voters.end (), nano::uint128_t{ 0 }, [] (auto sum, auto const & item) {
return sum + item.weight;
});
}

nano::uint128_t nano::vote_cache_entry::final_tally () const
auto nano::vote_cache_entry::calculate_tally () const -> std::pair<nano::uint128_t, nano::uint128_t>
{
return std::accumulate (voters.begin (), voters.end (), nano::uint128_t{ 0 }, [] (auto sum, auto const & item) {
return sum + (item.vote->is_final () ? item.weight : 0);
});
nano::uint128_t tally{ 0 }, final_tally{ 0 };
for (auto const & voter : voters)
{
tally += voter.weight;
final_tally += voter.vote->is_final () ? voter.weight : 0;
}
return { tally, final_tally };
}

std::vector<std::shared_ptr<nano::vote>> nano::vote_cache_entry::votes () const
Expand All @@ -113,11 +102,6 @@ std::vector<std::shared_ptr<nano::vote>> nano::vote_cache_entry::votes () const
return { r.begin (), r.end () };
}

std::chrono::steady_clock::time_point nano::vote_cache_entry::last_vote () const
{
return last_vote_m;
}

/*
* vote_cache
*/
Expand All @@ -128,61 +112,66 @@ nano::vote_cache::vote_cache (vote_cache_config const & config_a, nano::stats &
{
}

void nano::vote_cache::observe (const std::shared_ptr<nano::vote> & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code> results)
void nano::vote_cache::insert (std::shared_ptr<nano::vote> const & vote, std::unordered_map<nano::block_hash, nano::vote_code> const & results)
{
if (source != nano::vote_source::cache)
{
insert (vote, [&results] (nano::block_hash const & hash) {
// This filters which hashes should be included in the vote cache
if (auto it = results.find (hash); it != results.end ())
{
auto result = it->second;
// Cache votes with a corresponding active election (indicated by `vote_code::vote`) in case that election gets dropped
return result == nano::vote_code::vote || result == nano::vote_code::indeterminate;
}
debug_assert (false);
return false;
});
}
}
// Results map should be empty or have the same hashes as the vote
debug_assert (results.empty () || std::all_of (vote->hashes.begin (), vote->hashes.end (), [&results] (auto const & hash) { return results.find (hash) != results.end (); }));

void nano::vote_cache::insert (std::shared_ptr<nano::vote> const & vote, std::function<bool (nano::block_hash const &)> filter)
{
auto const representative = vote->account;
auto const timestamp = vote->timestamp ();
auto const rep_weight = rep_weight_query (representative);

nano::lock_guard<nano::mutex> lock{ mutex };

for (auto const & hash : vote->hashes)
// Cache votes with a corresponding active election (indicated by `vote_code::vote`) in case that election gets dropped
auto filter = [] (auto code) {
return code == nano::vote_code::vote || code == nano::vote_code::indeterminate;
};

// If results map is empty, insert all hashes (meant for testing)
if (results.empty ())
{
// Using filter callback here to avoid unnecessary relocking when processing large votes
if (!filter (hash))
for (auto const & hash : vote->hashes)
{
continue;
insert_impl (vote, hash, rep_weight);
}

if (auto existing = cache.find (hash); existing != cache.end ())
}
else
{
for (auto const & [hash, code] : results)
{
stats.inc (nano::stat::type::vote_cache, nano::stat::detail::update);

cache.modify (existing, [this, &vote, &rep_weight] (entry & ent) {
ent.vote (vote, rep_weight, config.max_voters);
});
if (filter (code))
{
insert_impl (vote, hash, rep_weight);
}
}
else
{
stats.inc (nano::stat::type::vote_cache, nano::stat::detail::insert);
}
}

entry cache_entry{ hash };
cache_entry.vote (vote, rep_weight, config.max_voters);
cache.insert (cache_entry);
void nano::vote_cache::insert_impl (std::shared_ptr<nano::vote> const & vote, nano::block_hash const & hash, nano::uint128_t const & rep_weight)
{
debug_assert (!mutex.try_lock ());
debug_assert (std::any_of (vote->hashes.begin (), vote->hashes.end (), [&hash] (auto const & vote_hash) { return vote_hash == hash; }));

// Remove the oldest entry if we have reached the capacity limit
if (cache.size () > config.max_size)
{
cache.get<tag_sequenced> ().pop_front ();
}
if (auto existing = cache.find (hash); existing != cache.end ())
{
stats.inc (nano::stat::type::vote_cache, nano::stat::detail::update);

cache.modify (existing, [this, &vote, &rep_weight] (entry & ent) {
ent.vote (vote, rep_weight, config.max_voters);
});
}
else
{
stats.inc (nano::stat::type::vote_cache, nano::stat::detail::insert);

entry cache_entry{ hash };
cache_entry.vote (vote, rep_weight, config.max_voters);
cache.insert (cache_entry);

// Remove the oldest entry if we have reached the capacity limit
if (cache.size () > config.max_size)
{
cache.get<tag_sequenced> ().pop_front ();
}
}
}
Expand Down Expand Up @@ -231,11 +220,11 @@ void nano::vote_cache::clear ()
cache.clear ();
}

std::vector<nano::vote_cache::top_entry> nano::vote_cache::top (const nano::uint128_t & min_tally)
std::deque<nano::vote_cache::top_entry> nano::vote_cache::top (const nano::uint128_t & min_tally)
{
stats.inc (nano::stat::type::vote_cache, nano::stat::detail::top);

std::vector<top_entry> results;
std::deque<top_entry> results;
{
nano::lock_guard<nano::mutex> lock{ mutex };

Expand All @@ -244,12 +233,14 @@ std::vector<nano::vote_cache::top_entry> nano::vote_cache::top (const nano::uint
cleanup ();
}

for (auto & entry : cache)
for (auto & entry : cache.get<tag_tally> ())
{
if (entry.tally () >= min_tally)
auto tally = entry.tally ();
if (tally < min_tally)
{
results.push_back ({ entry.hash (), entry.tally (), entry.final_tally () });
break;
}
results.push_back ({ entry.hash (), tally, entry.final_tally () });
}
}

Expand Down
40 changes: 28 additions & 12 deletions nano/node/vote_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,29 @@ class vote_cache_entry final
bool vote (std::shared_ptr<nano::vote> const & vote, nano::uint128_t const & rep_weight, std::size_t max_voters);

std::size_t size () const;
nano::block_hash hash () const;
nano::uint128_t tally () const;
nano::uint128_t final_tally () const;
std::vector<std::shared_ptr<nano::vote>> votes () const;
std::chrono::steady_clock::time_point last_vote () const;

public: // Keep accessors inlined
nano::block_hash hash () const
{
return hash_m;
}
std::chrono::steady_clock::time_point last_vote () const
{
return last_vote_m;
}
nano::uint128_t tally () const
{
return tally_m;
}
nano::uint128_t final_tally () const
{
return final_tally_m;
}

private:
bool vote_impl (std::shared_ptr<nano::vote> const & vote, nano::uint128_t const & rep_weight, std::size_t max_voters);
std::pair<nano::uint128_t, nano::uint128_t> calculate_tally () const; // <tally, final_tally>

// clang-format off
class tag_representative {};
Expand All @@ -95,6 +110,8 @@ class vote_cache_entry final

nano::block_hash const hash_m;
std::chrono::steady_clock::time_point last_vote_m{};
nano::uint128_t tally_m{ 0 };
nano::uint128_t final_tally_m{ 0 };
};

class vote_cache final
Expand All @@ -110,12 +127,7 @@ class vote_cache final
*/
void insert (
std::shared_ptr<nano::vote> const & vote,
std::function<bool (nano::block_hash const &)> filter = [] (nano::block_hash const &) { return true; });

/**
* Should be called for every processed vote, filters which votes should be added to cache
*/
void observe (std::shared_ptr<nano::vote> const & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code>);
std::unordered_map<nano::block_hash, nano::vote_code> const & results = {});

/**
* Tries to find an entry associated with block hash
Expand Down Expand Up @@ -145,7 +157,7 @@ class vote_cache final
* The blocks are sorted in descending order by final tally, then by tally
* @param min_tally minimum tally threshold, entries below with their voting weight below this will be ignored
*/
std::vector<top_entry> top (nano::uint128_t const & min_tally);
std::deque<top_entry> top (nano::uint128_t const & min_tally);

public: // Container info
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name) const;
Expand All @@ -161,19 +173,23 @@ class vote_cache final
nano::stats & stats;

private:
void insert_impl (std::shared_ptr<nano::vote> const &, nano::block_hash const & hash, nano::uint128_t const & rep_weight);
void cleanup ();

// clang-format off
class tag_sequenced {};
class tag_hash {};
class tag_tally {};
// clang-format on

// clang-format off
using ordered_cache = boost::multi_index_container<entry,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_hash>,
mi::const_mem_fun<entry, nano::block_hash, &entry::hash>>,
mi::sequenced<mi::tag<tag_sequenced>>
mi::sequenced<mi::tag<tag_sequenced>>,
mi::ordered_non_unique<mi::tag<tag_tally>,
mi::const_mem_fun<entry, nano::uint128_t, &entry::tally>, std::greater<>> // DESC
>>;
// clang-format on
ordered_cache cache;
Expand Down
Loading

0 comments on commit a7c8b48

Please sign in to comment.