Skip to content

Commit

Permalink
refactor: distinguish query types that validate
Browse files Browse the repository at this point in the history
FindNode queries don't need to validate the result, so we shouldn't look
for a validating stage, or handle pending results.

Split queries into Query and ValidatingQuery so that we can handle only
the relevant options in the enum.

Importantly, we shrink the job of the final Query::Result, and add a new
ValidatingQuery::PendingResult which is used to indicate some content
that is waiting to be validated.

Unfortunately, this comes at the cost of code duplication in
content_query_event_poll, which looks a lot like query_event_poll but
only handles ValidatingQuery types, so it can get access to the
pending_result.
  • Loading branch information
carver committed Sep 14, 2024
1 parent e458dee commit 3ea0dbf
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 58 deletions.
31 changes: 22 additions & 9 deletions portalnet/src/find/iterators/findcontent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tracing::warn;

use super::{
super::query_pool::QueryState,
query::{Query, QueryConfig, QueryPeer, QueryPeerState, QueryProgress},
query::{Query, QueryConfig, QueryPeer, QueryPeerState, QueryProgress, ValidatingQuery},
};

pub enum FindContentQueryResponse<TNodeId> {
Expand All @@ -44,9 +44,11 @@ pub enum FindContentQueryResponse<TNodeId> {
ConnectionId(u16),
}

/// An intermediate value, waiting to be validated. After validation, the content will become a
/// FindContentQueryResult.
#[derive(Debug)]
pub enum FindContentQueryResult<TNodeId> {
NoneFound,
pub enum FindContentQueryPending<TNodeId> {
NonePending,
/// Content returned, but not yet validated.
PendingContent {
content: Vec<u8>,
Expand All @@ -64,6 +66,11 @@ pub enum FindContentQueryResult<TNodeId> {
// channel used to send back the validated content info
valid_content_tx: Sender<ValidatedContent<TNodeId>>,
},
}

/// The final result of a FindContent query. Content that is still waiting for validation is
/// represented separately, by `FindContentQueryPending`.
#[derive(Debug)]
pub enum FindContentQueryResult<TNodeId> {
/// The query produced no validated content.
NoneFound,
/// Content returned and validated. Also includes a list of peers that were cancelled
ValidContent(ValidatedContent<TNodeId>, Vec<TNodeId>),
}
Expand Down Expand Up @@ -351,16 +358,23 @@ where
None => FindContentQueryResult::NoneFound,
}
}
}

