From 2d83725c096ce7d895100bc5897359bb9660bea7 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 15 May 2023 04:58:06 +0200 Subject: [PATCH] feat(metrics)!: expose identify metrics for connected peers only Previously we would increase a counter / gauge / histogram on each received identify information. These metrics are missleading, as e.g. they depend on the identify interval and don't represent the set of currently connected peers. With this commit, identify information is tracked for the currently connected peers only. Instead of an increase on each received identify information, metrics represent the status quo (Gauge). Example: ``` \# HELP libp2p_libp2p_identify_remote_protocols Number of connected nodes supporting a specific protocol, with "unrecognized" for each peer supporting one or more unrecognized protocols... \# TYPE libp2p_libp2p_identify_remote_protocols gauge libp2p_libp2p_identify_remote_protocols_total{protocol="/ipfs/id/push/1.0.0"} 1 libp2p_libp2p_identify_remote_protocols_total{protocol="/ipfs/id/1.0.0"} 1 libp2p_libp2p_identify_remote_protocols_total{protocol="/ipfs/ping/1.0.0"} 1 libp2p_libp2p_identify_remote_protocols_total{protocol="unrecognized"} 1 \# HELP libp2p_libp2p_identify_remote_listen_addresses Number of connected nodes advertising a specific listen address... \# TYPE libp2p_libp2p_identify_remote_listen_addresses gauge libp2p_libp2p_identify_remote_listen_addresses_total{listen_address="/ip4/tcp"} 1 libp2p_libp2p_identify_remote_listen_addresses_total{listen_address="/ip4/udp/quic"} 1 \# HELP libp2p_libp2p_identify_local_observed_addresses Number of connected nodes observing the local node at a specific address... \# TYPE libp2p_libp2p_identify_local_observed_addresses gauge libp2p_libp2p_identify_local_observed_addresses_total{observed_address="/ip4/tcp"} 1 ``` Pull-Request: #3325. --- Cargo.lock | 5 +- examples/metrics/Cargo.toml | 2 +- examples/metrics/src/http_service.rs | 2 +- misc/metrics/CHANGELOG.md | 20 ++ misc/metrics/Cargo.toml | 3 +- misc/metrics/src/identify.rs | 291 ++++++++++++++------------- protocols/gossipsub/Cargo.toml | 2 +- 7 files changed, 180 insertions(+), 145 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 73a76d54dfe..b0380178273 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2732,6 +2732,7 @@ dependencies = [ "libp2p-ping", "libp2p-relay", "libp2p-swarm", + "once_cell", "prometheus-client", ] @@ -4019,9 +4020,9 @@ dependencies = [ [[package]] name = "prometheus-client" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e227aeb6c2cfec819e999c4773b35f8c7fa37298a203ff46420095458eee567e" +checksum = "38974b1966bd5b6c7c823a20c1e07d5b84b171db20bac601e9b529720f7299f8" dependencies = [ "dtoa", "itoa", diff --git a/examples/metrics/Cargo.toml b/examples/metrics/Cargo.toml index ba08cdaebe7..d170cc2f28d 100644 --- a/examples/metrics/Cargo.toml +++ b/examples/metrics/Cargo.toml @@ -12,4 +12,4 @@ hyper = { version = "0.14", features = ["server", "tcp", "http1"] } libp2p = { path = "../../libp2p", features = ["async-std", "metrics", "ping", "noise", "identify", "tcp", "yamux", "macros"] } log = "0.4.0" tokio = { version = "1", features = ["rt-multi-thread"] } -prometheus-client = "0.20.0" +prometheus-client = "0.21.0" diff --git a/examples/metrics/src/http_service.rs b/examples/metrics/src/http_service.rs index 84102c2b558..46cb7aacb84 100644 --- a/examples/metrics/src/http_service.rs +++ b/examples/metrics/src/http_service.rs @@ -33,7 +33,7 @@ const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;v pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Error> { // Serve on localhost. - let addr = ([127, 0, 0, 1], 0).into(); + let addr = ([127, 0, 0, 1], 8080).into(); // Use the tokio runtime to run the hyper server. let rt = tokio::runtime::Runtime::new()?; diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index 4c653ca0051..ca090d60171 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -1,9 +1,29 @@ ## 0.13.0 - unreleased +- Previously `libp2p-metrics::identify` would increase a counter / gauge / histogram on each + received identify information. These metrics are misleading, as e.g. they depend on the identify + interval and don't represent the set of currently connected peers. With this change, identify + information is tracked for the currently connected peers only. Instead of an increase on each + received identify information, metrics represent the status quo (Gauge). + + Metrics removed: + - `libp2p_identify_protocols` + - `libp2p_identify_received_info_listen_addrs` + - `libp2p_identify_received_info_protocols` + - `libp2p_identify_listen_addresses` + + Metrics added: + - `libp2p_identify_remote_protocols` + - `libp2p_identify_remote_listen_addresses` + - `libp2p_identify_local_observed_addresses` + + See [PR 3325]. + - Raise MSRV to 1.65. See [PR 3715]. [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 +[PR 3325]: https://github.com/libp2p/rust-libp2p/pull/3325 ## 0.12.0 diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index 9fafe680115..78cb5b3fa2c 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -27,7 +27,8 @@ libp2p-ping = { workspace = true, optional = true } libp2p-relay = { workspace = true, optional = true } libp2p-swarm = { workspace = true } libp2p-identity = { workspace = true } -prometheus-client = "0.20.0" +prometheus-client = { version = "0.21.0" } +once_cell = "1.16.0" [target.'cfg(not(target_os = "unknown"))'.dependencies] libp2p-gossipsub = { workspace = true, optional = true } diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index ffd0cdb9fc2..e3e147062b3 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -21,39 +21,78 @@ use crate::protocol_stack; use libp2p_identity::PeerId; use libp2p_swarm::StreamProtocol; -use prometheus_client::encoding::{EncodeLabelSet, EncodeMetric, MetricEncoder}; +use once_cell::sync::Lazy; +use prometheus_client::collector::Collector; +use prometheus_client::encoding::EncodeLabelSet; use prometheus_client::metrics::counter::Counter; -use prometheus_client::metrics::family::Family; -use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; -use prometheus_client::metrics::MetricType; -use prometheus_client::registry::Registry; +use prometheus_client::metrics::family::ConstFamily; +use prometheus_client::metrics::gauge::ConstGauge; +use prometheus_client::registry::{Descriptor, LocalMetric, Registry}; +use prometheus_client::MaybeOwned; +use std::borrow::Cow; use std::collections::HashMap; -use std::iter; use std::sync::{Arc, Mutex}; +static PROTOCOLS_DESCRIPTOR: Lazy = Lazy::new(|| { + Descriptor::new( + "remote_protocols", + r#"Number of connected nodes supporting a specific protocol, with "unrecognized" for each + peer supporting one or more unrecognized protocols"#, + None, + None, + vec![], + ) +}); +static LISTEN_ADDRESSES_DESCRIPTOR: Lazy = Lazy::new(|| { + Descriptor::new( + "remote_listen_addresses", + "Number of connected nodes advertising a specific listen address", + None, + None, + vec![], + ) +}); +static OBSERVED_ADDRESSES_DESCRIPTOR: Lazy = Lazy::new(|| { + Descriptor::new( + "local_observed_addresses", + "Number of connected nodes observing the local node at a specific address", + None, + None, + vec![], + ) +}); +const ALLOWED_PROTOCOLS: &[StreamProtocol] = &[ + #[cfg(feature = "dcutr")] + libp2p_dcutr::PROTOCOL_NAME, + // #[cfg(feature = "gossipsub")] + // #[cfg(not(target_os = "unknown"))] + // TODO: Add Gossipsub protocol name + libp2p_identify::PROTOCOL_NAME, + libp2p_identify::PUSH_PROTOCOL_NAME, + #[cfg(feature = "kad")] + libp2p_kad::PROTOCOL_NAME, + #[cfg(feature = "ping")] + libp2p_ping::PROTOCOL_NAME, + #[cfg(feature = "relay")] + libp2p_relay::STOP_PROTOCOL_NAME, + #[cfg(feature = "relay")] + libp2p_relay::HOP_PROTOCOL_NAME, +]; + pub(crate) struct Metrics { - protocols: Protocols, + peers: Peers, error: Counter, pushed: Counter, received: Counter, - received_info_listen_addrs: Histogram, - received_info_protocols: Histogram, sent: Counter, - listen_addresses: Family, } impl Metrics { pub(crate) fn new(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("identify"); - let protocols = Protocols::default(); - sub_registry.register( - "protocols", - "Number of connected nodes supporting a specific protocol, with \ - \"unrecognized\" for each peer supporting one or more unrecognized \ - protocols", - protocols.clone(), - ); + let peers = Peers::default(); + sub_registry.register_collector(Box::new(peers.clone())); let error = Counter::default(); sub_registry.register( @@ -78,24 +117,6 @@ impl Metrics { received.clone(), ); - let received_info_listen_addrs = - Histogram::new(iter::once(0.0).chain(exponential_buckets(1.0, 2.0, 9))); - sub_registry.register( - "received_info_listen_addrs", - "Number of listen addresses for remote peer received in \ - identification information", - received_info_listen_addrs.clone(), - ); - - let received_info_protocols = - Histogram::new(iter::once(0.0).chain(exponential_buckets(1.0, 2.0, 9))); - sub_registry.register( - "received_info_protocols", - "Number of protocols supported by the remote peer received in \ - identification information", - received_info_protocols.clone(), - ); - let sent = Counter::default(); sub_registry.register( "sent", @@ -104,22 +125,12 @@ impl Metrics { sent.clone(), ); - let listen_addresses = Family::default(); - sub_registry.register( - "listen_addresses", - "Number of listen addresses for remote peer per protocol stack", - listen_addresses.clone(), - ); - Self { - protocols, + peers, error, pushed, received, - received_info_listen_addrs, - received_info_protocols, sent, - listen_addresses, } } } @@ -134,58 +145,8 @@ impl super::Recorder for Metrics { self.pushed.inc(); } libp2p_identify::Event::Received { peer_id, info, .. } => { - { - let mut protocols = info - .protocols - .iter() - .filter(|p| { - let allowed_protocols: &[StreamProtocol] = &[ - #[cfg(feature = "dcutr")] - libp2p_dcutr::PROTOCOL_NAME, - // #[cfg(feature = "gossipsub")] - // #[cfg(not(target_os = "unknown"))] - // TODO: Add Gossipsub protocol name - libp2p_identify::PROTOCOL_NAME, - libp2p_identify::PUSH_PROTOCOL_NAME, - #[cfg(feature = "kad")] - libp2p_kad::PROTOCOL_NAME, - #[cfg(feature = "ping")] - libp2p_ping::PROTOCOL_NAME, - #[cfg(feature = "relay")] - libp2p_relay::STOP_PROTOCOL_NAME, - #[cfg(feature = "relay")] - libp2p_relay::HOP_PROTOCOL_NAME, - ]; - - allowed_protocols.contains(p) - }) - .map(|p| p.to_string()) - .collect::>(); - - // Signal via an additional label value that one or more - // protocols of the remote peer have not been recognized. - if protocols.len() < info.protocols.len() { - protocols.push("unrecognized".to_string()); - } - - protocols.sort_unstable(); - protocols.dedup(); - - self.protocols.add(*peer_id, protocols); - } - self.received.inc(); - self.received_info_protocols - .observe(info.protocols.len() as f64); - self.received_info_listen_addrs - .observe(info.listen_addrs.len() as f64); - for listen_addr in &info.listen_addrs { - self.listen_addresses - .get_or_create(&AddressLabels { - protocols: protocol_stack::as_string(listen_addr), - }) - .inc(); - } + self.peers.record(*peer_id, info.clone()); } libp2p_identify::Event::Sent { .. } => { self.sent.inc(); @@ -203,7 +164,7 @@ impl super::Recorder>>>, -} +#[derive(Default, Debug, Clone)] +struct Peers(Arc>>); -impl Protocols { - fn add(&self, peer: PeerId, protocols: Vec) { - self.peers - .lock() - .expect("Lock not to be poisoned") - .insert(peer, protocols); +impl Peers { + fn record(&self, peer_id: PeerId, info: libp2p_identify::Info) { + self.0.lock().unwrap().insert(peer_id, info); } - fn remove(&self, peer: PeerId) { - self.peers - .lock() - .expect("Lock not to be poisoned") - .remove(&peer); + fn remove(&self, peer_id: PeerId) { + self.0.lock().unwrap().remove(&peer_id); } } -impl EncodeMetric for Protocols { - fn encode(&self, mut encoder: MetricEncoder) -> Result<(), std::fmt::Error> { - let count_by_protocol = self - .peers - .lock() - .expect("Lock not to be poisoned") - .iter() - .fold( - HashMap::::default(), - |mut acc, (_, protocols)| { - for protocol in protocols { - let count = acc.entry(protocol.to_string()).or_default(); - *count += 1; - } - acc - }, - ); +impl Collector for Peers { + fn collect<'a>( + &'a self, + ) -> Box, MaybeOwned<'a, Box>)> + 'a> + { + let mut count_by_protocols: HashMap = Default::default(); + let mut count_by_listen_addresses: HashMap = Default::default(); + let mut count_by_observed_addresses: HashMap = Default::default(); + + for (_, peer_info) in self.0.lock().unwrap().iter() { + { + let mut protocols: Vec<_> = peer_info + .protocols + .iter() + .map(|p| { + if ALLOWED_PROTOCOLS.contains(&p) { + p.to_string() + } else { + "unrecognized".to_string() + } + }) + .collect(); + protocols.sort(); + protocols.dedup(); + + for protocol in protocols.into_iter() { + let count = count_by_protocols.entry(protocol).or_default(); + *count += 1; + } + } + + { + let mut addrs: Vec<_> = peer_info + .listen_addrs + .iter() + .map(protocol_stack::as_string) + .collect(); + addrs.sort(); + addrs.dedup(); + + for addr in addrs { + let count = count_by_listen_addresses.entry(addr).or_default(); + *count += 1; + } + } - for (protocol, count) in count_by_protocol { - encoder - .encode_family(&[("protocol", protocol)])? - .encode_gauge(&count)?; + { + let count = count_by_observed_addresses + .entry(protocol_stack::as_string(&peer_info.observed_addr)) + .or_default(); + *count += 1; + } } - Ok(()) - } + let count_by_protocols: Box = + Box::new(ConstFamily::new(count_by_protocols.into_iter().map( + |(protocol, count)| ([("protocol", protocol)], ConstGauge::new(count)), + ))); + + let count_by_listen_addresses: Box = + Box::new(ConstFamily::new(count_by_listen_addresses.into_iter().map( + |(protocol, count)| ([("listen_address", protocol)], ConstGauge::new(count)), + ))); + + let count_by_observed_addresses: Box = Box::new(ConstFamily::new( + count_by_observed_addresses + .into_iter() + .map(|(protocol, count)| { + ([("observed_address", protocol)], ConstGauge::new(count)) + }), + )); - fn metric_type(&self) -> MetricType { - MetricType::Gauge + Box::new( + [ + ( + Cow::Borrowed(&*PROTOCOLS_DESCRIPTOR), + MaybeOwned::Owned(count_by_protocols), + ), + ( + Cow::Borrowed(&*LISTEN_ADDRESSES_DESCRIPTOR), + MaybeOwned::Owned(count_by_listen_addresses), + ), + ( + Cow::Borrowed(&*OBSERVED_ADDRESSES_DESCRIPTOR), + MaybeOwned::Owned(count_by_observed_addresses), + ), + ] + .into_iter(), + ) } } diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 9cc46dc3664..813428bc008 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -35,7 +35,7 @@ wasm-timer = "0.2.5" instant = "0.1.11" void = "1.0.2" # Metrics dependencies -prometheus-client = "0.20.0" +prometheus-client = "0.21.0" [dev-dependencies] async-std = { version = "1.6.3", features = ["unstable"] }