Skip to content

Commit

Permalink
Allow multiple queries per channel during warmup
Browse files — browse the repository at this point in the history
pwojcikdev committed Feb 25, 2024
1 parent c2220b8 commit 9f1573b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 30 deletions.
64 changes: 35 additions & 29 deletions nano/node/repcrawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,10 @@ void nano::rep_crawler::run ()
{
last_query = std::chrono::steady_clock::now ();

lock.unlock ();

auto targets = prepare_crawl_targets (sufficient_weight);
query (targets);

lock.unlock ();
query (targets);
lock.lock ();
}

Expand Down Expand Up @@ -233,23 +232,27 @@ void nano::rep_crawler::cleanup ()

std::vector<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const
{
debug_assert (!mutex.try_lock ());

// TODO: Make these values configurable
constexpr std::size_t conservative_count = 10;
constexpr std::size_t aggressive_count = 40;

stats.inc (nano::stat::type::rep_crawler, sufficient_weight ? nano::stat::detail::crawl_normal : nano::stat::detail::crawl_aggressive);

// Crawl more aggressively if we lack sufficient total peer weight.
auto required_peer_count = sufficient_weight ? conservative_count : aggressive_count;
auto const required_peer_count = sufficient_weight ? conservative_count : aggressive_count;

stats.inc (nano::stat::type::rep_crawler, sufficient_weight ? nano::stat::detail::crawl_normal : nano::stat::detail::crawl_aggressive);
auto random_peers = node.network.random_set (required_peer_count, 0, /* Include channels with ephemeral remote ports */ true);

// Add random peers. We do this even if we have enough weight, in order to pick up reps
// that didn't respond when first observed. If the current total weight isn't sufficient, this
// will be more aggressive. When the node first starts, the rep container is empty and all
// endpoints will originate from random peers.
required_peer_count += required_peer_count / 2;
// Avoid querying the same peer multiple times when rep crawler is warmed up
if (sufficient_weight)
{
erase_if (random_peers, [this] (std::shared_ptr<nano::transport::channel> const & channel) {
return queries.get<tag_channel> ().count (channel) > 0;
});
}

// The rest of the endpoints are picked randomly
auto random_peers = node.network.random_set (required_peer_count, 0, true); // Include channels with ephemeral remote ports
return { random_peers.begin (), random_peers.end () };
}

Expand Down Expand Up @@ -296,7 +299,7 @@ void nano::rep_crawler::track_rep_request (hash_root_t hash_root, std::shared_pt
{
debug_assert (!mutex.try_lock ());

debug_assert (queries.count (channel) == 0); // Only a single query should be active per channel
// TODO: Handle insertion failure
queries.emplace (query_entry{ hash_root.first, channel });

// Find and update the timestamp on all reps available on the endpoint (a single host may have multiple reps)
Expand Down Expand Up @@ -327,18 +330,18 @@ void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::chan
debug_assert (channel != nullptr);

// Only a single query should be active per channel
if (queries.find (channel) == queries.end ())
{
track_rep_request (hash_root, channel);
node.network.send_confirm_req (channel, hash_root);

logger.debug (nano::log::type::rep_crawler, "Sending query for block {} to {}", hash_root.first.to_string (), channel->to_string ());
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::query_sent);
}
else
{
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::query_channel_busy);
}
// if (queries.find (channel) == queries.end ())
// {
track_rep_request (hash_root, channel);
node.network.send_confirm_req (channel, hash_root);

logger.debug (nano::log::type::rep_crawler, "Sending query for block {} to {}", hash_root.first.to_string (), channel->to_string ());
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::query_sent);
// }
// else
// {
// stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::query_channel_busy);
// }
}
}

Expand All @@ -361,10 +364,13 @@ bool nano::rep_crawler::is_pr (std::shared_ptr<nano::transport::channel> const &
bool nano::rep_crawler::process (std::shared_ptr<nano::vote> const & vote, std::shared_ptr<nano::transport::channel> const & channel)
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (auto info = queries.find (channel); info != queries.end ())

auto & index = queries.get<tag_channel> ();
auto [begin, end] = index.equal_range (channel);
for (auto it = begin; it != end; ++it)
{
// TODO: This linear search could be slow, especially with large votes.
auto const target_hash = info->hash;
auto const target_hash = it->hash;
bool found = std::any_of (vote->hashes.begin (), vote->hashes.end (), [&target_hash] (nano::block_hash const & hash) {
return hash == target_hash;
});
Expand All @@ -376,10 +382,10 @@ bool nano::rep_crawler::process (std::shared_ptr<nano::vote> const & vote, std::
// TODO: Track query response time

responses.push_back ({ channel, vote });
queries.erase (info);
queries.erase (it);

condition.notify_all ();
return true;
return true; // Found and processed
}
}
return false;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/repcrawler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class rep_crawler final

using ordered_queries = boost::multi_index_container<query_entry,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_channel>,
mi::hashed_non_unique<mi::tag<tag_channel>,
mi::member<query_entry, std::shared_ptr<nano::transport::channel>, &query_entry::channel>>,
mi::sequenced<mi::tag<tag_sequenced>>,
mi::hashed_non_unique<mi::tag<tag_hash>,
Expand Down

0 comments on commit 9f1573b

Please sign in to comment.