impl<TNodeId> ValidatingQuery<TNodeId> for FindContentQuery<TNodeId>
where
TNodeId: Into<Key<TNodeId>> + Eq + Clone + std::fmt::Display + std::hash::Hash,
{
type PendingResult = FindContentQueryPending<TNodeId>;

fn pending_result(&self, source: TNodeId) -> Self::Result {
fn pending_result(&self, source: TNodeId) -> Self::PendingResult {
match self.unchecked_content.get(&source) {
Some(UnvalidatedContent::Content(content)) => FindContentQueryResult::PendingContent {
Some(UnvalidatedContent::Content(content)) => FindContentQueryPending::PendingContent {
content: content.clone(),
nodes_to_poke: self.get_nodes_to_poke(&source),
peer: source,
valid_content_tx: self.content_tx.clone(),
},
Some(UnvalidatedContent::Connection(connection_id)) => FindContentQueryResult::Utp {
Some(UnvalidatedContent::Connection(connection_id)) => FindContentQueryPending::Utp {
connection_id: *connection_id,
nodes_to_poke: self.get_nodes_to_poke(&source),
peer: source,
Expand All @@ -371,7 +385,7 @@ where
// not be called unless the caller believes there to be content
// waiting to be validated.
warn!("pending_result called, but no content is available for {source}");
FindContentQueryResult::NoneFound
FindContentQueryPending::NonePending
}
}
}
Expand Down Expand Up @@ -616,7 +630,7 @@ mod tests {
}
// Immediately validate the content
match query.pending_result(*peer_node_id) {
FindContentQueryResult::PendingContent {
FindContentQueryPending::PendingContent {
content,
nodes_to_poke: _,
peer,
Expand Down Expand Up @@ -684,7 +698,6 @@ mod tests {
"Not all peers have been contacted: {uncontacted:?}"
);
}
_ => panic!("Unexpected result."),
}
}

Expand Down
7 changes: 0 additions & 7 deletions portalnet/src/find/iterators/findnodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,6 @@ where
.take(self.config.num_results)
.collect()
}

fn pending_result(&self, _source: TNodeId) -> Self::Result {
// FindNodes queries don't need to provide access to the result until the query is finished.
// This is because the query doesn't run content validation, so as soon as the nodes are
// returned, the query is considered finished.
unimplemented!("Use `into_result` instead. FindNodes does not run content validation.")
}
}

impl<TNodeId> FindNodeQuery<TNodeId>
Expand Down
10 changes: 9 additions & 1 deletion portalnet/src/find/iterators/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,18 @@ pub trait Query<TNodeId> {

/// Consumes the query, returning the result.
fn into_result(self) -> Self::Result;
}

/// This trait extends the `Query` trait with additional methods for validation.
///
/// It provides access to the unverified content that is pending validation.
pub trait ValidatingQuery<TNodeId> {
/// The type of a pending result that is waiting for validation.
type PendingResult;

/// Returns a result that is waiting for validation. The attached peer is used to look up which
/// pending result to return.
fn pending_result(&self, sending_peer: TNodeId) -> Self::Result;
fn pending_result(&self, sending_peer: TNodeId) -> Self::PendingResult;
}

/// Stage of the query.
Expand Down
117 changes: 76 additions & 41 deletions portalnet/src/overlay/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ use crate::{
find::{
iterators::{
findcontent::{
FindContentQuery, FindContentQueryResponse, FindContentQueryResult,
ValidatedContent,
FindContentQuery, FindContentQueryPending, FindContentQueryResponse,
FindContentQueryResult, ValidatedContent,
},
findnodes::FindNodeQuery,
query::{Query, QueryConfig},
query::{Query, QueryConfig, ValidatingQuery},
},
query_info::{QueryInfo, QueryType, RecursiveFindContentResult},
query_pool::{QueryId, QueryPool, QueryPoolState, TargetKey},
Expand Down Expand Up @@ -399,7 +399,7 @@ where
self.handle_find_nodes_query_event(query_event);
}
// Handle query events for queries in the find content query pool.
query_event = OverlayService::<TContentKey, TMetric, TValidator, TStore>::query_event_poll(self.find_content_query_pool.clone()) => {
query_event = OverlayService::<TContentKey, TMetric, TValidator, TStore>::content_query_event_poll(self.find_content_query_pool.clone()) => {
self.handle_find_content_query_event(query_event);
}
_ = OverlayService::<TContentKey, TMetric, TValidator, TStore>::bucket_maintenance_poll(self.protocol, &self.kbuckets) => {}
Expand Down Expand Up @@ -487,9 +487,44 @@ where
async fn query_event_poll<TQuery: Query<NodeId>>(
queries: Arc<RwLock<QueryPool<NodeId, TQuery, TContentKey>>>,
) -> QueryEvent<TQuery, TContentKey> {
// Polls the shared query pool (taking its write lock on every poll) and translates the pool
// state into a `QueryEvent`. The future resolves only when the pool reports a finished query,
// a timed-out query, or a peer that is waiting to be contacted.
// NOTE(review): `_cx` is unused, so this closure registers no waker of its own; presumably the
// caller's select loop is what re-polls it -- confirm.
future::poll_fn(move |_cx| match queries.write().poll() {
QueryPoolState::Validating(..) => {
// `QueryEvent` has no variant for validation, so reaching this state through this poller
// is treated as a programming error and panics.
unimplemented!("A FindNode query unexpectedly tried to validate content");
}
// Note that some query pools skip over Validating, straight to Finished (like the
// FindNode query pool)
QueryPoolState::Finished(query_id, query_info, query) => {
Poll::Ready(QueryEvent::Finished(query_id, query_info, query))
}
QueryPoolState::Timeout(query_id, query_info, query) => {
warn!(query.id = %query_id, "Query timed out");
Poll::Ready(QueryEvent::TimedOut(query_id, query_info, query))
}
QueryPoolState::Waiting(Some((query_id, query_info, query, return_peer))) => {
let node_id = return_peer;

// Build the RPC request for the peer the pool wants contacted next.
let request_body = match query_info.rpc_request(return_peer) {
Ok(request_body) => request_body,
Err(_) => {
// The request could not be built: report the failure for this peer to the
// query and yield `Pending` instead of emitting an event.
query.on_failure(&node_id);
return Poll::Pending;
}
};

Poll::Ready(QueryEvent::Waiting(query_id, node_id, request_body))
}

// Nothing to do right now: no peer is ready to be contacted, or the pool is idle.
QueryPoolState::Waiting(None) | QueryPoolState::Idle => Poll::Pending,
})
.await
}

/// Generates a `ValidatingQueryEvent` from a poll on the find content query pool.
async fn content_query_event_poll<TQuery: Query<NodeId> + ValidatingQuery<NodeId>>(
queries: Arc<RwLock<QueryPool<NodeId, TQuery, TContentKey>>>,
) -> ValidatingQueryEvent<TQuery, TContentKey> {
future::poll_fn(move |_cx| match queries.write().poll() {
QueryPoolState::Validating(query_info, query, sending_peer) => {
// This only happens during a FindContent query.
let content_key = match &query_info.query_type {
QueryType::FindContent { target, .. } => target.clone(),
_ => {
Expand All @@ -503,15 +538,15 @@ where
// Generate the query result here instead of in handle_find_content_query_event,
// because the query is only borrowed, and can't be moved to the query event.
let query_result = query.pending_result(sending_peer);
Poll::Ready(QueryEvent::Validating(content_key, query_result))
Poll::Ready(ValidatingQueryEvent::Validating(content_key, query_result))
}
// Note that some query pools skip over Validating, straight to Finished (like the FindNode query pool)
QueryPoolState::Finished(query_id, query_info, query) => {
Poll::Ready(QueryEvent::Finished(query_id, query_info, query))
Poll::Ready(ValidatingQueryEvent::Finished(query_id, query_info, query))
}
QueryPoolState::Timeout(query_id, query_info, query) => {
warn!(query.id = %query_id, "Query timed out");
Poll::Ready(QueryEvent::TimedOut(query_id, query_info, query))
warn!(query.id = %query_id, "Content query timed out");
Poll::Ready(ValidatingQueryEvent::TimedOut(query_id, query_info, query))
}
QueryPoolState::Waiting(Some((query_id, query_info, query, return_peer))) => {
let node_id = return_peer;
Expand All @@ -524,7 +559,7 @@ where
}
};

Poll::Ready(QueryEvent::Waiting(query_id, node_id, request_body))
Poll::Ready(ValidatingQueryEvent::Waiting(query_id, node_id, request_body))
}

QueryPoolState::Waiting(None) | QueryPoolState::Idle => Poll::Pending,
Expand Down Expand Up @@ -562,10 +597,6 @@ where
}
}
}
QueryEvent::Validating(..) => {
// This should be an unreachable path
unimplemented!("A FindNode query unexpectedly tried to validate content");
}
// Query has ended.
QueryEvent::Finished(query_id, mut query_info, query)
| QueryEvent::TimedOut(query_id, mut query_info, query) => {
Expand Down Expand Up @@ -616,10 +647,10 @@ where
/// Handles a `ValidatingQueryEvent` from a poll on the find content query pool.
fn handle_find_content_query_event(
&mut self,
query_event: QueryEvent<FindContentQuery<NodeId>, TContentKey>,
query_event: ValidatingQueryEvent<FindContentQuery<NodeId>, TContentKey>,
) {
match query_event {
QueryEvent::Waiting(query_id, node_id, request) => {
ValidatingQueryEvent::Waiting(query_id, node_id, request) => {
if let Some(enr) = self.find_enr(&node_id) {
// If we find the node's ENR, then send the request on behalf of the
// query. No callback channel is necessary for the request, because the
Expand All @@ -646,16 +677,13 @@ where
}
}
}
QueryEvent::Validating(content_key, query_result) => {
ValidatingQueryEvent::Validating(content_key, query_result) => {
match query_result {
FindContentQueryResult::NoneFound => {
FindContentQueryPending::NonePending => {
// This should be an unreachable path
error!("A FindContent query claimed to have some new data to validate, but none was available");
}
FindContentQueryResult::ValidContent(..) => {
panic!("A FindContent query's pending_result() must never return already validated content");
}
FindContentQueryResult::PendingContent {
FindContentQueryPending::PendingContent {
content,
nodes_to_poke,
peer,
Expand All @@ -675,7 +703,7 @@ where
.await;
});
}
FindContentQueryResult::Utp {
FindContentQueryPending::Utp {
connection_id,
peer,
nodes_to_poke,
Expand Down Expand Up @@ -723,8 +751,8 @@ where
}
};
}
QueryEvent::Finished(_, query_info, query)
| QueryEvent::TimedOut(_, query_info, query) => {
ValidatingQueryEvent::Finished(_, query_info, query)
| ValidatingQueryEvent::TimedOut(_, query_info, query) => {
let callback = match query_info.query_type {
QueryType::FindContent { callback, .. } => callback,
_ => {
Expand Down Expand Up @@ -772,13 +800,6 @@ where
}));
}
}
FindContentQueryResult::PendingContent { .. }
| FindContentQueryResult::Utp { .. } => {
// If no content gets verified, then the query must return NoneFound
unimplemented!(
"Unverified content should never be returned by the final query result"
);
}
}
}
}
Expand Down Expand Up @@ -2629,12 +2650,23 @@ where
/// The result of the `query_event_poll` indicating an action is required to further progress an
/// active query.
///
/// This enum has no validating variant; queries that validate content are reported through
/// `ValidatingQueryEvent` instead.
pub enum QueryEvent<TQuery: Query<NodeId>, TContentKey> {
/// The query is waiting for a peer to be contacted. Carries the query ID, the peer's node ID
/// and the request to send.
Waiting(QueryId, NodeId, Request),
/// The query has timed out, possibly returning peers.
TimedOut(QueryId, QueryInfo<TContentKey>, TQuery),
/// The query has completed successfully.
Finished(QueryId, QueryInfo<TContentKey>, TQuery),
}

/// The result of the `content_query_event_poll` indicating an action is required to further
/// progress an active query that validates its content.
pub enum ValidatingQueryEvent<TQuery: Query<NodeId> + ValidatingQuery<NodeId>, TContentKey> {
/// The query is waiting for a peer to be contacted.
Waiting(QueryId, NodeId, Request),
/// The query has timed out, possibly returning peers.
TimedOut(QueryId, QueryInfo<TContentKey>, TQuery),
/// The query is working to validate the response.
Validating(TContentKey, TQuery::Result),
Validating(TContentKey, TQuery::PendingResult),
/// The query has completed successfully.
Finished(QueryId, QueryInfo<TContentKey>, TQuery),
}
Expand Down Expand Up @@ -3927,7 +3959,7 @@ mod tests {

// Query result should contain content.
match query.pending_result(bootnode_node_id) {
FindContentQueryResult::PendingContent {
FindContentQueryPending::PendingContent {
content: pending_content,
peer,
..
Expand Down Expand Up @@ -3997,7 +4029,7 @@ mod tests {

// Query result should contain connection ID
match query.pending_result(bootnode_node_id) {
FindContentQueryResult::Utp {
FindContentQueryPending::Utp {
connection_id,
peer,
..
Expand Down Expand Up @@ -4043,13 +4075,16 @@ mod tests {
let query_id = query_id.expect("Query ID for new find content query is `None`");

let query_event =
OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll(
OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::content_query_event_poll(
service.find_content_query_pool.clone(),
)
.await;

// Query event should be `Waiting` variant.
assert!(matches!(query_event, QueryEvent::Waiting(_, _, _)));
assert!(matches!(
query_event,
ValidatingQueryEvent::Waiting(_, _, _)
));

service.handle_find_content_query_event(query_event);

Expand Down Expand Up @@ -4082,13 +4117,13 @@ mod tests {
);

let query_event =
OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll(
OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::content_query_event_poll(
service.find_content_query_pool.clone(),
)
.await;

// Query event should be `Validating` variant.
assert!(matches!(query_event, QueryEvent::Validating(..)));
assert!(matches!(query_event, ValidatingQueryEvent::Validating(..)));

service.handle_find_content_query_event(query_event);

Expand All @@ -4097,7 +4132,7 @@ mod tests {
let query_event = loop {
let polled = timeout(
Duration::from_millis(10),
OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll(
OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::content_query_event_poll(
service.find_content_query_pool.clone(),
),
)
Expand All @@ -4112,7 +4147,7 @@ mod tests {
};

// Query event should be `Finished` variant.
assert!(matches!(query_event, QueryEvent::Finished(..)));
assert!(matches!(query_event, ValidatingQueryEvent::Finished(..)));

service.handle_find_content_query_event(query_event);

Expand Down

0 comments on commit 3ea0dbf

Please sign in to comment.