Skip to content

Commit

Permalink
Allow multiple queries per channel during warmup
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Feb 25, 2024
1 parent 202d6ce commit e73420b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 25 deletions.
1 change: 1 addition & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ enum class detail : uint8_t
query_target_failed,
query_channel_busy,
query_sent,
query_duplicate,
rep_timeout,
query_timeout,
crawl_aggressive,
Expand Down
58 changes: 35 additions & 23 deletions nano/node/repcrawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,10 @@ void nano::rep_crawler::run ()
{
last_query = std::chrono::steady_clock::now ();

lock.unlock ();

auto targets = prepare_crawl_targets (sufficient_weight);
query (targets);

lock.unlock ();
query (targets);
lock.lock ();
}

Expand Down Expand Up @@ -233,23 +232,27 @@ void nano::rep_crawler::cleanup ()

std::vector<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const
{
debug_assert (!mutex.try_lock ());

// TODO: Make these values configurable
constexpr std::size_t conservative_count = 10;
constexpr std::size_t aggressive_count = 40;
constexpr std::size_t conservative_max_attempts = 1;
constexpr std::size_t aggressive_max_attempts = 8;

stats.inc (nano::stat::type::rep_crawler, sufficient_weight ? nano::stat::detail::crawl_normal : nano::stat::detail::crawl_aggressive);

// Crawl more aggressively if we lack sufficient total peer weight.
auto required_peer_count = sufficient_weight ? conservative_count : aggressive_count;
auto const required_peer_count = sufficient_weight ? conservative_count : aggressive_count;

stats.inc (nano::stat::type::rep_crawler, sufficient_weight ? nano::stat::detail::crawl_normal : nano::stat::detail::crawl_aggressive);
auto random_peers = node.network.random_set (required_peer_count, 0, /* Include channels with ephemeral remote ports */ true);

// Add random peers. We do this even if we have enough weight, in order to pick up reps
// that didn't respond when first observed. If the current total weight isn't sufficient, this
// will be more aggressive. When the node first starts, the rep container is empty and all
// endpoints will originate from random peers.
required_peer_count += required_peer_count / 2;
// Avoid querying the same peer multiple times when rep crawler is warmed up
auto const max_attempts = sufficient_weight ? conservative_max_attempts : aggressive_max_attempts;
erase_if (random_peers, [this, max_attempts] (std::shared_ptr<nano::transport::channel> const & channel) {
return queries.get<tag_channel> ().count (channel) >= max_attempts;
});

// The rest of the endpoints are picked randomly
auto random_peers = node.network.random_set (required_peer_count, 0, true); // Include channels with ephemeral remote ports
return { random_peers.begin (), random_peers.end () };
}

Expand Down Expand Up @@ -292,12 +295,15 @@ auto nano::rep_crawler::prepare_query_target () -> std::optional<hash_root_t>
return hash_root;
}

void nano::rep_crawler::track_rep_request (hash_root_t hash_root, std::shared_ptr<nano::transport::channel> const & channel)
bool nano::rep_crawler::track_rep_request (hash_root_t hash_root, std::shared_ptr<nano::transport::channel> const & channel)
{
debug_assert (!mutex.try_lock ());

debug_assert (queries.count (channel) == 0); // Only a single query should be active per channel
queries.emplace (query_entry{ hash_root.first, channel });
auto [it, inserted] = queries.emplace (query_entry{ hash_root.first, channel });
if (!inserted)
{
return false; // Duplicate, not tracked
}

// Find and update the timestamp on all reps available on the endpoint (a single host may have multiple reps)
auto & index = reps.get<tag_channel> ();
Expand All @@ -308,13 +314,16 @@ void nano::rep_crawler::track_rep_request (hash_root_t hash_root, std::shared_pt
info.last_request = std::chrono::steady_clock::now ();
});
}

return true;
}

void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::channel>> const & target_channels)
{
auto maybe_hash_root = prepare_query_target ();
if (!maybe_hash_root)
{
logger.debug (nano::log::type::rep_crawler, "No block to query");
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::query_target_failed);
return;
}
Expand All @@ -326,18 +335,18 @@ void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::chan
{
debug_assert (channel != nullptr);

// Only a single query should be active per channel
if (queries.find (channel) == queries.end ())
bool tracked = track_rep_request (hash_root, channel);
if (tracked)
{
track_rep_request (hash_root, channel);
node.network.send_confirm_req (channel, hash_root);

logger.debug (nano::log::type::rep_crawler, "Sending query for block {} to {}", hash_root.first.to_string (), channel->to_string ());
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::query_sent);
}
else
{
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::query_channel_busy);
logger.debug (nano::log::type::rep_crawler, "Ignoring duplicate query for block {} to {}", hash_root.first.to_string (), channel->to_string ());
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::query_duplicate);
}
}
}
Expand All @@ -361,10 +370,13 @@ bool nano::rep_crawler::is_pr (std::shared_ptr<nano::transport::channel> const &
bool nano::rep_crawler::process (std::shared_ptr<nano::vote> const & vote, std::shared_ptr<nano::transport::channel> const & channel)
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (auto info = queries.find (channel); info != queries.end ())

auto & index = queries.get<tag_channel> ();
auto [begin, end] = index.equal_range (channel);
for (auto it = begin; it != end; ++it)
{
// TODO: This linear search could be slow, especially with large votes.
auto const target_hash = info->hash;
auto const target_hash = it->hash;
bool found = std::any_of (vote->hashes.begin (), vote->hashes.end (), [&target_hash] (nano::block_hash const & hash) {
return hash == target_hash;
});
Expand All @@ -376,10 +388,10 @@ bool nano::rep_crawler::process (std::shared_ptr<nano::vote> const & vote, std::
// TODO: Track query response time

responses.push_back ({ channel, vote });
queries.erase (info);
queries.erase (it);

condition.notify_all ();
return true;
return true; // Found and processed
}
}
return false;
Expand Down
4 changes: 2 additions & 2 deletions nano/node/repcrawler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class rep_crawler final
/** Returns a list of endpoints to crawl. The total weight is passed in to avoid computing it twice. */
std::vector<std::shared_ptr<nano::transport::channel>> prepare_crawl_targets (bool sufficient_weight) const;
std::optional<hash_root_t> prepare_query_target ();
void track_rep_request (hash_root_t hash_root, std::shared_ptr<nano::transport::channel> const & channel_a);
bool track_rep_request (hash_root_t hash_root, std::shared_ptr<nano::transport::channel> const & channel_a);

private:
/**
Expand Down Expand Up @@ -162,7 +162,7 @@ class rep_crawler final

using ordered_queries = boost::multi_index_container<query_entry,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_channel>,
mi::hashed_non_unique<mi::tag<tag_channel>,
mi::member<query_entry, std::shared_ptr<nano::transport::channel>, &query_entry::channel>>,
mi::sequenced<mi::tag<tag_sequenced>>,
mi::hashed_non_unique<mi::tag<tag_hash>,
Expand Down

0 comments on commit e73420b

Please sign in to comment.