Merge pull request #1447 from carver/natural-connection-id
Credit first peer to return content
carver authored Sep 13, 2024
2 parents 56cb29c + 65ef35e commit 4d17461
Showing 5 changed files with 57 additions and 48 deletions.
2 changes: 1 addition & 1 deletion ethportal-peertest/src/utils.rs
@@ -105,7 +105,7 @@ fn read_history_content_key_value(
 /// Wrapper function for fixtures that directly returns the tuple.
 fn read_fixture(file_name: &str) -> (HistoryContentKey, HistoryContentValue) {
     read_history_content_key_value(file_name)
-        .unwrap_or_else(|err| panic!("Error reading fixture: {err}"))
+        .unwrap_or_else(|err| panic!("Error reading fixture in {file_name}: {err}"))
 }

 /// History HeaderWithProof content key & value
6 changes: 1 addition & 5 deletions light-client/src/consensus/consensus_client.rs
@@ -523,11 +523,7 @@ impl<R: ConsensusRpc> ConsensusLightClient<R> {
             ))
         })();

-        if let Ok(is_valid) = res {
-            is_valid
-        } else {
-            false
-        }
+        res.unwrap_or_default()
     }

     fn compute_committee_sign_root(&self, header: Bytes32, slot: u64) -> Result<Node> {
6 changes: 1 addition & 5 deletions light-client/src/consensus/utils.rs
@@ -37,11 +37,7 @@ pub fn is_proof_valid<L: TreeHash>(
         Ok(is_valid)
     })();

-    if let Ok(is_valid) = res {
-        is_valid
-    } else {
-        false
-    }
+    res.unwrap_or_default()
 }

 #[derive(SimpleSerialize, Default, Debug)]
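
Note: both light-client hunks above collapse the same five-line `if let` into `unwrap_or_default()`. For a `Result<bool, E>` this is behavior-preserving, because `bool::default()` is `false`. A standalone sketch of the equivalence (not trin code):

```rust
fn main() {
    let ok: Result<bool, &str> = Ok(true);
    let err: Result<bool, &str> = Err("proof check failed");

    // The pattern that was removed:
    let old_style = if let Ok(is_valid) = err { is_valid } else { false };
    // The replacement: Err(_) maps to bool::default(), i.e. false.
    let new_style = err.unwrap_or_default();

    assert_eq!(old_style, new_style);
    assert!(ok.unwrap_or_default());
}
```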
85 changes: 51 additions & 34 deletions portalnet/src/find/iterators/findcontent.rs
@@ -73,7 +73,7 @@ struct UtpAndPeerDetails<TNodeId> {
 }

 #[derive(Debug, Clone)]
-pub struct FindContentQuery<TNodeId> {
+pub struct FindContentQuery<TNodeId: std::fmt::Display> {
     /// The target key we are looking for
     target_key: Key<TNodeId>,

@@ -99,7 +99,7 @@ pub struct FindContentQuery<TNodeId> {

 impl<TNodeId> Query<TNodeId> for FindContentQuery<TNodeId>
 where
-    TNodeId: Into<Key<TNodeId>> + Eq + Clone,
+    TNodeId: Into<Key<TNodeId>> + Eq + Clone + std::fmt::Display,
 {
     type Response = FindContentQueryResponse<TNodeId>;
     type Result = FindContentQueryResult<TNodeId>;
@@ -189,16 +189,20 @@ where
                 };
             }
             FindContentQueryResponse::Content(content) => {
-                self.content = Some(ContentAndPeer::Content(ContentAndPeerDetails {
-                    content,
-                    peer: peer.clone(),
-                }));
+                if self.content.is_none() {
+                    self.content = Some(ContentAndPeer::Content(ContentAndPeerDetails {
+                        content,
+                        peer: peer.clone(),
+                    }));
+                }
             }
             FindContentQueryResponse::ConnectionId(connection_id) => {
-                self.content = Some(ContentAndPeer::Utp(UtpAndPeerDetails {
-                    connection_id: u16::from_be(connection_id),
-                    peer: peer.clone(),
-                }));
+                if self.content.is_none() {
+                    self.content = Some(ContentAndPeer::Utp(UtpAndPeerDetails {
+                        connection_id: u16::from_be(connection_id),
+                        peer: peer.clone(),
+                    }));
+                }
             }
         }
     }
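
Note: the two `is_none()` guards above are the heart of the commit message, "Credit first peer to return content": once a peer has been recorded as the source of the content (or of the uTP connection id), responses that arrive later no longer overwrite it. A first-writer-wins sketch of the pattern, using simplified stand-in types rather than the real `ContentAndPeer` enum:

```rust
// Hypothetical stand-in for the query's record of who answered first.
#[derive(Debug)]
struct ContentRecord {
    content: Vec<u8>,
    peer: String,
}

#[derive(Default)]
struct Query {
    content: Option<ContentRecord>,
}

impl Query {
    /// Record content only if no peer has been credited yet.
    fn on_content(&mut self, peer: &str, content: Vec<u8>) {
        if self.content.is_none() {
            self.content = Some(ContentRecord {
                content,
                peer: peer.to_string(),
            });
        }
    }
}

fn main() {
    let mut query = Query::default();
    query.on_content("peer-a", vec![0xef]);
    query.on_content("peer-b", vec![0xef]); // arrives later; ignored
    assert_eq!(query.content.unwrap().peer, "peer-a");
}
```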
@@ -361,7 +365,7 @@ where

 impl<TNodeId> FindContentQuery<TNodeId>
 where
-    TNodeId: Into<Key<TNodeId>> + Eq + Clone,
+    TNodeId: Into<Key<TNodeId>> + Eq + Clone + std::fmt::Display,
 {
     /// Creates a new query with the given configuration.
     pub fn with_config<I>(
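
Note: the `std::fmt::Display` bound added to `TNodeId` (here and on the struct above) presumably lets the query format peers in log lines such as the `debug!` calls introduced below; without it, a generic `TNodeId` cannot be printed with `{}`. A minimal illustration (hypothetical function, not from this file):

```rust
use std::fmt::Display;

// `T: Display` is what allows the `{peer}` interpolation to compile.
fn log_peer<T: Display>(peer: &T) {
    println!("crediting peer {peer}");
}

fn main() {
    log_peer(&"enr:-IS4Q..."); // &str implements Display
}
```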
@@ -456,8 +460,9 @@ mod tests {
     use discv5::enr::NodeId;
     use quickcheck::*;
     use rand::{thread_rng, Rng};
-    use std::time::Duration;
+    use std::{cmp::min, time::Duration};
     use test_log::test;
+    use tracing::debug;

     type TestQuery = FindContentQuery<NodeId>;

@@ -525,42 +530,44 @@ mod tests {
         }
     }

-    #[test]
+    #[test_log::test]
     fn termination_and_parallelism() {
         fn prop(mut query: TestQuery) {
             let now = Instant::now();
             let mut rng = thread_rng();

-            let mut expected = query
+            let mut remaining = query
                 .closest_peers
                 .values()
                 .map(|e| e.key().clone())
                 .collect::<Vec<_>>();
-            let num_known = expected.len();
+            let num_known = remaining.len();
             let max_parallelism = usize::min(query.config.parallelism, num_known);

             let target = query.target_key.clone();
-            let mut remaining;
+            let mut expected: Vec<_>;
             let mut num_failures = 0;

             let found_content: Vec<u8> = vec![0xef];
             let mut content_peer = None;

             'finished: loop {
-                if expected.is_empty() {
+                if remaining.is_empty() {
+                    debug!("ending test: no more peers to pull from");
                     break;
                 }
-                // Split off the next up to `parallelism` expected peers.
-                else if expected.len() < max_parallelism {
-                    remaining = Vec::new();
-                } else {
-                    remaining = expected.split_off(max_parallelism);
-                }
+                // Split off the next (up to) `parallelism` peers, who we expect to poll.
+                let num_expected = min(max_parallelism, remaining.len());
+                expected = remaining.drain(..num_expected).collect();

                 // Advance the query for maximum parallelism.
                 for k in expected.iter() {
                     match query.poll(now) {
-                        QueryState::Finished => break 'finished,
+                        QueryState::Finished => {
+                            debug!("Ending test loop: query state is finished");
+                            break 'finished;
+                        }
                         QueryState::Waiting(Some(p)) => assert_eq!(&p, k.preimage()),
                         QueryState::Waiting(None) => panic!("Expected another peer."),
                         QueryState::WaitingAtCapacity => panic!("Unexpectedly reached capacity."),
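
Note: in the reworked loop above, `remaining` holds the not-yet-polled peers and `drain(..n)` moves the first `n` of them into `expected` each round, replacing the old `split_off` juggling and making the `expected = remaining;` hand-back (removed in a hunk below) unnecessary. A small sketch of the difference between the two `Vec` methods, on plain integers rather than peer keys:

```rust
use std::cmp::min;

fn main() {
    let parallelism = 3;

    // drain(..n) moves the FIRST n elements out, leaving the tail in place.
    let mut remaining = vec![1, 2, 3, 4, 5];
    let num_expected = min(parallelism, remaining.len());
    let expected: Vec<_> = remaining.drain(..num_expected).collect();
    assert_eq!(expected, vec![1, 2, 3]);
    assert_eq!(remaining, vec![4, 5]);

    // split_off(n) is the mirror image: it RETURNS the tail and keeps the
    // head, which is why the old code had to swap `expected` and `remaining`.
    let mut head = vec![1, 2, 3, 4, 5];
    let tail = head.split_off(parallelism);
    assert_eq!(head, vec![1, 2, 3]);
    assert_eq!(tail, vec![4, 5]);
}
```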
@@ -583,7 +590,11 @@ mod tests {
                         k.preimage(),
                         FindContentQueryResponse::Content(found_content.clone()),
                     );
-                    content_peer = Some(k.clone());
+                    // The first peer to return the content should be the one reported at
+                    // the end.
+                    if content_peer.is_none() {
+                        content_peer = Some(k.clone());
+                    }
                 } else {
                     let num_closer = rng.gen_range(0..query.config.num_results + 1);
                     let closer_peers = random_nodes(num_closer).collect::<Vec<_>>();
@@ -602,8 +613,6 @@ mod tests {

                 // Re-sort the remaining expected peers for the next "round".
                 remaining.sort_by_key(|k| target.distance(k));
-
-                expected = remaining;
             }

             // The query must be finished.
@@ -613,12 +622,16 @@ mod tests {
             // Determine if all peers have been contacted by the query. This _must_ be
             // the case if the query finished without content and with fewer than the
             // requested number of results.
-            let all_contacted = query.closest_peers.values().all(|e| {
-                !matches!(
-                    e.state(),
-                    QueryPeerState::NotContacted | QueryPeerState::Waiting { .. }
-                )
-            });
+            let final_peers = query.closest_peers.clone();
+            let uncontacted: Vec<_> = final_peers
+                .values()
+                .filter(|e| {
+                    matches!(
+                        e.state(),
+                        QueryPeerState::NotContacted | QueryPeerState::Waiting { .. }
+                    )
+                })
+                .collect();

             let target_key = query.target_key.clone();
             let num_results = query.config.num_results;
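
Note: replacing the folded `all(...)` boolean with a collected `Vec` of uncontacted peers keeps the offending entries around, so the assertion later in the test can print exactly which peers were missed. A generic sketch of the pattern with toy states (not the real `QueryPeerState`):

```rust
#[derive(Debug)]
enum PeerState {
    NotContacted,
    Succeeded,
}

fn main() {
    let peers = vec![PeerState::Succeeded, PeerState::NotContacted];

    // Old style: a bare boolean, which tells you nothing on failure.
    let all_contacted = peers
        .iter()
        .all(|s| !matches!(s, PeerState::NotContacted));

    // New style: keep the failing entries so a panic message can show them.
    let uncontacted: Vec<_> = peers
        .iter()
        .filter(|s| matches!(s, PeerState::NotContacted))
        .collect();

    assert_eq!(all_contacted, uncontacted.is_empty());
    assert_eq!(uncontacted.len(), 1, "Not all peers contacted: {uncontacted:?}");
}
```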
@@ -652,7 +665,11 @@ mod tests {
                 // have been failures.
                 assert!(num_known < num_results || num_failures > 0);
                 // All peers must have been contacted.
-                assert!(all_contacted, "Not all peers have been contacted.");
+                assert_eq!(
+                    uncontacted.len(),
+                    0,
+                    "Not all peers have been contacted: {uncontacted:?}"
+                );
             } else {
                 assert_eq!(num_results, closest_nodes.len(), "Too many results.");
@@ -661,7 +678,7 @@ mod tests {
             }
         }

-        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
+        QuickCheck::new().tests(100).quickcheck(prop as fn(_) -> _)
     }

     #[test]
6 changes: 3 additions & 3 deletions portalnet/src/overlay/service.rs
@@ -650,7 +650,7 @@ where
             let utp_processing = UtpProcessing::from(&*self);
             tokio::spawn(async move {
                 Self::process_received_content(
-                    content.clone(),
+                    content,
                     false,
                     content_key,
                     callback,
@@ -971,9 +971,9 @@ where
                     Ok(Content::ConnectionId(cid_send.to_be()))
                 }
             }
-            // If we don't have data to send back or can't obtain a permit, send the requester a
+            // If we can't obtain a permit or don't have data to send back, send the requester a
             // list of closer ENRs.
-            (Ok(Some(_)), _) | (Ok(None), _) => {
+            (Ok(_), None) | (Ok(None), _) => {
                 let mut enrs = self.find_nodes_close_to_content(content_key);
                 enrs.retain(|enr| source != &enr.node_id());
                 pop_while_ssz_bytes_len_gt(&mut enrs, MAX_PORTAL_CONTENT_PAYLOAD_SIZE);
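
Note: this hunk tightens the fallback arm. The matched tuple appears to pair the content lookup result with an optional uTP permit, and earlier arms (not shown) handle the content-plus-permit case; `(Ok(_), None)` states directly that no permit is available, where the old `(Ok(Some(_)), _)` only reached this arm by falling through. A toy sketch of ordered arms over such a pair (hypothetical types, not the real overlay service):

```rust
#[derive(Debug, PartialEq)]
enum Reply {
    ConnectionId(u16),
    CloserEnrs,
}

fn respond(content: Result<Option<Vec<u8>>, ()>, permit: Option<()>) -> Reply {
    match (content, permit) {
        // A permit is required to stream the data back over uTP.
        (Ok(Some(_)), Some(_permit)) => Reply::ConnectionId(42),
        // No permit, or no data: fall back to a list of closer ENRs.
        (Ok(_), None) | (Ok(None), _) => Reply::CloserEnrs,
        (Err(_), _) => Reply::CloserEnrs,
    }
}

fn main() {
    assert_eq!(respond(Ok(Some(vec![1])), Some(())), Reply::ConnectionId(42));
    assert_eq!(respond(Ok(Some(vec![1])), None), Reply::CloserEnrs);
    assert_eq!(respond(Ok(None), Some(())), Reply::CloserEnrs);
}
```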
