Skip to content

Commit

Permalink
Expose start/stop content providing on DHT for libp2p backend
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-markin committed Nov 29, 2024
1 parent 51d4051 commit 61a0f4e
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 0 deletions.
12 changes: 12 additions & 0 deletions substrate/client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,16 @@ impl<B: BlockT> Behaviour<B> {
) {
self.discovery.store_record(record_key, record_value, publisher, expires);
}

/// Start providing `key` on the DHT.
pub fn start_providing(&mut self, key: RecordKey) {
self.discovery.start_providing(key)
}

/// Stop providing `key` on the DHT.
pub fn stop_providing(&mut self, key: &RecordKey) {
self.discovery.stop_providing(key)
}
}

impl From<CustomMessageOutcome> for BehaviourOut {
Expand Down Expand Up @@ -387,6 +397,8 @@ impl From<DiscoveryOut> for BehaviourOut {
),
DiscoveryOut::ValuePutFailed(key, duration) =>
BehaviourOut::Dht(DhtEvent::ValuePutFailed(key.into()), Some(duration)),
DiscoveryOut::StartProvidingFailed(key) =>
BehaviourOut::Dht(DhtEvent::StartProvidingFailed(key.into()), None),
DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted,
}
}
Expand Down
21 changes: 21 additions & 0 deletions substrate/client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,24 @@ impl DiscoveryBehaviour {
}
}
}

/// Register as a content provider on the DHT for `key`.
pub fn start_providing(&mut self, key: RecordKey) {
if let Some(kad) = self.kademlia.as_mut() {
if let Err(e) = kad.start_providing(key.clone()) {
warn!(target: "sub-libp2p", "Libp2p => Failed to start providing {key:?}: {e}.");
self.pending_events.push_back(DiscoveryOut::StartProvidingFailed(key));
}
}
}

/// Deregister as a content provider on the DHT for `key`.
pub fn stop_providing(&mut self, key: &RecordKey) {
if let Some(kad) = self.kademlia.as_mut() {
kad.stop_providing(key);
}
}

/// Store a record in the Kademlia record store.
pub fn store_record(
&mut self,
Expand Down Expand Up @@ -581,6 +599,9 @@ pub enum DiscoveryOut {
/// Returning the corresponding key as well as the request duration.
ValuePutFailed(RecordKey, Duration),

/// Starting providing a key failed.
StartProvidingFailed(RecordKey),

/// Started a random Kademlia query.
///
/// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`.
Expand Down
3 changes: 3 additions & 0 deletions substrate/client/network/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub enum DhtEvent {
/// An error has occurred while putting a record into the DHT.
ValuePutFailed(Key),

/// An error occured while registering as a content provider on the DHT.
StartProvidingFailed(Key),

/// The DHT received a put record request.
PutRecordRequest(Key, Vec<u8>, Option<sc_network_types::PeerId>, Option<std::time::Instant>),
}
Expand Down
15 changes: 15 additions & 0 deletions substrate/client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,14 @@ where
expires,
));
}

fn start_providing(&self, key: KademliaKey) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StartProviding(key));
}

fn stop_providing(&self, key: KademliaKey) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StopProviding(key));
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -1333,6 +1341,8 @@ enum ServiceToWorkerMsg {
update_local_storage: bool,
},
StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
StartProviding(KademliaKey),
StopProviding(KademliaKey),
AddKnownAddress(PeerId, Multiaddr),
EventStream(out_events::Sender),
Request {
Expand Down Expand Up @@ -1466,6 +1476,10 @@ where
.network_service
.behaviour_mut()
.store_record(key.into(), value, publisher, expires),
ServiceToWorkerMsg::StartProviding(key) =>
self.network_service.behaviour_mut().start_providing(key.into()),
ServiceToWorkerMsg::StopProviding(key) =>
self.network_service.behaviour_mut().stop_providing(&key.into()),
ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
self.network_service.behaviour_mut().add_known_address(peer_id, addr),
ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
Expand Down Expand Up @@ -1678,6 +1692,7 @@ where
DhtEvent::ValuePut(_) => "value-put",
DhtEvent::ValuePutFailed(_) => "value-put-failed",
DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
DhtEvent::StartProvidingFailed(_) => "start-providing-failed",
};
metrics
.kademlia_query_duration
Expand Down

0 comments on commit 61a0f4e

Please sign in to comment.