Skip to content

Commit

Permalink
Expose API for getting content providers from DHT with litep2p backend
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-markin committed Nov 29, 2024
1 parent 61a0f4e commit 5e62414
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 32 deletions.
6 changes: 6 additions & 0 deletions substrate/client/network/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ pub enum DhtEvent {

/// The DHT received a put record request.
PutRecordRequest(Key, Vec<u8>, Option<sc_network_types::PeerId>, Option<std::time::Instant>),

/// The providers for [`Key`] were found.
ProvidersFound(Key, Vec<PeerId>),

/// The providers for [`Key`] were not found.
ProvidersNotFound(Key),
}

/// Type for events generated by networking layer.
Expand Down
33 changes: 30 additions & 3 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use litep2p::{
libp2p::{
identify::{Config as IdentifyConfig, IdentifyEvent},
kademlia::{
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder,
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum,
Record, RecordKey, RecordsType,
},
Expand Down Expand Up @@ -144,6 +144,14 @@ pub enum DiscoveryEvent {
query_id: QueryId,
},

/// Providers were successfully retrieved.
GetProvidersSuccess {
/// Query ID.
query_id: QueryId,
/// Found providers sorted by distance to provided key.
providers: Vec<ContentProvider>,
},

/// Query failed.
QueryFailed {
/// Query ID.
Expand Down Expand Up @@ -417,6 +425,11 @@ impl Discovery {
self.kademlia_handle.stop_providing(key.into()).await;
}

/// Get providers for `key`.
pub async fn get_providers(&mut self, key: KademliaKey) -> QueryId {
self.kademlia_handle.get_providers(key.into()).await
}

/// Check if the observed address is a known address.
fn is_known_address(known: &Multiaddr, observed: &Multiaddr) -> bool {
let mut known = known.iter();
Expand Down Expand Up @@ -591,8 +604,22 @@ impl Stream for Discovery {

return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record }))
},
// Content provider events are ignored for now.
Poll::Ready(Some(KademliaEvent::GetProvidersSuccess { .. })) |
Poll::Ready(Some(KademliaEvent::GetProvidersSuccess {
provided_key,
providers,
query_id,
})) => {
log::trace!(
target: LOG_TARGET,
"`GET_PROVIDERS` for {query_id:?} with {provided_key:?} yielded {providers:?}",
);

return Poll::Ready(Some(DiscoveryEvent::GetProvidersSuccess {
query_id,
providers,
}))
},
// We do not validate incoming providers.
Poll::Ready(Some(KademliaEvent::IncomingProvider { .. })) => {},
}

