diff --git a/portalnet/src/find/iterators/findcontent.rs b/portalnet/src/find/iterators/findcontent.rs index 0f1bef6ec..3608dd937 100644 --- a/portalnet/src/find/iterators/findcontent.rs +++ b/portalnet/src/find/iterators/findcontent.rs @@ -35,7 +35,7 @@ use tracing::warn; use super::{ super::query_pool::QueryState, - query::{Query, QueryConfig, QueryPeer, QueryPeerState, QueryProgress}, + query::{Query, QueryConfig, QueryPeer, QueryPeerState, QueryProgress, ValidatingQuery}, }; pub enum FindContentQueryResponse { @@ -44,9 +44,11 @@ pub enum FindContentQueryResponse { ConnectionId(u16), } +/// An intermediate value, waiting to be validated. After validation, the content will become a +/// FindContentQueryResult. #[derive(Debug)] -pub enum FindContentQueryResult { - NoneFound, +pub enum FindContentQueryPending { + NonePending, /// Content returned, but not yet validated. PendingContent { content: Vec, @@ -64,6 +66,11 @@ pub enum FindContentQueryResult { // channel used to send back the validated content info valid_content_tx: Sender>, }, +} + +#[derive(Debug)] +pub enum FindContentQueryResult { + NoneFound, /// Content returned, but not yet validated. Also includes a list of peers that were cancelled ValidContent(ValidatedContent, Vec), } @@ -351,16 +358,23 @@ where None => FindContentQueryResult::NoneFound, } } +} + +impl ValidatingQuery for FindContentQuery +where + TNodeId: Into> + Eq + Clone + std::fmt::Display + std::hash::Hash, +{ + type PendingResult = FindContentQueryPending; - fn pending_result(&self, source: TNodeId) -> Self::Result { + fn pending_result(&self, source: TNodeId) -> Self::PendingResult { match self.unchecked_content.get(&source) { - Some(UnvalidatedContent::Content(content)) => FindContentQueryResult::PendingContent { + Some(UnvalidatedContent::Content(content)) => FindContentQueryPending::PendingContent { content: content.clone(), nodes_to_poke: self.get_nodes_to_poke(&source), peer: source, valid_content_tx: self.content_tx.clone(), }, - Some(UnvalidatedContent::Connection(connection_id)) => FindContentQueryResult::Utp { + Some(UnvalidatedContent::Connection(connection_id)) => FindContentQueryPending::Utp { connection_id: *connection_id, nodes_to_poke: self.get_nodes_to_poke(&source), peer: source, @@ -371,7 +385,7 @@ where // not be called unless the caller believes there to be content // waiting to be validated. warn!("pending_result called, but no content is available for {source}"); - FindContentQueryResult::NoneFound + FindContentQueryPending::NonePending } } } @@ -616,7 +630,7 @@ mod tests { } // Immediately validate the content match query.pending_result(*peer_node_id) { - FindContentQueryResult::PendingContent { + FindContentQueryPending::PendingContent { content, nodes_to_poke: _, peer, @@ -684,7 +698,6 @@ mod tests { "Not all peers have been contacted: {uncontacted:?}" ); } - _ => panic!("Unexpected result."), } } diff --git a/portalnet/src/find/iterators/findnodes.rs b/portalnet/src/find/iterators/findnodes.rs index 11c2765ed..e87b13df8 100644 --- a/portalnet/src/find/iterators/findnodes.rs +++ b/portalnet/src/find/iterators/findnodes.rs @@ -264,13 +264,6 @@ where .take(self.config.num_results) .collect() } - - fn pending_result(&self, _source: TNodeId) -> Self::Result { - // FindNodes queries don't need to provide access to the result until the query is finished. - // This is because the query doesn't run content validation, so as soon as the nodes are - // returned, the query is considered finished. - unimplemented!("Use `into_result` instead. FindNodes does not run content validation.") - } } impl FindNodeQuery diff --git a/portalnet/src/find/iterators/query.rs b/portalnet/src/find/iterators/query.rs index 8b49cdb9d..d13614834 100644 --- a/portalnet/src/find/iterators/query.rs +++ b/portalnet/src/find/iterators/query.rs @@ -110,10 +110,18 @@ pub trait Query { /// Consumes the query, returning the result. fn into_result(self) -> Self::Result; +} + +/// This trait extends the `Query` trait with additional methods for validation. +/// +/// It provides access to the unverified content that is pending validation. +pub trait ValidatingQuery { + /// The type of a pending result that is waiting for validation. + type PendingResult; /// Returns a result that is waiting for validation. The attached peer is used to look up which /// pending result to return. - fn pending_result(&self, sending_peer: TNodeId) -> Self::Result; + fn pending_result(&self, sending_peer: TNodeId) -> Self::PendingResult; } /// Stage of the query. diff --git a/portalnet/src/overlay/service.rs b/portalnet/src/overlay/service.rs index 3496676bd..963796454 100644 --- a/portalnet/src/overlay/service.rs +++ b/portalnet/src/overlay/service.rs @@ -43,11 +43,11 @@ use crate::{ find::{ iterators::{ findcontent::{ - FindContentQuery, FindContentQueryResponse, FindContentQueryResult, - ValidatedContent, + FindContentQuery, FindContentQueryPending, FindContentQueryResponse, + FindContentQueryResult, ValidatedContent, }, findnodes::FindNodeQuery, - query::{Query, QueryConfig}, + query::{Query, QueryConfig, ValidatingQuery}, }, query_info::{QueryInfo, QueryType, RecursiveFindContentResult}, query_pool::{QueryId, QueryPool, QueryPoolState, TargetKey}, @@ -399,7 +399,7 @@ where self.handle_find_nodes_query_event(query_event); } // Handle query events for queries in the find content query pool. - query_event = OverlayService::::query_event_poll(self.find_content_query_pool.clone()) => { + query_event = OverlayService::::content_query_event_poll(self.find_content_query_pool.clone()) => { self.handle_find_content_query_event(query_event); } _ = OverlayService::::bucket_maintenance_poll(self.protocol, &self.kbuckets) => {} @@ -487,9 +487,44 @@ where async fn query_event_poll>( queries: Arc>>, ) -> QueryEvent { + future::poll_fn(move |_cx| match queries.write().poll() { + QueryPoolState::Validating(..) => { + unimplemented!("A FindNode query unexpectedly tried to validate content"); + } + // Note that some query pools skip over Validating, straight to Finished (like the + // FindNode query pool) + QueryPoolState::Finished(query_id, query_info, query) => { + Poll::Ready(QueryEvent::Finished(query_id, query_info, query)) + } + QueryPoolState::Timeout(query_id, query_info, query) => { + warn!(query.id = %query_id, "Query timed out"); + Poll::Ready(QueryEvent::TimedOut(query_id, query_info, query)) + } + QueryPoolState::Waiting(Some((query_id, query_info, query, return_peer))) => { + let node_id = return_peer; + + let request_body = match query_info.rpc_request(return_peer) { + Ok(request_body) => request_body, + Err(_) => { + query.on_failure(&node_id); + return Poll::Pending; + } + }; + + Poll::Ready(QueryEvent::Waiting(query_id, node_id, request_body)) + } + + QueryPoolState::Waiting(None) | QueryPoolState::Idle => Poll::Pending, + }) + .await + } + + /// Generates a `ValidatingQueryEvent` from a poll on the find content query pool. + async fn content_query_event_poll + ValidatingQuery>( + queries: Arc>>, + ) -> ValidatingQueryEvent { future::poll_fn(move |_cx| match queries.write().poll() { QueryPoolState::Validating(query_info, query, sending_peer) => { - // This only happens during a FindContent query. let content_key = match &query_info.query_type { QueryType::FindContent { target, .. } => target.clone(), _ => { @@ -503,15 +538,15 @@ where // Generate the query result here instead of in handle_find_content_query_event, // because the query is only borrowed, and can't be moved to the query event. let query_result = query.pending_result(sending_peer); - Poll::Ready(QueryEvent::Validating(content_key, query_result)) + Poll::Ready(ValidatingQueryEvent::Validating(content_key, query_result)) } // Note that some query pools skip over Validating, straight to Finished (like the FindNode query pool) QueryPoolState::Finished(query_id, query_info, query) => { - Poll::Ready(QueryEvent::Finished(query_id, query_info, query)) + Poll::Ready(ValidatingQueryEvent::Finished(query_id, query_info, query)) } QueryPoolState::Timeout(query_id, query_info, query) => { - warn!(query.id = %query_id, "Query timed out"); - Poll::Ready(QueryEvent::TimedOut(query_id, query_info, query)) + warn!(query.id = %query_id, "Content query timed out"); + Poll::Ready(ValidatingQueryEvent::TimedOut(query_id, query_info, query)) } QueryPoolState::Waiting(Some((query_id, query_info, query, return_peer))) => { let node_id = return_peer; @@ -524,7 +559,7 @@ where } }; - Poll::Ready(QueryEvent::Waiting(query_id, node_id, request_body)) + Poll::Ready(ValidatingQueryEvent::Waiting(query_id, node_id, request_body)) } QueryPoolState::Waiting(None) | QueryPoolState::Idle => Poll::Pending, @@ -562,10 +597,6 @@ where } } } - QueryEvent::Validating(..) => { - // This should be an unreachable path - unimplemented!("A FindNode query unexpectedly tried to validate content"); - } // Query has ended. QueryEvent::Finished(query_id, mut query_info, query) | QueryEvent::TimedOut(query_id, mut query_info, query) => { @@ -616,10 +647,10 @@ where /// Handles a `QueryEvent` from a poll on the find content query pool. fn handle_find_content_query_event( &mut self, - query_event: QueryEvent, TContentKey>, + query_event: ValidatingQueryEvent, TContentKey>, ) { match query_event { - QueryEvent::Waiting(query_id, node_id, request) => { + ValidatingQueryEvent::Waiting(query_id, node_id, request) => { if let Some(enr) = self.find_enr(&node_id) { // If we find the node's ENR, then send the request on behalf of the // query. No callback channel is necessary for the request, because the @@ -646,16 +677,13 @@ where } } } - QueryEvent::Validating(content_key, query_result) => { + ValidatingQueryEvent::Validating(content_key, query_result) => { match query_result { - FindContentQueryResult::NoneFound => { + FindContentQueryPending::NonePending => { // This should be an unreachable path error!("A FindContent query claimed to have some new data to validate, but none was available"); } - FindContentQueryResult::ValidContent(..) => { - panic!("A FindContent query's pending_result() must never return already validated content"); - } - FindContentQueryResult::PendingContent { + FindContentQueryPending::PendingContent { content, nodes_to_poke, peer, @@ -675,7 +703,7 @@ where .await; }); } - FindContentQueryResult::Utp { + FindContentQueryPending::Utp { connection_id, peer, nodes_to_poke, @@ -723,8 +751,8 @@ where } }; } - QueryEvent::Finished(_, query_info, query) - | QueryEvent::TimedOut(_, query_info, query) => { + ValidatingQueryEvent::Finished(_, query_info, query) + | ValidatingQueryEvent::TimedOut(_, query_info, query) => { let callback = match query_info.query_type { QueryType::FindContent { callback, .. } => callback, _ => { @@ -772,13 +800,6 @@ where })); } } - FindContentQueryResult::PendingContent { .. } - | FindContentQueryResult::Utp { .. } => { - // If no content gets verified, then the query must return NoneFound - unimplemented!( - "Unverified content should never be returned by the final query result" - ); - } } } } @@ -2629,12 +2650,23 @@ where /// The result of the `query_event_poll` indicating an action is required to further progress an /// active query. pub enum QueryEvent, TContentKey> { + /// The query is waiting for a peer to be contacted. + Waiting(QueryId, NodeId, Request), + /// The query has timed out, possible returning peers. + TimedOut(QueryId, QueryInfo, TQuery), + /// The query has completed successfully. + Finished(QueryId, QueryInfo, TQuery), +} + +/// The result of the `query_event_poll` indicating an action is required to further progress an +/// active query. +pub enum ValidatingQueryEvent + ValidatingQuery, TContentKey> { /// The query is waiting for a peer to be contacted. Waiting(QueryId, NodeId, Request), /// The query has timed out, possible returning peers. TimedOut(QueryId, QueryInfo, TQuery), /// The query is working to validate the response. - Validating(TContentKey, TQuery::Result), + Validating(TContentKey, TQuery::PendingResult), /// The query has completed successfully. Finished(QueryId, QueryInfo, TQuery), } @@ -3927,7 +3959,7 @@ mod tests { // Query result should contain content. match query.pending_result(bootnode_node_id) { - FindContentQueryResult::PendingContent { + FindContentQueryPending::PendingContent { content: pending_content, peer, .. @@ -3997,7 +4029,7 @@ mod tests { // Query result should contain connection ID match query.pending_result(bootnode_node_id) { - FindContentQueryResult::Utp { + FindContentQueryPending::Utp { connection_id, peer, .. @@ -4043,13 +4075,16 @@ mod tests { let query_id = query_id.expect("Query ID for new find content query is `None`"); let query_event = - OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll( + OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::content_query_event_poll( service.find_content_query_pool.clone(), ) .await; // Query event should be `Waiting` variant. - assert!(matches!(query_event, QueryEvent::Waiting(_, _, _))); + assert!(matches!( + query_event, + ValidatingQueryEvent::Waiting(_, _, _) + )); service.handle_find_content_query_event(query_event); @@ -4082,13 +4117,13 @@ mod tests { ); let query_event = - OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll( + OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::content_query_event_poll( service.find_content_query_pool.clone(), ) .await; // Query event should be `Validating` variant. - assert!(matches!(query_event, QueryEvent::Validating(..))); + assert!(matches!(query_event, ValidatingQueryEvent::Validating(..))); service.handle_find_content_query_event(query_event); @@ -4097,7 +4132,7 @@ mod tests { let query_event = loop { let polled = timeout( Duration::from_millis(10), - OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll( + OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::content_query_event_poll( service.find_content_query_pool.clone(), ), ) @@ -4112,7 +4147,7 @@ mod tests { }; // Query event should be `Finished` variant. - assert!(matches!(query_event, QueryEvent::Finished(..))); + assert!(matches!(query_event, ValidatingQueryEvent::Finished(..))); service.handle_find_content_query_event(query_event);