diff --git a/substrate/client/network/src/event.rs b/substrate/client/network/src/event.rs index 6eea38d839f0..e8ec1eee2545 100644 --- a/substrate/client/network/src/event.rs +++ b/substrate/client/network/src/event.rs @@ -50,6 +50,12 @@ pub enum DhtEvent { /// The DHT received a put record request. PutRecordRequest(Key, Vec, Option, Option), + + /// The providers for [`Key`] were found. + ProvidersFound(Key, Vec), + + /// The providers for [`Key`] were not found. + ProvidersNotFound(Key), } /// Type for events generated by networking layer. diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index a0e17fadaf12..2bea2e5a80dc 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -32,7 +32,7 @@ use litep2p::{ libp2p::{ identify::{Config as IdentifyConfig, IdentifyEvent}, kademlia::{ - Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, + Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider, IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum, Record, RecordKey, RecordsType, }, @@ -144,6 +144,14 @@ pub enum DiscoveryEvent { query_id: QueryId, }, + /// Providers were successfully retrieved. + GetProvidersSuccess { + /// Query ID. + query_id: QueryId, + /// Found providers sorted by distance to provided key. + providers: Vec, + }, + /// Query failed. QueryFailed { /// Query ID. @@ -417,6 +425,11 @@ impl Discovery { self.kademlia_handle.stop_providing(key.into()).await; } + /// Get providers for `key`. + pub async fn get_providers(&mut self, key: KademliaKey) -> QueryId { + self.kademlia_handle.get_providers(key.into()).await + } + /// Check if the observed address is a known address. fn is_known_address(known: &Multiaddr, observed: &Multiaddr) -> bool { let mut known = known.iter(); @@ -591,8 +604,22 @@ impl Stream for Discovery { return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record })) }, - // Content provider events are ignored for now. - Poll::Ready(Some(KademliaEvent::GetProvidersSuccess { .. })) | + Poll::Ready(Some(KademliaEvent::GetProvidersSuccess { + provided_key, + providers, + query_id, + })) => { + log::trace!( + target: LOG_TARGET, + "`GET_PROVIDERS` for {query_id:?} with {provided_key:?} yielded {providers:?}", + ); + + return Poll::Ready(Some(DiscoveryEvent::GetProvidersSuccess { + query_id, + providers, + })) + }, + // We do not validate incoming providers. Poll::Ready(Some(KademliaEvent::IncomingProvider { .. })) => {}, } diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index 4d7902c96760..dac577445d7b 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -163,6 +163,9 @@ pub struct Litep2pNetworkBackend { /// Pending `PUT_VALUE` queries. pending_put_values: HashMap, + /// Pending `GET_PROVIDERS` queries. + pending_get_providers: HashMap, + /// Discovery. discovery: Discovery, @@ -617,6 +620,7 @@ impl NetworkBackend for Litep2pNetworkBac discovery, pending_put_values: HashMap::new(), pending_get_values: HashMap::new(), + pending_get_providers: HashMap::new(), peerstore_handle: peer_store_handle, block_announce_protocol, event_streams: out_events::OutChannels::new(None)?, @@ -724,6 +728,10 @@ impl NetworkBackend for Litep2pNetworkBac NetworkServiceCommand::StopProviding { key } => { self.discovery.stop_providing(key).await; } + NetworkServiceCommand::GetProviders { key } => { + let query_id = self.discovery.get_providers(key.clone()).await; + self.pending_get_providers.insert(query_id, (key, Instant::now())); + } NetworkServiceCommand::EventStream { tx } => { self.event_streams.push(tx); } @@ -881,50 +889,90 @@ impl NetworkBackend for Litep2pNetworkBac } } } - Some(DiscoveryEvent::QueryFailed { query_id }) => { - match self.pending_get_values.remove(&query_id) { - None => match self.pending_put_values.remove(&query_id) { - None => log::warn!( - target: LOG_TARGET, - "non-existent query failed ({query_id:?})", - ), - Some((key, started)) => { - log::debug!( - target: LOG_TARGET, - "`PUT_VALUE` ({query_id:?}) failed for key {key:?}", - ); - - self.event_streams.send(Event::Dht( - DhtEvent::ValuePutFailed(key) - )); - - if let Some(ref metrics) = self.metrics { - metrics - .kademlia_query_duration - .with_label_values(&["value-put-failed"]) - .observe(started.elapsed().as_secs_f64()); - } - } - } + Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => { + match self.pending_get_providers.remove(&query_id) { + None => log::warn!( + target: LOG_TARGET, + "`GET_PROVIDERS` succeeded for a non-existent query", + ), Some((key, started)) => { - log::debug!( + log::trace!( target: LOG_TARGET, - "`GET_VALUE` ({query_id:?}) failed for key {key:?}", + "`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded", ); self.event_streams.send(Event::Dht( - DhtEvent::ValueNotFound(key) + DhtEvent::ProvidersFound( + key.into(), + providers.into_iter().map(|p| p.peer.into()).collect() + ) )); if let Some(ref metrics) = self.metrics { metrics .kademlia_query_duration - .with_label_values(&["value-get-failed"]) + .with_label_values(&["providers-get"]) .observe(started.elapsed().as_secs_f64()); } } } } + Some(DiscoveryEvent::QueryFailed { query_id }) => { + if let Some((key, started)) = self.pending_get_values.remove(&query_id) { + log::debug!( + target: LOG_TARGET, + "`GET_VALUE` ({query_id:?}) failed for key {key:?}", + ); + + self.event_streams.send(Event::Dht( + DhtEvent::ValueNotFound(key) + )); + + if let Some(ref metrics) = self.metrics { + metrics + .kademlia_query_duration + .with_label_values(&["value-get-failed"]) + .observe(started.elapsed().as_secs_f64()); + } + } else if let Some((key, started)) = self.pending_put_values.remove(&query_id) { + log::debug!( + target: LOG_TARGET, + "`PUT_VALUE` ({query_id:?}) failed for key {key:?}", + ); + + self.event_streams.send(Event::Dht( + DhtEvent::ValuePutFailed(key) + )); + + if let Some(ref metrics) = self.metrics { + metrics + .kademlia_query_duration + .with_label_values(&["value-put-failed"]) + .observe(started.elapsed().as_secs_f64()); + } + } else if let Some((key, started)) = self.pending_get_providers.remove(&query_id) { + log::debug!( + target: LOG_TARGET, + "`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}" + ); + + self.event_streams.send(Event::Dht( + DhtEvent::ProvidersNotFound(key) + )); + + if let Some(ref metrics) = self.metrics { + metrics + .kademlia_query_duration + .with_label_values(&["providers-get-failed"]) + .observe(started.elapsed().as_secs_f64()); + } + } else { + log::warn!( + target: LOG_TARGET, + "non-existent query failed ({query_id:?})", + ); + } + } Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => { self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await; } diff --git a/substrate/client/network/src/litep2p/service.rs b/substrate/client/network/src/litep2p/service.rs index f75bc5266395..d270e90efdf5 100644 --- a/substrate/client/network/src/litep2p/service.rs +++ b/substrate/client/network/src/litep2p/service.rs @@ -110,6 +110,9 @@ pub enum NetworkServiceCommand { /// Stop providing `key`. StopProviding { key: KademliaKey }, + /// Get providers for `key`. + GetProviders { key: KademliaKey }, + /// Query network status. Status { /// `oneshot::Sender` for sending the status. @@ -310,6 +313,10 @@ impl NetworkDHTProvider for Litep2pNetworkService { fn stop_providing(&self, key: KademliaKey) { let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StopProviding { key }); } + + fn get_providers(&self, key: KademliaKey) { + let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::GetProviders { key }); + } } #[async_trait::async_trait] diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index e97300f1923a..898cb5daf786 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -1693,6 +1693,8 @@ where DhtEvent::ValuePutFailed(_) => "value-put-failed", DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request", DhtEvent::StartProvidingFailed(_) => "start-providing-failed", + DhtEvent::ProvidersFound(_, _) => "providers-found", + DhtEvent::ProvidersNotFound(_) => "providers-not-found", }; metrics .kademlia_query_duration diff --git a/substrate/client/network/src/service/traits.rs b/substrate/client/network/src/service/traits.rs index e95830853bfc..acfed9ea894c 100644 --- a/substrate/client/network/src/service/traits.rs +++ b/substrate/client/network/src/service/traits.rs @@ -240,6 +240,9 @@ pub trait NetworkDHTProvider { /// Deregister this node as a provider for `key` on the DHT. fn stop_providing(&self, key: KademliaKey); + + /// Start getting the list of providers for `key` on the DHT. + fn get_providers(&self, key: KademliaKey); } impl NetworkDHTProvider for Arc @@ -276,6 +279,10 @@ where fn stop_providing(&self, key: KademliaKey) { T::stop_providing(self, key) } + + fn get_providers(&self, key: KademliaKey) { + T::get_providers(self, key) + } } /// Provides an ability to set a fork sync request for a particular block.