Skip to content

Commit

Permalink
Do not query new channels on network thread in rep_crawler
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Nov 27, 2024
1 parent 7855c82 commit 9de0e0e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 16 deletions.
36 changes: 26 additions & 10 deletions nano/node/repcrawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ nano::rep_crawler::rep_crawler (nano::rep_crawler_config const & config_a, nano:
network_constants{ node_a.network_params.network },
active{ node_a.active }
{
if (!node.flags.disable_rep_crawler)
{
node.observers.endpoint.add ([this] (std::shared_ptr<nano::transport::channel> const & channel) {
query (channel);
});
}
node.observers.endpoint.add ([this] (std::shared_ptr<nano::transport::channel> const & channel) {
if (!node.flags.disable_rep_crawler)
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
prioritized.push_back (channel);
}
condition.notify_all ();
}
});
}

nano::rep_crawler::~rep_crawler ()
Expand Down Expand Up @@ -161,7 +165,7 @@ void nano::rep_crawler::run ()
lock.lock ();

condition.wait_for (lock, query_interval (sufficient_weight), [this, sufficient_weight] {
return stopped || query_predicate (sufficient_weight) || !responses.empty ();
return stopped || query_predicate (sufficient_weight) || !responses.empty () || !prioritized.empty ();
});

if (stopped)
Expand All @@ -180,6 +184,16 @@ void nano::rep_crawler::run ()

cleanup ();

if (!prioritized.empty ())
{
decltype (prioritized) prioritized_l;
prioritized_l.swap (prioritized);

lock.unlock ();
query (prioritized_l);
lock.lock ();
}

if (query_predicate (sufficient_weight))
{
last_query = std::chrono::steady_clock::now ();
Expand Down Expand Up @@ -230,7 +244,7 @@ void nano::rep_crawler::cleanup ()
});
}

std::vector<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const
std::deque<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const
{
debug_assert (!mutex.try_lock ());

Expand Down Expand Up @@ -310,7 +324,7 @@ bool nano::rep_crawler::track_rep_request (hash_root_t hash_root, std::shared_pt
return true;
}

void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::channel>> const & target_channels)
void nano::rep_crawler::query (std::deque<std::shared_ptr<nano::transport::channel>> const & target_channels)
{
auto maybe_hash_root = prepare_query_target ();
if (!maybe_hash_root)
Expand Down Expand Up @@ -356,7 +370,7 @@ void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::chan

void nano::rep_crawler::query (std::shared_ptr<nano::transport::channel> const & target_channel)
{
query (std::vector{ target_channel });
query (std::deque{ target_channel });
}

bool nano::rep_crawler::is_pr (std::shared_ptr<nano::transport::channel> const & channel) const
Expand Down Expand Up @@ -432,6 +446,7 @@ std::vector<nano::representative> nano::rep_crawler::representatives (std::size_
}

std::vector<nano::representative> result;
result.reserve (ordered.size ());
for (auto i = ordered.begin (), n = ordered.end (); i != n && result.size () < count; ++i)
{
auto const & [weight, rep] = *i;
Expand Down Expand Up @@ -483,6 +498,7 @@ nano::container_info nano::rep_crawler::container_info () const
info.put ("reps", reps);
info.put ("queries", queries);
info.put ("responses", responses);
info.put ("prioritized", prioritized);
return info;
}

Expand Down
15 changes: 9 additions & 6 deletions nano/node/repcrawler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class rep_crawler final
bool process (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &);

/** Attempt to determine if the peer manages one or more representative accounts */
void query (std::vector<std::shared_ptr<nano::transport::channel>> const & target_channels);
void query (std::deque<std::shared_ptr<nano::transport::channel>> const & target_channels);

/** Attempt to determine if the peer manages one or more representative accounts */
void query (std::shared_ptr<nano::transport::channel> const & target_channel);
Expand All @@ -75,12 +75,10 @@ class rep_crawler final
/** Get total available weight from representatives */
nano::uint128_t total_weight () const;

/** Request a list of the top \p count known representatives in descending order of weight, with at least \p weight_a voting weight, and optionally with a minimum version \p minimum_protocol_version
*/
/** Request a list of the top \p count known representatives in descending order of weight, with at least \p weight_a voting weight, and optionally with a minimum version \p minimum_protocol_version */
std::vector<representative> representatives (std::size_t count = std::numeric_limits<std::size_t>::max (), nano::uint128_t minimum_weight = 0, std::optional<decltype (nano::network_constants::protocol_version)> const & minimum_protocol_version = {}) const;

/** Request a list of the top \p count known principal representatives in descending order of weight, optionally with a minimum version \p minimum_protocol_version
*/
/** Request a list of the top \p count known principal representatives in descending order of weight, optionally with a minimum version \p minimum_protocol_version */
std::vector<representative> principal_representatives (std::size_t count = std::numeric_limits<std::size_t>::max (), std::optional<decltype (nano::network_constants::protocol_version)> const & minimum_protocol_version = {}) const;

/** Total number of representatives */
Expand All @@ -106,7 +104,8 @@ class rep_crawler final
using hash_root_t = std::pair<nano::block_hash, nano::root>;

/** Returns a list of endpoints to crawl. The total weight is passed in to avoid computing it twice. */
std::vector<std::shared_ptr<nano::transport::channel>> prepare_crawl_targets (bool sufficient_weight) const;

std::deque<std::shared_ptr<nano::transport::channel>> prepare_crawl_targets (bool sufficient_weight) const;
std::optional<hash_root_t> prepare_query_target () const;
bool track_rep_request (hash_root_t hash_root, std::shared_ptr<nano::transport::channel> const & channel);

Expand Down Expand Up @@ -173,9 +172,13 @@ class rep_crawler final

private:
static size_t constexpr max_responses{ 1024 * 4 };

using response_t = std::pair<std::shared_ptr<nano::transport::channel>, std::shared_ptr<nano::vote>>;
boost::circular_buffer<response_t> responses{ max_responses };

// Freshly established connections that should be queried asap
std::deque<std::shared_ptr<nano::transport::channel>> prioritized;

std::chrono::steady_clock::time_point last_query{};

std::atomic<bool> stopped{ false };
Expand Down

0 comments on commit 9de0e0e

Please sign in to comment.