Expand Down
106 changes: 77 additions & 29 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ pub struct Litep2pNetworkBackend {
/// Pending `PUT_VALUE` queries.
pending_put_values: HashMap<QueryId, (RecordKey, Instant)>,

/// Pending `GET_PROVIDERS` queries.
pending_get_providers: HashMap<QueryId, (RecordKey, Instant)>,

/// Discovery.
discovery: Discovery,

Expand Down Expand Up @@ -617,6 +620,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
discovery,
pending_put_values: HashMap::new(),
pending_get_values: HashMap::new(),
pending_get_providers: HashMap::new(),
peerstore_handle: peer_store_handle,
block_announce_protocol,
event_streams: out_events::OutChannels::new(None)?,
Expand Down Expand Up @@ -724,6 +728,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
NetworkServiceCommand::StopProviding { key } => {
self.discovery.stop_providing(key).await;
}
NetworkServiceCommand::GetProviders { key } => {
let query_id = self.discovery.get_providers(key.clone()).await;
self.pending_get_providers.insert(query_id, (key, Instant::now()));
}
NetworkServiceCommand::EventStream { tx } => {
self.event_streams.push(tx);
}
Expand Down Expand Up @@ -881,50 +889,90 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
}
}
}
Some(DiscoveryEvent::QueryFailed { query_id }) => {
match self.pending_get_values.remove(&query_id) {
None => match self.pending_put_values.remove(&query_id) {
None => log::warn!(
target: LOG_TARGET,
"non-existent query failed ({query_id:?})",
),
Some((key, started)) => {
log::debug!(
target: LOG_TARGET,
"`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
);

self.event_streams.send(Event::Dht(
DhtEvent::ValuePutFailed(key)
));

if let Some(ref metrics) = self.metrics {
metrics
.kademlia_query_duration
.with_label_values(&["value-put-failed"])
.observe(started.elapsed().as_secs_f64());
}
}
}
Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
match self.pending_get_providers.remove(&query_id) {
None => log::warn!(
target: LOG_TARGET,
"`GET_PROVIDERS` succeeded for a non-existent query",
),
Some((key, started)) => {
log::debug!(
log::trace!(
target: LOG_TARGET,
"`GET_VALUE` ({query_id:?}) failed for key {key:?}",
"`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
);

self.event_streams.send(Event::Dht(
DhtEvent::ValueNotFound(key)
DhtEvent::ProvidersFound(
key.into(),
providers.into_iter().map(|p| p.peer.into()).collect()
)
));

if let Some(ref metrics) = self.metrics {
metrics
.kademlia_query_duration
.with_label_values(&["value-get-failed"])
.with_label_values(&["providers-get"])
.observe(started.elapsed().as_secs_f64());
}
}
}
}
Some(DiscoveryEvent::QueryFailed { query_id }) => {
if let Some((key, started)) = self.pending_get_values.remove(&query_id) {
log::debug!(
target: LOG_TARGET,
"`GET_VALUE` ({query_id:?}) failed for key {key:?}",
);

self.event_streams.send(Event::Dht(
DhtEvent::ValueNotFound(key)
));

if let Some(ref metrics) = self.metrics {
metrics
.kademlia_query_duration
.with_label_values(&["value-get-failed"])
.observe(started.elapsed().as_secs_f64());
}
} else if let Some((key, started)) = self.pending_put_values.remove(&query_id) {
log::debug!(
target: LOG_TARGET,
"`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
);

self.event_streams.send(Event::Dht(
DhtEvent::ValuePutFailed(key)
));

if let Some(ref metrics) = self.metrics {
metrics
.kademlia_query_duration
.with_label_values(&["value-put-failed"])
.observe(started.elapsed().as_secs_f64());
}
} else if let Some((key, started)) = self.pending_get_providers.remove(&query_id) {
log::debug!(
target: LOG_TARGET,
"`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
);

self.event_streams.send(Event::Dht(
DhtEvent::ProvidersNotFound(key)
));

if let Some(ref metrics) = self.metrics {
metrics
.kademlia_query_duration
.with_label_values(&["providers-get-failed"])
.observe(started.elapsed().as_secs_f64());
}
} else {
log::warn!(
target: LOG_TARGET,
"non-existent query failed ({query_id:?})",
);
}
}
Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
}
Expand Down
7 changes: 7 additions & 0 deletions substrate/client/network/src/litep2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ pub enum NetworkServiceCommand {
/// Stop providing `key`.
StopProviding { key: KademliaKey },

/// Get providers for `key`.
GetProviders { key: KademliaKey },

/// Query network status.
Status {
/// `oneshot::Sender` for sending the status.
Expand Down Expand Up @@ -310,6 +313,10 @@ impl NetworkDHTProvider for Litep2pNetworkService {
fn stop_providing(&self, key: KademliaKey) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StopProviding { key });
}

fn get_providers(&self, key: KademliaKey) {
let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::GetProviders { key });
}
}

#[async_trait::async_trait]
Expand Down
2 changes: 2 additions & 0 deletions substrate/client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1693,6 +1693,8 @@ where
DhtEvent::ValuePutFailed(_) => "value-put-failed",
DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
DhtEvent::StartProvidingFailed(_) => "start-providing-failed",
DhtEvent::ProvidersFound(_, _) => "providers-found",
DhtEvent::ProvidersNotFound(_) => "providers-not-found",
};
metrics
.kademlia_query_duration
Expand Down
7 changes: 7 additions & 0 deletions substrate/client/network/src/service/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ pub trait NetworkDHTProvider {

/// Deregister this node as a provider for `key` on the DHT.
fn stop_providing(&self, key: KademliaKey);

/// Start getting the list of providers for `key` on the DHT.
fn get_providers(&self, key: KademliaKey);
}

impl<T> NetworkDHTProvider for Arc<T>
Expand Down Expand Up @@ -276,6 +279,10 @@ where
fn stop_providing(&self, key: KademliaKey) {
T::stop_providing(self, key)
}

fn get_providers(&self, key: KademliaKey) {
T::get_providers(self, key)
}
}

/// Provides an ability to set a fork sync request for a particular block.
Expand Down

0 comments on commit 5e62414

Please sign in to comment.