From 952962c03192cf4540485d38037c26a5afb52e21 Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Tue, 10 Sep 2024 15:02:01 +0800 Subject: [PATCH] refactor: replace `RdKafkaStats` with Guarded metrics (#18462) Signed-off-by: tabVersion --- src/common/metrics/src/guarded_metrics.rs | 18 + src/common/src/lib.rs | 2 +- src/connector/src/source/kafka/stats.rs | 563 ++++++++++++---------- 3 files changed, 323 insertions(+), 260 deletions(-) diff --git a/src/common/metrics/src/guarded_metrics.rs b/src/common/metrics/src/guarded_metrics.rs index 27710748ae359..9b16cc778938c 100644 --- a/src/common/metrics/src/guarded_metrics.rs +++ b/src/common/metrics/src/guarded_metrics.rs @@ -83,6 +83,22 @@ macro_rules! register_guarded_int_gauge_vec_with_registry { }}; } +#[macro_export] +macro_rules! register_guarded_uint_gauge_vec_with_registry { + ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{ + let inner = prometheus::core::GenericGaugeVec::::new( + prometheus::opts!($NAME, $HELP), + $LABELS_NAMES, + ); + inner.and_then(|inner| { + let inner = $crate::__extract_gauge_builder(inner); + let label_guarded = $crate::LabelGuardedUintGaugeVec::new(inner, { $LABELS_NAMES }); + let result = ($REGISTRY).register(Box::new(label_guarded.clone())); + result.map(move |()| label_guarded) + }) + }}; +} + #[macro_export] macro_rules! register_guarded_int_counter_vec_with_registry { ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{ @@ -131,6 +147,8 @@ pub type LabelGuardedIntCounterVec = LabelGuardedMetricVec, N>; pub type LabelGuardedIntGaugeVec = LabelGuardedMetricVec, N>; +pub type LabelGuardedUintGaugeVec = + LabelGuardedMetricVec, N>; pub type LabelGuardedGaugeVec = LabelGuardedMetricVec, N>; diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 8d47d0c621646..7751eb591239d 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -76,7 +76,7 @@ pub mod memory; pub use risingwave_common_metrics::{ monitor, register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry, - register_guarded_int_gauge_vec_with_registry, + register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry, }; pub use { risingwave_common_metrics as metrics, risingwave_common_secret as secret, diff --git a/src/connector/src/source/kafka/stats.rs b/src/connector/src/source/kafka/stats.rs index 679f5c24bd2a1..7a36c4d1fffea 100644 --- a/src/connector/src/source/kafka/stats.rs +++ b/src/connector/src/source/kafka/stats.rs @@ -12,34 +12,37 @@ // See the License for the specific language governing permissions and // limitations under the License. -use prometheus::core::{AtomicU64, GenericGaugeVec}; -use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry}; +use prometheus::core::AtomicU64; +use prometheus::Registry; use rdkafka::statistics::{Broker, ConsumerGroup, Partition, Topic, Window}; use rdkafka::Statistics; -use risingwave_common::metrics::register_uint_gauge_vec_with_registry; +use risingwave_common::metrics::{LabelGuardedIntGaugeVec, LabelGuardedUintGaugeVec}; +use risingwave_common::{ + register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry, +}; #[derive(Debug, Clone)] pub struct RdKafkaStats { pub registry: Registry, - pub ts: IntGaugeVec, - pub time: IntGaugeVec, - pub age: IntGaugeVec, - pub replyq: IntGaugeVec, - pub msg_cnt: GenericGaugeVec, - pub msg_size: GenericGaugeVec, - pub msg_max: GenericGaugeVec, - pub msg_size_max: GenericGaugeVec, - pub tx: IntGaugeVec, - pub tx_bytes: IntGaugeVec, - pub rx: IntGaugeVec, - pub rx_bytes: IntGaugeVec, - pub tx_msgs: IntGaugeVec, - pub tx_msgs_bytes: IntGaugeVec, - pub rx_msgs: IntGaugeVec, - pub rx_msgs_bytes: IntGaugeVec, - pub simple_cnt: IntGaugeVec, - pub metadata_cache_cnt: IntGaugeVec, + pub ts: LabelGuardedIntGaugeVec<2>, + pub time: LabelGuardedIntGaugeVec<2>, + pub age: LabelGuardedIntGaugeVec<2>, + pub replyq: LabelGuardedIntGaugeVec<2>, + pub msg_cnt: LabelGuardedUintGaugeVec<2>, + pub msg_size: LabelGuardedUintGaugeVec<2>, + pub msg_max: LabelGuardedUintGaugeVec<2>, + pub msg_size_max: LabelGuardedUintGaugeVec<2>, + pub tx: LabelGuardedIntGaugeVec<2>, + pub tx_bytes: LabelGuardedIntGaugeVec<2>, + pub rx: LabelGuardedIntGaugeVec<2>, + pub rx_bytes: LabelGuardedIntGaugeVec<2>, + pub tx_msgs: LabelGuardedIntGaugeVec<2>, + pub tx_msgs_bytes: LabelGuardedIntGaugeVec<2>, + pub rx_msgs: LabelGuardedIntGaugeVec<2>, + pub rx_msgs_bytes: LabelGuardedIntGaugeVec<2>, + pub simple_cnt: LabelGuardedIntGaugeVec<2>, + pub metadata_cache_cnt: LabelGuardedIntGaugeVec<2>, pub broker_stats: BrokerStats, pub topic_stats: TopicStats, @@ -50,29 +53,29 @@ pub struct RdKafkaStats { pub struct BrokerStats { pub registry: Registry, - pub state_age: IntGaugeVec, - pub outbuf_cnt: IntGaugeVec, - pub outbuf_msg_cnt: IntGaugeVec, - pub waitresp_cnt: IntGaugeVec, - pub waitresp_msg_cnt: IntGaugeVec, - pub tx: GenericGaugeVec, - pub tx_bytes: GenericGaugeVec, - pub tx_errs: GenericGaugeVec, - pub tx_retries: GenericGaugeVec, - pub tx_idle: IntGaugeVec, - pub req_timeouts: GenericGaugeVec, - pub rx: GenericGaugeVec, - pub rx_bytes: GenericGaugeVec, - pub rx_errs: GenericGaugeVec, - pub rx_corriderrs: GenericGaugeVec, - pub rx_partial: GenericGaugeVec, - pub rx_idle: IntGaugeVec, - pub req: IntGaugeVec, - pub zbuf_grow: GenericGaugeVec, - pub buf_grow: GenericGaugeVec, - pub wakeups: GenericGaugeVec, - pub connects: IntGaugeVec, - pub disconnects: IntGaugeVec, + pub state_age: LabelGuardedIntGaugeVec<4>, + pub outbuf_cnt: LabelGuardedIntGaugeVec<4>, + pub outbuf_msg_cnt: LabelGuardedIntGaugeVec<4>, + pub waitresp_cnt: LabelGuardedIntGaugeVec<4>, + pub waitresp_msg_cnt: LabelGuardedIntGaugeVec<4>, + pub tx: LabelGuardedUintGaugeVec<4>, + pub tx_bytes: LabelGuardedUintGaugeVec<4>, + pub tx_errs: LabelGuardedUintGaugeVec<4>, + pub tx_retries: LabelGuardedUintGaugeVec<4>, + pub tx_idle: LabelGuardedIntGaugeVec<4>, + pub req_timeouts: LabelGuardedUintGaugeVec<4>, + pub rx: LabelGuardedUintGaugeVec<4>, + pub rx_bytes: LabelGuardedUintGaugeVec<4>, + pub rx_errs: LabelGuardedUintGaugeVec<4>, + pub rx_corriderrs: LabelGuardedUintGaugeVec<4>, + pub rx_partial: LabelGuardedUintGaugeVec<4>, + pub rx_idle: LabelGuardedIntGaugeVec<4>, + pub req: LabelGuardedIntGaugeVec<5>, + pub zbuf_grow: LabelGuardedUintGaugeVec<4>, + pub buf_grow: LabelGuardedUintGaugeVec<4>, + pub wakeups: LabelGuardedUintGaugeVec<4>, + pub connects: LabelGuardedIntGaugeVec<4>, + pub disconnects: LabelGuardedIntGaugeVec<4>, pub int_latency: StatsWindow, pub outbuf_latency: StatsWindow, pub rtt: StatsWindow, @@ -83,7 +86,7 @@ pub struct BrokerStats { pub struct TopicStats { pub registry: Registry, - pub metadata_age: IntGaugeVec, + pub metadata_age: LabelGuardedIntGaugeVec<3>, pub batch_size: StatsWindow, pub batch_cnt: StatsWindow, pub partitions: PartitionStats, @@ -93,58 +96,58 @@ pub struct TopicStats { pub struct StatsWindow { pub registry: Registry, - pub min: IntGaugeVec, - pub max: IntGaugeVec, - pub avg: IntGaugeVec, - pub sum: IntGaugeVec, - pub cnt: IntGaugeVec, - pub stddev: IntGaugeVec, - pub hdr_size: IntGaugeVec, - pub p50: IntGaugeVec, - pub p75: IntGaugeVec, - pub p90: IntGaugeVec, - pub p95: IntGaugeVec, - pub p99: IntGaugeVec, - pub p99_99: IntGaugeVec, - pub out_of_range: IntGaugeVec, + pub min: LabelGuardedIntGaugeVec<4>, + pub max: LabelGuardedIntGaugeVec<4>, + pub avg: LabelGuardedIntGaugeVec<4>, + pub sum: LabelGuardedIntGaugeVec<4>, + pub cnt: LabelGuardedIntGaugeVec<4>, + pub stddev: LabelGuardedIntGaugeVec<4>, + pub hdr_size: LabelGuardedIntGaugeVec<4>, + pub p50: LabelGuardedIntGaugeVec<4>, + pub p75: LabelGuardedIntGaugeVec<4>, + pub p90: LabelGuardedIntGaugeVec<4>, + pub p95: LabelGuardedIntGaugeVec<4>, + pub p99: LabelGuardedIntGaugeVec<4>, + pub p99_99: LabelGuardedIntGaugeVec<4>, + pub out_of_range: LabelGuardedIntGaugeVec<4>, } #[derive(Debug, Clone)] pub struct ConsumerGroupStats { pub registry: Registry, - pub state_age: IntGaugeVec, + pub state_age: LabelGuardedIntGaugeVec<3>, // todo: (do not know value set) join_state: IntGaugeVec, - pub rebalance_age: IntGaugeVec, - pub rebalance_cnt: IntGaugeVec, + pub rebalance_age: LabelGuardedIntGaugeVec<3>, + pub rebalance_cnt: LabelGuardedIntGaugeVec<3>, // todo: (cannot handle string) rebalance_reason, - pub assignment_size: IntGaugeVec, + pub assignment_size: LabelGuardedIntGaugeVec<3>, } impl ConsumerGroupStats { pub fn new(registry: Registry) -> Self { - let state_age = register_int_gauge_vec_with_registry!( + let state_age = register_guarded_int_gauge_vec_with_registry!( "rdkafka_consumer_group_state_age", "Age of the consumer group state in seconds", &["id", "client_id", "state"], registry ) .unwrap(); - let rebalance_age = register_int_gauge_vec_with_registry!( + let rebalance_age = register_guarded_int_gauge_vec_with_registry!( "rdkafka_consumer_group_rebalance_age", "Age of the last rebalance in seconds", &["id", "client_id", "state"], registry ) .unwrap(); - let rebalance_cnt = register_int_gauge_vec_with_registry!( + let rebalance_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_consumer_group_rebalance_cnt", "Number of rebalances", &["id", "client_id", "state"], registry ) .unwrap(); - let assignment_size = register_int_gauge_vec_with_registry!( + let assignment_size = register_guarded_int_gauge_vec_with_registry!( "rdkafka_consumer_group_assignment_size", "Number of assigned partitions", &["id", "client_id", "state"], @@ -164,16 +167,16 @@ impl ConsumerGroupStats { pub fn report(&self, id: &str, client_id: &str, stats: &ConsumerGroup) { let state = stats.state.as_str(); self.state_age - .with_label_values(&[id, client_id, state]) + .with_guarded_label_values(&[id, client_id, state]) .set(stats.stateage); self.rebalance_age - .with_label_values(&[id, client_id, state]) + .with_guarded_label_values(&[id, client_id, state]) .set(stats.rebalance_age); self.rebalance_cnt - .with_label_values(&[id, client_id, state]) + .with_guarded_label_values(&[id, client_id, state]) .set(stats.rebalance_cnt); self.assignment_size - .with_label_values(&[id, client_id, state]) + .with_guarded_label_values(&[id, client_id, state]) .set(stats.assignment_size as i64); } } @@ -181,98 +184,98 @@ impl ConsumerGroupStats { impl StatsWindow { pub fn new(registry: Registry, path: &str) -> Self { let get_metric_name = |name: &str| format!("rdkafka_{}_{}", path, name); - let min = register_int_gauge_vec_with_registry!( + let min = register_guarded_int_gauge_vec_with_registry!( get_metric_name("min"), "Minimum value", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let max = register_int_gauge_vec_with_registry!( + let max = register_guarded_int_gauge_vec_with_registry!( get_metric_name("max"), "Maximum value", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let avg = register_int_gauge_vec_with_registry!( + let avg = register_guarded_int_gauge_vec_with_registry!( get_metric_name("avg"), "Average value", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let sum = register_int_gauge_vec_with_registry!( + let sum = register_guarded_int_gauge_vec_with_registry!( get_metric_name("sum"), "Sum of values", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let cnt = register_int_gauge_vec_with_registry!( + let cnt = register_guarded_int_gauge_vec_with_registry!( get_metric_name("cnt"), "Count of values", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let stddev = register_int_gauge_vec_with_registry!( + let stddev = register_guarded_int_gauge_vec_with_registry!( get_metric_name("stddev"), "Standard deviation", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let hdr_size = register_int_gauge_vec_with_registry!( + let hdr_size = register_guarded_int_gauge_vec_with_registry!( get_metric_name("hdrsize"), "Size of the histogram header", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let p50 = register_int_gauge_vec_with_registry!( + let p50 = register_guarded_int_gauge_vec_with_registry!( get_metric_name("p50"), "50th percentile", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let p75 = register_int_gauge_vec_with_registry!( + let p75 = register_guarded_int_gauge_vec_with_registry!( get_metric_name("p75"), "75th percentile", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let p90 = register_int_gauge_vec_with_registry!( + let p90 = register_guarded_int_gauge_vec_with_registry!( get_metric_name("p90"), "90th percentile", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let p95 = register_int_gauge_vec_with_registry!( + let p95 = register_guarded_int_gauge_vec_with_registry!( get_metric_name("p95"), "95th percentile", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let p99 = register_int_gauge_vec_with_registry!( + let p99 = register_guarded_int_gauge_vec_with_registry!( get_metric_name("p99"), "99th percentile", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let p99_99 = register_int_gauge_vec_with_registry!( + let p99_99 = register_guarded_int_gauge_vec_with_registry!( get_metric_name("p99_99"), "99.99th percentile", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let out_of_range = register_int_gauge_vec_with_registry!( + let out_of_range = register_guarded_int_gauge_vec_with_registry!( get_metric_name("out_of_range"), "Out of range values", &["id", "client_id", "broker", "topic"], @@ -302,26 +305,32 @@ impl StatsWindow { pub fn report(&self, id: &str, client_id: &str, broker: &str, topic: &str, stats: &Window) { let labels = [id, client_id, broker, topic]; - self.min.with_label_values(&labels).set(stats.min); - self.max.with_label_values(&labels).set(stats.max); - self.avg.with_label_values(&labels).set(stats.avg); - self.sum.with_label_values(&labels).set(stats.sum); - self.cnt.with_label_values(&labels).set(stats.cnt); - self.stddev.with_label_values(&labels).set(stats.stddev); - self.hdr_size.with_label_values(&labels).set(stats.hdrsize); - self.p50.with_label_values(&labels).set(stats.p50); - self.p75.with_label_values(&labels).set(stats.p75); - self.p90.with_label_values(&labels).set(stats.p90); - self.p99_99.with_label_values(&labels).set(stats.p99_99); + self.min.with_guarded_label_values(&labels).set(stats.min); + self.max.with_guarded_label_values(&labels).set(stats.max); + self.avg.with_guarded_label_values(&labels).set(stats.avg); + self.sum.with_guarded_label_values(&labels).set(stats.sum); + self.cnt.with_guarded_label_values(&labels).set(stats.cnt); + self.stddev + .with_guarded_label_values(&labels) + .set(stats.stddev); + self.hdr_size + .with_guarded_label_values(&labels) + .set(stats.hdrsize); + self.p50.with_guarded_label_values(&labels).set(stats.p50); + self.p75.with_guarded_label_values(&labels).set(stats.p75); + self.p90.with_guarded_label_values(&labels).set(stats.p90); + self.p99_99 + .with_guarded_label_values(&labels) + .set(stats.p99_99); self.out_of_range - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.outofrange); } } impl TopicStats { pub fn new(registry: Registry) -> Self { - let metadata_age = register_int_gauge_vec_with_registry!( + let metadata_age = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_metadata_age", "Age of the topic metadata in milliseconds", &["id", "client_id", "topic"], @@ -348,7 +357,7 @@ impl TopicStats { fn report_inner(&self, id: &str, client_id: &str, topic: &str, stats: &Topic) { self.metadata_age - .with_label_values(&[id, client_id, topic]) + .with_guarded_label_values(&[id, client_id, topic]) .set(stats.metadata_age); self.batch_size .report(id, client_id, "", topic, &stats.batchsize); @@ -362,212 +371,212 @@ impl TopicStats { pub struct PartitionStats { pub registry: Registry, - pub msgq_cnt: IntGaugeVec, - pub msgq_bytes: GenericGaugeVec, - pub xmit_msgq_cnt: IntGaugeVec, - pub xmit_msgq_bytes: GenericGaugeVec, - pub fetchq_cnt: IntGaugeVec, - pub fetchq_size: GenericGaugeVec, - pub query_offset: IntGaugeVec, - pub next_offset: IntGaugeVec, - pub app_offset: IntGaugeVec, - pub stored_offset: IntGaugeVec, - pub committed_offset: IntGaugeVec, - pub eof_offset: IntGaugeVec, - pub lo_offset: IntGaugeVec, - pub hi_offset: IntGaugeVec, - pub consumer_lag: IntGaugeVec, - pub consumer_lag_store: IntGaugeVec, - pub txmsgs: GenericGaugeVec, - pub txbytes: GenericGaugeVec, - pub rxmsgs: GenericGaugeVec, - pub rxbytes: GenericGaugeVec, - pub msgs: GenericGaugeVec, - pub rx_ver_drops: GenericGaugeVec, - pub msgs_inflight: IntGaugeVec, - pub next_ack_seq: IntGaugeVec, - pub next_err_seq: IntGaugeVec, - pub acked_msgid: GenericGaugeVec, + pub msgq_cnt: LabelGuardedIntGaugeVec<4>, + pub msgq_bytes: LabelGuardedUintGaugeVec<4>, + pub xmit_msgq_cnt: LabelGuardedIntGaugeVec<4>, + pub xmit_msgq_bytes: LabelGuardedUintGaugeVec<4>, + pub fetchq_cnt: LabelGuardedIntGaugeVec<4>, + pub fetchq_size: LabelGuardedUintGaugeVec<4>, + pub query_offset: LabelGuardedIntGaugeVec<4>, + pub next_offset: LabelGuardedIntGaugeVec<4>, + pub app_offset: LabelGuardedIntGaugeVec<4>, + pub stored_offset: LabelGuardedIntGaugeVec<4>, + pub committed_offset: LabelGuardedIntGaugeVec<4>, + pub eof_offset: LabelGuardedIntGaugeVec<4>, + pub lo_offset: LabelGuardedIntGaugeVec<4>, + pub hi_offset: LabelGuardedIntGaugeVec<4>, + pub consumer_lag: LabelGuardedIntGaugeVec<4>, + pub consumer_lag_store: LabelGuardedIntGaugeVec<4>, + pub txmsgs: LabelGuardedUintGaugeVec<4>, + pub txbytes: LabelGuardedUintGaugeVec<4>, + pub rxmsgs: LabelGuardedUintGaugeVec<4>, + pub rxbytes: LabelGuardedUintGaugeVec<4>, + pub msgs: LabelGuardedUintGaugeVec<4>, + pub rx_ver_drops: LabelGuardedUintGaugeVec<4>, + pub msgs_inflight: LabelGuardedIntGaugeVec<4>, + pub next_ack_seq: LabelGuardedIntGaugeVec<4>, + pub next_err_seq: LabelGuardedIntGaugeVec<4>, + pub acked_msgid: LabelGuardedUintGaugeVec<4>, } impl PartitionStats { pub fn new(registry: Registry) -> Self { - let msgq_cnt = register_int_gauge_vec_with_registry!( + let msgq_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_msgq_cnt", "Number of messages in the producer queue", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let msgq_bytes = register_uint_gauge_vec_with_registry!( + let msgq_bytes = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_topic_partition_msgq_bytes", "Size of messages in the producer queue", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let xmit_msgq_cnt = register_int_gauge_vec_with_registry!( + let xmit_msgq_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_xmit_msgq_cnt", "Number of messages in the transmit queue", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let xmit_msgq_bytes = register_uint_gauge_vec_with_registry!( + let xmit_msgq_bytes = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_topic_partition_xmit_msgq_bytes", "Size of messages in the transmit queue", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let fetchq_cnt = register_int_gauge_vec_with_registry!( + let fetchq_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_fetchq_cnt", "Number of messages in the fetch queue", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let fetchq_size = register_uint_gauge_vec_with_registry!( + let fetchq_size = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_topic_partition_fetchq_size", "Size of messages in the fetch queue", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let query_offset = register_int_gauge_vec_with_registry!( + let query_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_query_offset", "Current query offset", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let next_offset = register_int_gauge_vec_with_registry!( + let next_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_next_offset", "Next offset to query", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let app_offset = register_int_gauge_vec_with_registry!( + let app_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_app_offset", "Last acknowledged offset", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let stored_offset = register_int_gauge_vec_with_registry!( + let stored_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_stored_offset", "Last stored offset", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let committed_offset = register_int_gauge_vec_with_registry!( + let committed_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_committed_offset", "Last committed offset", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let eof_offset = register_int_gauge_vec_with_registry!( + let eof_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_eof_offset", "Last offset in broker log", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let lo_offset = register_int_gauge_vec_with_registry!( + let lo_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_lo_offset", "Low offset", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let hi_offset = register_int_gauge_vec_with_registry!( + let hi_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_hi_offset", "High offset", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let consumer_lag = register_int_gauge_vec_with_registry!( + let consumer_lag = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_consumer_lag", "Consumer lag", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let consumer_lag_store = register_int_gauge_vec_with_registry!( + let consumer_lag_store = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_consumer_lag_store", "Consumer lag stored", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let txmsgs = register_uint_gauge_vec_with_registry!( + let txmsgs = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_topic_partition_txmsgs", "Number of transmitted messages", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let txbytes = register_uint_gauge_vec_with_registry!( + let txbytes = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_topic_partition_txbytes", "Number of transmitted bytes", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let rxmsgs = register_uint_gauge_vec_with_registry!( + let rxmsgs = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_topic_partition_rxmsgs", "Number of received messages", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let rxbytes = register_uint_gauge_vec_with_registry!( + let rxbytes = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_topic_partition_rxbytes", "Number of received bytes", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let msgs = register_uint_gauge_vec_with_registry!( + let msgs = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_topic_partition_msgs", "Number of messages in partition", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let rx_ver_drops = register_uint_gauge_vec_with_registry!( + let rx_ver_drops = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_topic_partition_rx_ver_drops", "Number of received messages dropped due to version mismatch", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let msgs_inflight = register_int_gauge_vec_with_registry!( + let msgs_inflight = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_msgs_inflight", "Number of messages in-flight", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let next_ack_seq = register_int_gauge_vec_with_registry!( + let next_ack_seq = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_next_ack_seq", "Next ack sequence number", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let next_err_seq = register_int_gauge_vec_with_registry!( + let next_err_seq = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_next_err_seq", "Next error sequence number", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let acked_msgid = register_uint_gauge_vec_with_registry!( + let acked_msgid = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_topic_partition_acked_msgid", "Acknowledged message ID", &["id", "client_id", "topic", "partition"], @@ -615,78 +624,88 @@ impl PartitionStats { fn report_inner(&self, id: &str, client_id: &str, topic: &str, stats: &Partition) { let labels = [id, client_id, topic, &stats.partition.to_string()]; - self.msgq_cnt.with_label_values(&labels).set(stats.msgq_cnt); + self.msgq_cnt + .with_guarded_label_values(&labels) + .set(stats.msgq_cnt); self.msgq_bytes - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.msgq_bytes); self.xmit_msgq_cnt - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.xmit_msgq_cnt); self.xmit_msgq_bytes - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.xmit_msgq_bytes); self.fetchq_cnt - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.fetchq_cnt); self.fetchq_size - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.fetchq_size); self.query_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.query_offset); self.next_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.next_offset); self.app_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.app_offset); self.stored_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.stored_offset); self.committed_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.committed_offset); self.eof_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.eof_offset); self.lo_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.lo_offset); self.hi_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.hi_offset); self.consumer_lag - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.consumer_lag); self.consumer_lag_store - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.consumer_lag_stored); - self.txmsgs.with_label_values(&labels).set(stats.txmsgs); - self.txbytes.with_label_values(&labels).set(stats.txbytes); - self.rxmsgs.with_label_values(&labels).set(stats.rxmsgs); - self.rxbytes.with_label_values(&labels).set(stats.rxbytes); - self.msgs.with_label_values(&labels).set(stats.msgs); + self.txmsgs + .with_guarded_label_values(&labels) + .set(stats.txmsgs); + self.txbytes + .with_guarded_label_values(&labels) + .set(stats.txbytes); + self.rxmsgs + .with_guarded_label_values(&labels) + .set(stats.rxmsgs); + self.rxbytes + .with_guarded_label_values(&labels) + .set(stats.rxbytes); + self.msgs.with_guarded_label_values(&labels).set(stats.msgs); self.rx_ver_drops - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.rx_ver_drops); self.msgs_inflight - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.msgs_inflight); self.next_ack_seq - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.next_ack_seq); self.next_err_seq - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.next_err_seq); self.acked_msgid - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.acked_msgid); } } impl RdKafkaStats { pub fn new(registry: Registry) -> Self { - let ts = register_int_gauge_vec_with_registry!( + let ts = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_ts", "librdkafka's internal monotonic clock (microseconds)", // we cannot tell whether it is for consumer or producer, @@ -695,119 +714,119 @@ impl RdKafkaStats { registry ) .unwrap(); - let time = register_int_gauge_vec_with_registry!( + let time = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_time", "Wall clock time in seconds since the epoch", &["id", "client_id"], registry ) .unwrap(); - let age = register_int_gauge_vec_with_registry!( + let age = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_age", "Age of the topic metadata in milliseconds", &["id", "client_id"], registry ) .unwrap(); - let replyq = register_int_gauge_vec_with_registry!( + let replyq = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_replyq", "Number of replies waiting to be served", &["id", "client_id"], registry ) .unwrap(); - let msg_cnt = register_uint_gauge_vec_with_registry!( + let msg_cnt = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_top_msg_cnt", "Number of messages in all topics", &["id", "client_id"], registry ) .unwrap(); - let msg_size = register_uint_gauge_vec_with_registry!( + let msg_size = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_top_msg_size", "Size of messages in all topics", &["id", "client_id"], registry ) .unwrap(); - let msg_max = register_uint_gauge_vec_with_registry!( + let msg_max = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_top_msg_max", "Maximum message size in all topics", &["id", "client_id"], registry ) .unwrap(); - let msg_size_max = register_uint_gauge_vec_with_registry!( + let msg_size_max = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_top_msg_size_max", "Maximum message size in all topics", &["id", "client_id"], registry ) .unwrap(); - let tx = register_int_gauge_vec_with_registry!( + let tx = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_tx", "Number of transmitted messages", &["id", "client_id"], registry ) .unwrap(); - let tx_bytes = register_int_gauge_vec_with_registry!( + let tx_bytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_tx_bytes", "Number of transmitted bytes", &["id", "client_id"], registry ) .unwrap(); - let rx = register_int_gauge_vec_with_registry!( + let rx = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_rx", "Number of received messages", &["id", "client_id"], registry ) .unwrap(); - let rx_bytes = register_int_gauge_vec_with_registry!( + let rx_bytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_rx_bytes", "Number of received bytes", &["id", "client_id"], registry ) .unwrap(); - let tx_msgs = register_int_gauge_vec_with_registry!( + let tx_msgs = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_tx_msgs", "Number of transmitted messages", &["id", "client_id"], registry ) .unwrap(); - let tx_msgs_bytes = register_int_gauge_vec_with_registry!( + let tx_msgs_bytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_tx_msgs_bytes", "Number of transmitted bytes", &["id", "client_id"], registry ) .unwrap(); - let rx_msgs = register_int_gauge_vec_with_registry!( + let rx_msgs = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_rx_msgs", "Number of received messages", &["id", "client_id"], registry ) .unwrap(); - let rx_msgs_bytes = register_int_gauge_vec_with_registry!( + let rx_msgs_bytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_rx_msgs_bytes", "Number of received bytes", &["id", "client_id"], registry ) .unwrap(); - let simple_cnt = register_int_gauge_vec_with_registry!( + let simple_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_simple_cnt", "Number of simple consumer queues", &["id", "client_id"], registry ) .unwrap(); - let metadata_cache_cnt = register_int_gauge_vec_with_registry!( + let metadata_cache_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_metadata_cache_cnt", "Number of entries in the metadata cache", &["id", "client_id"], @@ -846,51 +865,59 @@ impl RdKafkaStats { pub fn report(&self, id: &str, stats: &Statistics) { let client_id = stats.name.as_str(); - self.ts.with_label_values(&[id, client_id]).set(stats.ts); + self.ts + .with_guarded_label_values(&[id, client_id]) + .set(stats.ts); self.time - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.time); - self.age.with_label_values(&[id, client_id]).set(stats.age); + self.age + .with_guarded_label_values(&[id, client_id]) + .set(stats.age); self.replyq - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.replyq); self.msg_cnt - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.msg_cnt); self.msg_size - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.msg_size); self.msg_max - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.msg_max); self.msg_size_max - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.msg_size_max); - self.tx.with_label_values(&[id, client_id]).set(stats.tx); + self.tx + .with_guarded_label_values(&[id, client_id]) + .set(stats.tx); self.tx_bytes - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.tx_bytes); - self.rx.with_label_values(&[id, client_id]).set(stats.rx); + self.rx + .with_guarded_label_values(&[id, client_id]) + .set(stats.rx); self.rx_bytes - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.rx_bytes); self.tx_msgs - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.txmsgs); self.tx_msgs_bytes - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.txmsg_bytes); self.rx_msgs - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.rxmsgs); self.rx_msgs_bytes - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.rxmsg_bytes); self.simple_cnt - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.simple_cnt); self.metadata_cache_cnt - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.metadata_cache_cnt); self.broker_stats.report(id, client_id, stats); @@ -903,161 +930,161 @@ impl RdKafkaStats { impl BrokerStats { pub fn new(registry: Registry) -> Self { - let state_age = register_int_gauge_vec_with_registry!( + let state_age = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_state_age", "Age of the broker state in seconds", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let outbuf_cnt = register_int_gauge_vec_with_registry!( + let outbuf_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_outbuf_cnt", "Number of messages waiting to be sent to broker", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let outbuf_msg_cnt = register_int_gauge_vec_with_registry!( + let outbuf_msg_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_outbuf_msg_cnt", "Number of messages waiting to be sent to broker", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let waitresp_cnt = register_int_gauge_vec_with_registry!( + let waitresp_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_waitresp_cnt", "Number of requests waiting for response", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let waitresp_msg_cnt = register_int_gauge_vec_with_registry!( + let waitresp_msg_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_waitresp_msg_cnt", "Number of messages waiting for response", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let tx = register_uint_gauge_vec_with_registry!( + let tx = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_tx", "Number of transmitted messages", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let tx_bytes = register_uint_gauge_vec_with_registry!( + let tx_bytes = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_tx_bytes", "Number of transmitted bytes", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let tx_errs = register_uint_gauge_vec_with_registry!( + let tx_errs = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_tx_errs", "Number of failed transmitted messages", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let tx_retries = register_uint_gauge_vec_with_registry!( + let tx_retries = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_tx_retries", "Number of message retries", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let tx_idle = register_int_gauge_vec_with_registry!( + let tx_idle = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_tx_idle", "Number of idle transmit connections", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let req_timeouts = register_uint_gauge_vec_with_registry!( + let req_timeouts = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_req_timeouts", "Number of request timeouts", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let rx = register_uint_gauge_vec_with_registry!( + let rx = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_rx", "Number of received messages", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let rx_bytes = register_uint_gauge_vec_with_registry!( + let rx_bytes = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_rx_bytes", "Number of received bytes", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let rx_errs = register_uint_gauge_vec_with_registry!( + let rx_errs = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_rx_errs", "Number of failed received messages", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let rx_corriderrs = register_uint_gauge_vec_with_registry!( + let rx_corriderrs = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_rx_corriderrs", "Number of received messages with invalid correlation id", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let rx_partial = register_uint_gauge_vec_with_registry!( + let rx_partial = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_rx_partial", "Number of partial messages received", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let rx_idle = register_int_gauge_vec_with_registry!( + let rx_idle = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_rx_idle", "Number of idle receive connections", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let req = register_int_gauge_vec_with_registry!( + let req = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_req", "Number of requests in flight", &["id", "client_id", "broker", "state", "type"], registry ) .unwrap(); - let zbuf_grow = register_uint_gauge_vec_with_registry!( + let zbuf_grow = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_zbuf_grow", "Number of times the broker's output buffer has been reallocated", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let buf_grow = register_uint_gauge_vec_with_registry!( + let buf_grow = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_buf_grow", "Number of times the broker's input buffer has been reallocated", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let wakeups = register_uint_gauge_vec_with_registry!( + let wakeups = register_guarded_uint_gauge_vec_with_registry!( "rdkafka_broker_wakeups", "Number of wakeups", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let connects = register_int_gauge_vec_with_registry!( + let connects = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_connects", "Number of connection attempts", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let disconnects = register_int_gauge_vec_with_registry!( + let disconnects = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_disconnects", "Number of disconnects", &["id", "client_id", "broker", "state"], @@ -1113,57 +1140,75 @@ impl BrokerStats { let labels = [id, client_id, broker, state]; self.state_age - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.stateage); self.outbuf_cnt - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.outbuf_cnt); self.outbuf_msg_cnt - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.outbuf_msg_cnt); self.waitresp_cnt - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.waitresp_cnt); self.waitresp_msg_cnt - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.waitresp_msg_cnt); - self.tx.with_label_values(&labels).set(stats.tx); - self.tx_bytes.with_label_values(&labels).set(stats.txbytes); - self.tx_errs.with_label_values(&labels).set(stats.txerrs); + self.tx.with_guarded_label_values(&labels).set(stats.tx); + self.tx_bytes + .with_guarded_label_values(&labels) + .set(stats.txbytes); + self.tx_errs + .with_guarded_label_values(&labels) + .set(stats.txerrs); self.tx_retries - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.txretries); - self.tx_idle.with_label_values(&labels).set(stats.txidle); + self.tx_idle + .with_guarded_label_values(&labels) + .set(stats.txidle); self.req_timeouts - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.req_timeouts); - self.rx.with_label_values(&labels).set(stats.rx); - self.rx_bytes.with_label_values(&labels).set(stats.rxbytes); - self.rx_errs.with_label_values(&labels).set(stats.rxerrs); + self.rx.with_guarded_label_values(&labels).set(stats.rx); + self.rx_bytes + .with_guarded_label_values(&labels) + .set(stats.rxbytes); + self.rx_errs + .with_guarded_label_values(&labels) + .set(stats.rxerrs); self.rx_corriderrs - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.rxcorriderrs); self.rx_partial - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.rxpartial); - self.rx_idle.with_label_values(&labels).set(stats.rxidle); + self.rx_idle + .with_guarded_label_values(&labels) + .set(stats.rxidle); for (req_type, req_cnt) in &stats.req { self.req - .with_label_values(&[id, client_id, broker, state, req_type]) + .with_guarded_label_values(&[id, client_id, broker, state, req_type]) .set(*req_cnt); } self.zbuf_grow - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.zbuf_grow); - self.buf_grow.with_label_values(&labels).set(stats.buf_grow); + self.buf_grow + .with_guarded_label_values(&labels) + .set(stats.buf_grow); if let Some(wakeups) = stats.wakeups { - self.wakeups.with_label_values(&labels).set(wakeups); + self.wakeups.with_guarded_label_values(&labels).set(wakeups); } if let Some(connects) = stats.connects { - self.connects.with_label_values(&labels).set(connects); + self.connects + .with_guarded_label_values(&labels) + .set(connects); } if let Some(disconnects) = stats.disconnects { - self.disconnects.with_label_values(&labels).set(disconnects); + self.disconnects + .with_guarded_label_values(&labels) + .set(disconnects); } if let Some(int_latency) = &stats.int_latency { self.int_latency