Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ipld): migrate metrics scheme #1207

Merged
merged 11 commits into from
Nov 28, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ipld/resolver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ fvm_shared = { workspace = true }
fvm_ipld_blockstore = { workspace = true, optional = true }

ipc-api = { path = "../../ipc/api", default-features = false }
ipc-observability = { workspace = true }

[dev-dependencies]
cid = { workspace = true }
Expand Down
14 changes: 7 additions & 7 deletions ipld/resolver/src/behaviour/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ use std::{
time::Duration,
};

use crate::{
limiter::{RateLimit, RateLimiter},
observe,
};
use ipc_observability::emit;
use libipld::{store::StoreParams, Cid};
use libp2p::{
core::{ConnectedPoint, Endpoint},
Expand All @@ -22,11 +27,6 @@ use libp2p_bitswap::{Bitswap, BitswapConfig, BitswapEvent, BitswapResponse, Bits
use log::debug;
use prometheus::Registry;

use crate::{
limiter::{RateLimit, RateLimiter},
stats,
};

pub type QueryId = libp2p_bitswap::QueryId;

// Not much to do here, just hiding the `Progress` event as I don't think we'll need it.
Expand Down Expand Up @@ -140,7 +140,7 @@ impl<P: StoreParams> Behaviour<P> {
/// will initiate connections to the peers which aren't connected at the moment.
pub fn resolve(&mut self, cid: Cid, peers: Vec<PeerId>) -> QueryId {
debug!("resolving {cid} from {peers:?}");
stats::CONTENT_RESOLVE_RUNNING.inc();
emit(observe::ResolveEvent::Started(cid));
// Not passing any missing items, which will result in a call to `BitswapStore::missing_blocks`.
self.inner.sync(cid, peers, [].into_iter())
}
Expand Down Expand Up @@ -334,7 +334,7 @@ impl<P: StoreParams> NetworkBehaviour for Behaviour<P> {
ToSwarm::GenerateEvent(ev) => match ev {
BitswapEvent::Progress(_, _) => {}
BitswapEvent::Complete(id, result) => {
stats::CONTENT_RESOLVE_RUNNING.dec();
emit(observe::ResolveEvent::Completed);
let out = Event::Complete(id, result);
return Poll::Ready(ToSwarm::GenerateEvent(out));
}
Expand Down
14 changes: 6 additions & 8 deletions ipld/resolver/src/behaviour/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use std::{
time::Duration,
};

use super::NetworkConfig;
use crate::observe;
use ipc_observability::emit;
use libp2p::{
core::Endpoint,
identify::Info,
Expand All @@ -23,11 +26,6 @@ use libp2p::{
};
use log::{debug, warn};
use tokio::time::Interval;

use crate::stats;

use super::NetworkConfig;

// NOTE: The Discovery behaviour is largely based on what exists in Forest. If it ain't broken...
// NOTE: Not sure if emitting events is going to be useful yet, but for now it's an example of having one.

Expand Down Expand Up @@ -178,7 +176,7 @@ impl Behaviour {
pub fn background_lookup(&mut self, peer_id: PeerId) {
if self.addresses_of_peer(peer_id).is_empty() {
if let Some(kademlia) = self.inner.as_mut() {
stats::DISCOVERY_BACKGROUND_LOOKUP.inc();
emit(observe::DiscoveryEvent::BackgroundLookup(peer_id));
kademlia.get_closest_peers(peer_id);
}
}
Expand Down Expand Up @@ -241,13 +239,13 @@ impl NetworkBehaviour for Behaviour {
match &event {
FromSwarm::ConnectionEstablished(e) => {
if e.other_established == 0 {
stats::DISCOVERY_CONNECTED_PEERS.inc();
emit(observe::DiscoveryEvent::ConnectionEstablished(e.peer_id));
self.num_connections += 1;
}
}
FromSwarm::ConnectionClosed(e) => {
if e.remaining_established == 0 {
stats::DISCOVERY_CONNECTED_PEERS.dec();
emit(observe::DiscoveryEvent::ConnectionClosed(e.peer_id));
self.num_connections -= 1;
}
}
Expand Down
77 changes: 39 additions & 38 deletions ipld/resolver/src/behaviour/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,16 @@ use std::marker::PhantomData;
use std::task::{Context, Poll};
use std::time::Duration;

use super::NetworkConfig;
use crate::hash::blake2b_256;
use crate::observe;
use crate::provider_cache::{ProviderDelta, SubnetProviderCache};
use crate::provider_record::{ProviderRecord, SignedProviderRecord};
use crate::vote_record::{SignedVoteRecord, VoteRecord};
use crate::Timestamp;
use anyhow::anyhow;
use ipc_api::subnet_id::SubnetID;
use ipc_observability::emit;
use libp2p::core::Endpoint;
use libp2p::gossipsub::{
self, IdentTopic, MessageAuthenticity, MessageId, PublishError, Sha256Topic, SubscriptionError,
Expand All @@ -24,14 +32,6 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::time::{Instant, Interval};

use crate::hash::blake2b_256;
use crate::provider_cache::{ProviderDelta, SubnetProviderCache};
use crate::provider_record::{ProviderRecord, SignedProviderRecord};
use crate::vote_record::{SignedVoteRecord, VoteRecord};
use crate::{stats, Timestamp};

use super::NetworkConfig;

/// `Gossipsub` topic identifier for subnet membership.
const PUBSUB_MEMBERSHIP: &str = "/ipc/membership";
/// `Gossipsub` topic identifier for voting about content.
Expand Down Expand Up @@ -326,11 +326,13 @@ where
);
match self.inner.publish(self.membership_topic.clone(), data) {
Err(e) => {
stats::MEMBERSHIP_PUBLISH_FAILURE.inc();
emit(observe::MembershipFailureEvent::PublishFailure(
e.to_string().into(),
));
Err(anyhow!(e))
}
Ok(_msg_id) => {
stats::MEMBERSHIP_PUBLISH_SUCCESS.inc();
emit(observe::MembershipEvent::PublishSuccess);
self.last_publish_timestamp = Timestamp::now();
self.next_publish_timestamp =
self.last_publish_timestamp + self.publish_interval.period();
Expand All @@ -346,11 +348,13 @@ where
let data = vote.into_envelope().into_protobuf_encoding();
match self.inner.publish(topic, data) {
Err(e) => {
stats::MEMBERSHIP_PUBLISH_FAILURE.inc();
emit(observe::MembershipFailureEvent::PublishFailure(
e.to_string().into(),
));
Err(anyhow!(e))
}
Ok(_msg_id) => {
stats::MEMBERSHIP_PUBLISH_SUCCESS.inc();
emit(observe::MembershipEvent::PublishSuccess);
Ok(())
}
}
Expand All @@ -363,11 +367,13 @@ where
let topic = self.preemptive_topic(&subnet_id);
match self.inner.publish(topic, data) {
Err(e) => {
stats::MEMBERSHIP_PUBLISH_FAILURE.inc();
emit(observe::MembershipFailureEvent::PublishFailure(
e.to_string().into(),
));
Err(anyhow!(e))
}
Ok(_msg_id) => {
stats::MEMBERSHIP_PUBLISH_SUCCESS.inc();
emit(observe::MembershipEvent::PublishSuccess);
Ok(())
}
}
Expand All @@ -378,8 +384,9 @@ where
/// Call this method when the discovery service learns the address of a peer.
pub fn set_routable(&mut self, peer_id: PeerId) {
self.provider_cache.set_routable(peer_id);
stats::MEMBERSHIP_ROUTABLE_PEERS
.set(self.provider_cache.num_routable().try_into().unwrap());

let num_routable = self.provider_cache.num_routable().try_into().unwrap();
emit(observe::MembershipEvent::RoutablePeers(num_routable));
self.publish_for_new_peer(peer_id);
}

Expand All @@ -406,33 +413,27 @@ where
if msg.topic == self.membership_topic.hash() {
match SignedProviderRecord::from_bytes(&msg.data).map(|r| r.into_record()) {
Ok(record) => self.handle_provider_record(record),
Err(e) => {
stats::MEMBERSHIP_INVALID_MESSAGE.inc();
warn!(
"Gossip message from peer {:?} could not be deserialized as ProviderRecord: {e}",
msg.source
);
}
Err(e) => emit(
observe::MembershipFailureEvent::GossipInvalidProviderRecord(
msg.source,
e.into(),
),
),
}
} else if self.voting_topics.contains(&msg.topic) {
match SignedVoteRecord::from_bytes(&msg.data).map(|r| r.into_record()) {
Ok(record) => self.handle_vote_record(record),
Err(e) => {
stats::MEMBERSHIP_INVALID_MESSAGE.inc();
warn!(
"Gossip message from peer {:?} could not be deserialized as VoteRecord: {e}",
msg.source
);
}
Err(e) => emit(observe::MembershipFailureEvent::GossipInvalidVoteRecord(
msg.source,
e.into(),
)),
}
} else if let Some(subnet_id) = self.preemptive_topics.get(&msg.topic) {
self.handle_preemptive_data(subnet_id.clone(), msg.data)
} else {
stats::MEMBERSHIP_UNKNOWN_TOPIC.inc();
warn!(
"unknown gossipsub topic in message from {:?}: {}",
msg.source, msg.topic
);
emit(observe::MembershipFailureEvent::GossipUnknownTopic(
msg.source, msg.topic,
));
}
}

Expand All @@ -444,7 +445,7 @@ where
debug!("received provider record: {record:?}");
let (event, publish) = match self.provider_cache.add_provider(&record) {
None => {
stats::MEMBERSHIP_SKIPPED_PEERS.inc();
emit(observe::MembershipEvent::Skipped(record.peer_id));
(Some(Event::Skipped(record.peer_id)), false)
}
Some(d) if d.is_empty() && !d.is_new => (None, false),
Expand All @@ -459,7 +460,7 @@ where
}

if publish {
stats::MEMBERSHIP_PROVIDER_PEERS.inc();
emit(observe::MembershipEvent::Added(record.peer_id));
self.publish_for_new_peer(record.peer_id)
}
}
Expand Down Expand Up @@ -514,7 +515,7 @@ where
let cutoff_timestamp = Timestamp::now() - self.max_provider_age;
let pruned = self.provider_cache.prune_providers(cutoff_timestamp);
for peer_id in pruned {
stats::MEMBERSHIP_PROVIDER_PEERS.dec();
emit(observe::MembershipEvent::Removed(peer_id));
self.outbox.push_back(Event::Removed(peer_id))
}
}
Expand Down
2 changes: 1 addition & 1 deletion ipld/resolver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ mod behaviour;
mod client;
mod hash;
mod limiter;
mod observe;
mod service;
mod stats;
mod timestamp;

mod provider_cache;
Expand Down
Loading
Loading