refactor(ipld): migrate metrics scheme (#1207)
LePremierHomme authored Nov 28, 2024
1 parent ed602ce commit 7572a6a
Showing 10 changed files with 633 additions and 209 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

93 changes: 93 additions & 0 deletions docs/fendermint/observability.md
@@ -49,6 +49,29 @@ This is achieved through the use of the `ipc-observability` crate/library, which
- `ipc_topdown_parent_finality_voting_quorum_height` (IntGauge): Sets the height of the parent finality quorum.
- `ipc_topdown_parent_finality_voting_quorum_weight` (IntGauge): Sets the weight of the parent finality quorum.
- `ipc_topdown_parent_finality_committed_height` (IntGauge): Sets the height of the committed parent finality.
- `ipld_resolver_ping_rtt` (Histogram): Records a ping roundtrip time.
- `ipld_resolver_ping_timeouts` (IntCounter): Incremented when a ping timed out.
- `ipld_resolver_ping_failure` (IntCounter): Incremented when a ping failed.
- `ipld_resolver_ping_success` (IntCounter): Incremented when a ping succeeded.
- `ipld_resolver_identify_failure` (IntCounter): Incremented when an identify failed.
- `ipld_resolver_identify_received` (IntCounter): Incremented when identify info is received.
- `ipld_resolver_discovery_background_lookup` (IntCounter): Incremented when a discovery background lookup started.
- `ipld_resolver_discovery_connected_peers` (IntGauge): Sets the number of discovery connected peers.
- `ipld_resolver_membership_skipped_peers` (IntCounter): Incremented when a membership provider is skipped.
- `ipld_resolver_membership_routable_peers` (IntGauge): Sets the number of routable peers.
- `ipld_resolver_membership_provider_peers` (IntGauge): Sets the number of unique peers.
- `ipld_resolver_membership_unknown_topic` (IntCounter): Incremented when a membership message with an unknown topic is received.
- `ipld_resolver_membership_invalid_message` (IntCounter): Incremented when an invalid membership message is received.
- `ipld_resolver_membership_publish_total` (IntCounter): Incremented when a membership message is published successfully.
- `ipld_resolver_membership_publish_failure` (IntCounter): Incremented when a membership publish failed.
- `ipld_resolver_content_resolve_running` (IntGauge): Sets the number of currently running content resolutions.
- `ipld_resolver_content_resolve_no_peers` (IntCounter): Incremented when a resolution had no peer.
- `ipld_resolver_content_resolve_success` (IntCounter): Incremented when a resolution succeeded.
- `ipld_resolver_content_resolve_failure` (IntCounter): Incremented when a resolution failed.
- `ipld_resolver_content_resolve_fallback` (IntCounter): Incremented when a resolution had a fallback.
- `ipld_resolver_content_resolve_peers` (Histogram): Records the number of peers found for a resolution from a subnet.
- `ipld_resolver_content_connected_peers` (Histogram): Records the number of connected peers in a resolution.
- `ipld_resolver_content_rate_limited` (IntCounter): Incremented when a resolution was rate limited.
- `ipc_tracing_errors` (IntCounterVec): Increments the count of tracing errors for the affected event.
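
The list above mixes counters, gauges, and histograms. As a rough illustration of what "Records a ping roundtrip time" could mean for a histogram, here is a minimal std-only sketch. The `Counter` and `Histogram` types, the bucket bounds, and `record_ping` are hypothetical stand-ins written for this example; they are not the `prometheus` crate API and not the code in this commit.

```rust
use std::time::Duration;

/// Hypothetical stand-in for a counter: it only ever goes up.
#[derive(Default)]
pub struct Counter(u64);

impl Counter {
    pub fn inc(&mut self) {
        self.0 += 1;
    }

    pub fn get(&self) -> u64 {
        self.0
    }
}

/// Hypothetical stand-in for a histogram with cumulative buckets.
pub struct Histogram {
    /// Upper bounds in seconds, in ascending order.
    bounds: Vec<f64>,
    /// counts[i] is the number of observations <= bounds[i].
    counts: Vec<u64>,
    /// Total number of observations, including those above every bound.
    total: u64,
}

impl Histogram {
    pub fn new(bounds: Vec<f64>) -> Self {
        let counts = vec![0; bounds.len()];
        Self { bounds, counts, total: 0 }
    }

    /// Adds one observation to every bucket whose bound it fits under.
    pub fn observe(&mut self, value: f64) {
        for (bound, count) in self.bounds.iter().zip(self.counts.iter_mut()) {
            if value <= *bound {
                *count += 1;
            }
        }
        self.total += 1;
    }

    pub fn bucket_counts(&self) -> Vec<u64> {
        self.counts.clone()
    }

    pub fn count(&self) -> u64 {
        self.total
    }
}

/// Mirrors the documented pairing: a successful ping feeds both an RTT
/// histogram and a success counter.
pub fn record_ping(rtt: &mut Histogram, success: &mut Counter, elapsed: Duration) {
    rtt.observe(elapsed.as_secs_f64());
    success.inc();
}
```

With bounds of 10 ms, 100 ms, and 1 s, a 50 ms ping lands in the two larger buckets only, which is why histograms suit latency-style values better than a single gauge.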

## Events and corresponding metrics
@@ -290,6 +313,76 @@ Represents the commitment of parent finality.

- `ipc_topdown_parent_finality_committed_height`

### PingEvent

**Variants and affected metrics:**

- `Success(PeerId, Duration)`: `ipld_resolver_ping_rtt`, `ipld_resolver_ping_success`

### PingFailureEvent

**Variants and affected metrics:**

- `Timeout(PeerId)`: `ipld_resolver_ping_timeouts`
- `Failure(PeerId, Duration)`: `ipld_resolver_ping_failure`

### IdentifyEvent

**Variants and affected metrics:**

- `Received(PeerId)`: `ipld_resolver_identify_received`

### IdentifyFailureEvent

**Variants and affected metrics:**

- `Failure(PeerId, String)`: `ipld_resolver_identify_failure`

### DiscoveryEvent

**Variants and affected metrics:**

- `BackgroundLookup(PeerId)`: `ipld_resolver_discovery_background_lookup`
- `ConnectionEstablished(PeerId)`: `ipld_resolver_discovery_connected_peers`
- `ConnectionClosed(PeerId)`: `ipld_resolver_discovery_connected_peers`

### MembershipEvent

**Variants and affected metrics:**

- `Added(PeerId)`: `ipld_resolver_membership_provider_peers`
- `Removed(PeerId)`: `ipld_resolver_membership_provider_peers`
- `Skipped(PeerId)`: `ipld_resolver_membership_skipped_peers`
- `PublishSuccess`: `ipld_resolver_membership_publish_total`
- `RoutablePeers(i64)`: `ipld_resolver_membership_routable_peers`

### MembershipFailureEvent

**Variants and affected metrics:**

- `PublishFailure(String)`: `ipld_resolver_membership_publish_failure`
- `GossipInvalidProviderRecord(Option<PeerId>, String)`: `ipld_resolver_membership_invalid_message`
- `GossipInvalidVoteRecord(Option<PeerId>, String)`: `ipld_resolver_membership_invalid_message`
- `GossipUnknownTopic(Option<PeerId>, TopicHash)`: `ipld_resolver_membership_unknown_topic`

### ResolveEvent

**Variants and affected metrics:**

- `Started(Cid)`: `ipld_resolver_content_resolve_running`
- `Success(Cid)`: `ipld_resolver_content_resolve_success`
- `Completed`: `ipld_resolver_content_resolve_running`
- `Peers(usize)`: `ipld_resolver_content_resolve_peers`
- `NoPeers`: `ipld_resolver_content_resolve_no_peers`
- `ConnectedPeers(usize)`: `ipld_resolver_content_connected_peers`

### ResolveFailureEvent

**Variants and affected metrics:**

- `Failure(Cid)`: `ipld_resolver_content_resolve_failure`
- `Fallback(Cid)`: `ipld_resolver_content_resolve_fallback`
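
The event-to-metric tables above can be read as a dispatch: each variant updates the metric listed next to it. The sketch below shows one way such a mapping might look for the resolve events. It is an assumption-laden illustration: `ResolveMetrics`, the `Recordable` trait, and this two-argument `emit` are hypothetical (the `emit` in this commit takes only the event), and the `Cid` payload is replaced by a `String` placeholder to keep the example free of external crates.

```rust
use std::fmt::Debug;

/// Hypothetical stand-ins for the metrics named in the tables above.
#[derive(Debug, Default)]
pub struct ResolveMetrics {
    pub running: i64,  // ipld_resolver_content_resolve_running (gauge)
    pub success: u64,  // ipld_resolver_content_resolve_success
    pub no_peers: u64, // ipld_resolver_content_resolve_no_peers
    pub failure: u64,  // ipld_resolver_content_resolve_failure
    pub fallback: u64, // ipld_resolver_content_resolve_fallback
}

/// Simplified subset of the documented variants; `String` stands in for `Cid`.
#[derive(Debug)]
pub enum ResolveEvent {
    Started(String),
    Success(String),
    Completed,
    NoPeers,
}

#[derive(Debug)]
pub enum ResolveFailureEvent {
    Failure(String),
    Fallback(String),
}

/// Hypothetical trait: an event knows which metrics it affects.
pub trait Recordable {
    fn record_metrics(&self, metrics: &mut ResolveMetrics);
}

impl Recordable for ResolveEvent {
    fn record_metrics(&self, metrics: &mut ResolveMetrics) {
        match self {
            // Started and Completed move the same gauge in opposite directions.
            ResolveEvent::Started(_) => metrics.running += 1,
            ResolveEvent::Completed => metrics.running -= 1,
            ResolveEvent::Success(_) => metrics.success += 1,
            ResolveEvent::NoPeers => metrics.no_peers += 1,
        }
    }
}

impl Recordable for ResolveFailureEvent {
    fn record_metrics(&self, metrics: &mut ResolveMetrics) {
        match self {
            ResolveFailureEvent::Failure(_) => metrics.failure += 1,
            ResolveFailureEvent::Fallback(_) => metrics.fallback += 1,
        }
    }
}

/// Hypothetical emit: records metrics and returns a debug line that a real
/// implementation might hand to a tracing or logging layer instead.
pub fn emit<E: Recordable + Debug>(metrics: &mut ResolveMetrics, event: E) -> String {
    event.record_metrics(metrics);
    format!("{event:?}")
}
```

Passing the metrics explicitly keeps the sketch testable without global state; a real implementation would more likely register metrics once and look them up inside `record_metrics`.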

### TracingError

**Description:**

1 change: 1 addition & 0 deletions ipld/resolver/Cargo.toml
@@ -34,6 +34,7 @@ fvm_shared = { workspace = true }
fvm_ipld_blockstore = { workspace = true, optional = true }

ipc-api = { path = "../../ipc/api", default-features = false }
ipc-observability = { workspace = true }

[dev-dependencies]
cid = { workspace = true }

14 changes: 7 additions & 7 deletions ipld/resolver/src/behaviour/content.rs
@@ -7,6 +7,11 @@ use std::{
time::Duration,
};

use crate::{
limiter::{RateLimit, RateLimiter},
observe,
};
use ipc_observability::emit;
use libipld::{store::StoreParams, Cid};
use libp2p::{
core::{ConnectedPoint, Endpoint},
@@ -22,11 +27,6 @@ use libp2p_bitswap::{Bitswap, BitswapConfig, BitswapEvent, BitswapResponse, Bits
use log::debug;
use prometheus::Registry;

use crate::{
limiter::{RateLimit, RateLimiter},
stats,
};

pub type QueryId = libp2p_bitswap::QueryId;

// Not much to do here, just hiding the `Progress` event as I don't think we'll need it.
@@ -140,7 +140,7 @@ impl<P: StoreParams> Behaviour<P> {
/// will initiate connections to the peers which aren't connected at the moment.
pub fn resolve(&mut self, cid: Cid, peers: Vec<PeerId>) -> QueryId {
debug!("resolving {cid} from {peers:?}");
stats::CONTENT_RESOLVE_RUNNING.inc();
emit(observe::ResolveEvent::Started(cid));
// Not passing any missing items, which will result in a call to `BitswapStore::missing_blocks`.
self.inner.sync(cid, peers, [].into_iter())
}
@@ -334,7 +334,7 @@ impl<P: StoreParams> NetworkBehaviour for Behaviour<P> {
ToSwarm::GenerateEvent(ev) => match ev {
BitswapEvent::Progress(_, _) => {}
BitswapEvent::Complete(id, result) => {
stats::CONTENT_RESOLVE_RUNNING.dec();
emit(observe::ResolveEvent::Completed);
let out = Event::Complete(id, result);
return Poll::Ready(ToSwarm::GenerateEvent(out));
}

14 changes: 6 additions & 8 deletions ipld/resolver/src/behaviour/discovery.rs
@@ -8,6 +8,9 @@ use std::{
time::Duration,
};

use super::NetworkConfig;
use crate::observe;
use ipc_observability::emit;
use libp2p::{
core::Endpoint,
identify::Info,
@@ -23,11 +26,6 @@ use libp2p::{
};
use log::{debug, warn};
use tokio::time::Interval;

use crate::stats;

use super::NetworkConfig;

// NOTE: The Discovery behaviour is largely based on what exists in Forest. If it ain't broken...
// NOTE: Not sure if emitting events is going to be useful yet, but for now it's an example of having one.

@@ -178,7 +176,7 @@ impl Behaviour {
pub fn background_lookup(&mut self, peer_id: PeerId) {
if self.addresses_of_peer(peer_id).is_empty() {
if let Some(kademlia) = self.inner.as_mut() {
stats::DISCOVERY_BACKGROUND_LOOKUP.inc();
emit(observe::DiscoveryEvent::BackgroundLookup(peer_id));
kademlia.get_closest_peers(peer_id);
}
}
@@ -241,13 +239,13 @@ impl NetworkBehaviour for Behaviour {
match &event {
FromSwarm::ConnectionEstablished(e) => {
if e.other_established == 0 {
stats::DISCOVERY_CONNECTED_PEERS.inc();
emit(observe::DiscoveryEvent::ConnectionEstablished(e.peer_id));
self.num_connections += 1;
}
}
FromSwarm::ConnectionClosed(e) => {
if e.remaining_established == 0 {
stats::DISCOVERY_CONNECTED_PEERS.dec();
emit(observe::DiscoveryEvent::ConnectionClosed(e.peer_id));
self.num_connections -= 1;
}
}
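
The `other_established == 0` and `remaining_established == 0` guards in the hunk above suggest that the connected-peers gauge is meant to move once per peer, not once per connection. A minimal sketch of that idea follows, assuming a hypothetical `PeerTracker` that counts open connections itself (in the real code libp2p supplies those counts) and a plain `u64` in place of `PeerId`.

```rust
use std::collections::HashMap;

/// Hypothetical tracker: a per-peer connection count plus a gauge stand-in.
#[derive(Default)]
pub struct PeerTracker {
    /// Open connections per peer; `u64` stands in for `PeerId`.
    connections: HashMap<u64, usize>,
    /// Stand-in for the connected-peers gauge.
    connected_peers: i64,
}

impl PeerTracker {
    pub fn connection_established(&mut self, peer: u64) {
        let open = self.connections.entry(peer).or_insert(0);
        // Same idea as `other_established == 0`: only the first connection
        // to a peer changes the gauge.
        if *open == 0 {
            self.connected_peers += 1;
        }
        *open += 1;
    }

    pub fn connection_closed(&mut self, peer: u64) {
        let remaining = match self.connections.get_mut(&peer) {
            Some(open) => {
                *open -= 1;
                *open
            }
            // Closing a connection we never saw leaves the gauge untouched.
            None => return,
        };
        // Same idea as `remaining_established == 0`: only the last
        // connection to a peer changes the gauge.
        if remaining == 0 {
            self.connections.remove(&peer);
            self.connected_peers -= 1;
        }
    }

    pub fn connected_peers(&self) -> i64 {
        self.connected_peers
    }
}
```

Opening two connections to the same peer and closing one leaves the gauge at 1 in this sketch; it drops to 0 only when the second one closes.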

77 changes: 39 additions & 38 deletions ipld/resolver/src/behaviour/membership.rs
@@ -5,8 +5,16 @@ use std::marker::PhantomData;
use std::task::{Context, Poll};
use std::time::Duration;

use super::NetworkConfig;
use crate::hash::blake2b_256;
use crate::observe;
use crate::provider_cache::{ProviderDelta, SubnetProviderCache};
use crate::provider_record::{ProviderRecord, SignedProviderRecord};
use crate::vote_record::{SignedVoteRecord, VoteRecord};
use crate::Timestamp;
use anyhow::anyhow;
use ipc_api::subnet_id::SubnetID;
use ipc_observability::emit;
use libp2p::core::Endpoint;
use libp2p::gossipsub::{
self, IdentTopic, MessageAuthenticity, MessageId, PublishError, Sha256Topic, SubscriptionError,
@@ -24,14 +32,6 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::time::{Instant, Interval};

use crate::hash::blake2b_256;
use crate::provider_cache::{ProviderDelta, SubnetProviderCache};
use crate::provider_record::{ProviderRecord, SignedProviderRecord};
use crate::vote_record::{SignedVoteRecord, VoteRecord};
use crate::{stats, Timestamp};

use super::NetworkConfig;

/// `Gossipsub` topic identifier for subnet membership.
const PUBSUB_MEMBERSHIP: &str = "/ipc/membership";
/// `Gossipsub` topic identifier for voting about content.
@@ -326,11 +326,13 @@ where
);
match self.inner.publish(self.membership_topic.clone(), data) {
Err(e) => {
stats::MEMBERSHIP_PUBLISH_FAILURE.inc();
emit(observe::MembershipFailureEvent::PublishFailure(
e.to_string(),
));
Err(anyhow!(e))
}
Ok(_msg_id) => {
stats::MEMBERSHIP_PUBLISH_SUCCESS.inc();
emit(observe::MembershipEvent::PublishSuccess);
self.last_publish_timestamp = Timestamp::now();
self.next_publish_timestamp =
self.last_publish_timestamp + self.publish_interval.period();
@@ -346,11 +348,13 @@ where
let data = vote.into_envelope().into_protobuf_encoding();
match self.inner.publish(topic, data) {
Err(e) => {
stats::MEMBERSHIP_PUBLISH_FAILURE.inc();
emit(observe::MembershipFailureEvent::PublishFailure(
e.to_string(),
));
Err(anyhow!(e))
}
Ok(_msg_id) => {
stats::MEMBERSHIP_PUBLISH_SUCCESS.inc();
emit(observe::MembershipEvent::PublishSuccess);
Ok(())
}
}
@@ -363,11 +367,13 @@ where
let topic = self.preemptive_topic(&subnet_id);
match self.inner.publish(topic, data) {
Err(e) => {
stats::MEMBERSHIP_PUBLISH_FAILURE.inc();
emit(observe::MembershipFailureEvent::PublishFailure(
e.to_string(),
));
Err(anyhow!(e))
}
Ok(_msg_id) => {
stats::MEMBERSHIP_PUBLISH_SUCCESS.inc();
emit(observe::MembershipEvent::PublishSuccess);
Ok(())
}
}
@@ -378,8 +384,9 @@ where
/// Call this method when the discovery service learns the address of a peer.
pub fn set_routable(&mut self, peer_id: PeerId) {
self.provider_cache.set_routable(peer_id);
stats::MEMBERSHIP_ROUTABLE_PEERS
.set(self.provider_cache.num_routable().try_into().unwrap());

let num_routable = self.provider_cache.num_routable().try_into().unwrap();
emit(observe::MembershipEvent::RoutablePeers(num_routable));
self.publish_for_new_peer(peer_id);
}

@@ -406,33 +413,27 @@ where
if msg.topic == self.membership_topic.hash() {
match SignedProviderRecord::from_bytes(&msg.data).map(|r| r.into_record()) {
Ok(record) => self.handle_provider_record(record),
Err(e) => {
stats::MEMBERSHIP_INVALID_MESSAGE.inc();
warn!(
"Gossip message from peer {:?} could not be deserialized as ProviderRecord: {e}",
msg.source
);
}
Err(e) => emit(
observe::MembershipFailureEvent::GossipInvalidProviderRecord(
msg.source,
e.to_string(),
),
),
}
} else if self.voting_topics.contains(&msg.topic) {
match SignedVoteRecord::from_bytes(&msg.data).map(|r| r.into_record()) {
Ok(record) => self.handle_vote_record(record),
Err(e) => {
stats::MEMBERSHIP_INVALID_MESSAGE.inc();
warn!(
"Gossip message from peer {:?} could not be deserialized as VoteRecord: {e}",
msg.source
);
}
Err(e) => emit(observe::MembershipFailureEvent::GossipInvalidVoteRecord(
msg.source,
e.to_string(),
)),
}
} else if let Some(subnet_id) = self.preemptive_topics.get(&msg.topic) {
self.handle_preemptive_data(subnet_id.clone(), msg.data)
} else {
stats::MEMBERSHIP_UNKNOWN_TOPIC.inc();
warn!(
"unknown gossipsub topic in message from {:?}: {}",
msg.source, msg.topic
);
emit(observe::MembershipFailureEvent::GossipUnknownTopic(
msg.source, msg.topic,
));
}
}

@@ -444,7 +445,7 @@ where
debug!("received provider record: {record:?}");
let (event, publish) = match self.provider_cache.add_provider(&record) {
None => {
stats::MEMBERSHIP_SKIPPED_PEERS.inc();
emit(observe::MembershipEvent::Skipped(record.peer_id));
(Some(Event::Skipped(record.peer_id)), false)
}
Some(d) if d.is_empty() && !d.is_new => (None, false),
@@ -459,7 +460,7 @@ where
}

if publish {
stats::MEMBERSHIP_PROVIDER_PEERS.inc();
emit(observe::MembershipEvent::Added(record.peer_id));
self.publish_for_new_peer(record.peer_id)
}
}
@@ -514,7 +515,7 @@ where
let cutoff_timestamp = Timestamp::now() - self.max_provider_age;
let pruned = self.provider_cache.prune_providers(cutoff_timestamp);
for peer_id in pruned {
stats::MEMBERSHIP_PROVIDER_PEERS.dec();
emit(observe::MembershipEvent::Removed(peer_id));
self.outbox.push_back(Event::Removed(peer_id))
}
}

2 changes: 1 addition & 1 deletion ipld/resolver/src/lib.rs
@@ -4,8 +4,8 @@ mod behaviour;
mod client;
mod hash;
mod limiter;
mod observe;
mod service;
mod stats;
mod timestamp;

mod provider_cache;