Skip to content

Commit

Permalink
Expose API for getting content providers from DHT with libp2p backend
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-markin committed Nov 29, 2024
1 parent 5e62414 commit 477ff88
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 2 deletions.
3 changes: 3 additions & 0 deletions substrate/client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,9 @@ where
metrics.dht_event_received.with_label_values(&["put_record_req"]).inc();
}
},
DhtEvent::StartProvidingFailed(..) => {},
DhtEvent::ProvidersFound(..) => {},
DhtEvent::ProvidersNotFound(..) => {},
}
}

Expand Down
12 changes: 12 additions & 0 deletions substrate/client/authority-discovery/src/worker/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,18 @@ impl NetworkDHTProvider for TestNetwork {
.unbounded_send(TestNetworkEvent::StoreRecordCalled)
.unwrap();
}

fn start_providing(&self, _: KademliaKey) {
unimplemented!()
}

fn stop_providing(&self, _: KademliaKey) {
unimplemented!()
}

fn get_providers(&self, _: KademliaKey) {
unimplemented!()
}
}

impl NetworkStateInfo for TestNetwork {
Expand Down
15 changes: 15 additions & 0 deletions substrate/client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@ impl<B: BlockT> Behaviour<B> {
pub fn stop_providing(&mut self, key: &RecordKey) {
self.discovery.stop_providing(key)
}

/// Start searching for providers on the DHT. Will later produce either a `ProvidersFound`
/// or `ProvidersNotFound` event.
pub fn get_providers(&mut self, key: RecordKey) {
self.discovery.get_providers(key)
}
}

impl From<CustomMessageOutcome> for BehaviourOut {
Expand Down Expand Up @@ -399,6 +405,15 @@ impl From<DiscoveryOut> for BehaviourOut {
BehaviourOut::Dht(DhtEvent::ValuePutFailed(key.into()), Some(duration)),
DiscoveryOut::StartProvidingFailed(key) =>
BehaviourOut::Dht(DhtEvent::StartProvidingFailed(key.into()), None),
DiscoveryOut::ProvidersFound(key, providers, duration) => BehaviourOut::Dht(
DhtEvent::ProvidersFound(
key.into(),
providers.into_iter().map(Into::into).collect(),
),
Some(duration),
),
DiscoveryOut::ProvidersNotFound(key, duration) =>
BehaviourOut::Dht(DhtEvent::ProvidersNotFound(key.into()), Some(duration)),
DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted,
}
}
Expand Down
67 changes: 65 additions & 2 deletions substrate/client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ use libp2p::{
self,
record::store::{MemoryStore, RecordStore},
Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record,
RecordKey,
GetClosestPeersError, GetProvidersError, GetProvidersOk, GetRecordOk, PeerRecord, QueryId,
QueryResult, Quorum, Record, RecordKey,
},
mdns::{self, tokio::Behaviour as TokioMdns},
multiaddr::Protocol,
Expand Down Expand Up @@ -484,6 +484,13 @@ impl DiscoveryBehaviour {
}
}

/// Get content providers for `key` from the DHT.
pub fn get_providers(&mut self, key: RecordKey) {
if let Some(kad) = self.kademlia.as_mut() {
kad.get_providers(key);
}
}

/// Store a record in the Kademlia record store.
pub fn store_record(
&mut self,
Expand Down Expand Up @@ -602,6 +609,12 @@ pub enum DiscoveryOut {
/// Starting providing a key failed.
StartProvidingFailed(RecordKey),

/// The DHT yielded results for the providers request.
ProvidersFound(RecordKey, HashSet<PeerId>, Duration),

/// Providers for the requested key were not found in the DHT.
ProvidersNotFound(RecordKey, Duration),

/// Started a random Kademlia query.
///
/// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`.
Expand Down Expand Up @@ -1003,6 +1016,56 @@ impl NetworkBehaviour for DiscoveryBehaviour {
};
return Poll::Ready(ToSwarm::GenerateEvent(ev))
},
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetProviders(res),
stats,
id,
..
} => {
let ev = match res {
Ok(GetProvidersOk::FoundProviders { key, providers }) => {
debug!(
target: "sub-libp2p",
"Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
providers,
key,
id,
stats,
);

DiscoveryOut::ProvidersFound(
key,
providers,
stats.duration().unwrap_or_default(),
)
},
Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
closest_peers: _,
}) => {
debug!(
target: "sub-libp2p",
"Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
id,
stats,
stats.duration().map(|val| val.as_millis())
);

continue
},
Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
debug!(
target: "sub-libp2p",
"Libp2p => Failed to get providers for {key:?} due to timeout.",
);

DiscoveryOut::ProvidersNotFound(
key,
stats.duration().unwrap_or_default(),
)
},
};
return Poll::Ready(ToSwarm::GenerateEvent(ev))
},
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::PutRecord(res),
stats,
Expand Down
7 changes: 7 additions & 0 deletions substrate/client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,10 @@ where
fn stop_providing(&self, key: KademliaKey) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StopProviding(key));
}

fn get_providers(&self, key: KademliaKey) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetProviders(key));
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -1343,6 +1347,7 @@ enum ServiceToWorkerMsg {
StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
StartProviding(KademliaKey),
StopProviding(KademliaKey),
GetProviders(KademliaKey),
AddKnownAddress(PeerId, Multiaddr),
EventStream(out_events::Sender),
Request {
Expand Down Expand Up @@ -1480,6 +1485,8 @@ where
self.network_service.behaviour_mut().start_providing(key.into()),
ServiceToWorkerMsg::StopProviding(key) =>
self.network_service.behaviour_mut().stop_providing(&key.into()),
ServiceToWorkerMsg::GetProviders(key) =>
self.network_service.behaviour_mut().get_providers(key.into()),
ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
self.network_service.behaviour_mut().add_known_address(peer_id, addr),
ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
Expand Down

0 comments on commit 477ff88

Please sign in to comment.