diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 08e8bd3dd46ea..e521bb9205851 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -701,7 +701,7 @@ async fn into_chunk_stream(mut parser: P, data_stream // report to promethus GLOBAL_SOURCE_METRICS .direct_cdc_event_lag_latency - .with_label_values(&[&msg_meta.full_table_name]) + .with_guarded_label_values(&[&msg_meta.full_table_name]) .observe(lag_ms as f64); } diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index cf2b5c3d17e00..d29996883a954 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -208,7 +208,7 @@ impl CommonSplitReader for CdcSplitReader { tracing::trace!("receive {} cdc events ", events.len()); metrics .connector_source_rows_received - .with_label_values(&[source_type.as_str_name(), &source_id]) + .with_guarded_label_values(&[source_type.as_str_name(), &source_id]) .inc_by(events.len() as u64); let msgs = events.into_iter().map(SourceMessage::from).collect_vec(); yield msgs; diff --git a/src/connector/src/source/common.rs b/src/connector/src/source/common.rs index 8fcd1318401f2..287f15c092f64 100644 --- a/src/connector/src/source/common.rs +++ b/src/connector/src/source/common.rs @@ -52,7 +52,7 @@ pub(crate) async fn into_chunk_stream( for (split_id, msgs) in by_split_id { metrics .partition_input_count - .with_label_values(&[ + .with_guarded_label_values(&[ &actor_id, &source_id, split_id, @@ -68,7 +68,7 @@ pub(crate) async fn into_chunk_stream( metrics .partition_input_bytes - .with_label_values(&[ + .with_guarded_label_values(&[ &actor_id, &source_id, split_id, diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index 87f798d59f38b..3499b0ed6cecb 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -161,7 +161,7 @@ impl SplitReader for DatagenSplitReader { .inspect_ok(move |stream_chunk| { metrics .partition_input_count - .with_label_values(&[ + .with_guarded_label_values(&[ &actor_id, &source_id, &split_id, diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index 0fb53ce8a0d1c..612e66fbb692a 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -97,7 +97,7 @@ impl OpendalReader { self.source_ctx .metrics .partition_input_count - .with_label_values(&[ + .with_guarded_label_values(&[ &actor_id, &source_id, &split_id, @@ -159,7 +159,7 @@ impl OpendalReader { source_ctx .metrics .partition_input_bytes - .with_label_values(&[ + .with_guarded_label_values(&[ &actor_id, &source_id, &split_id, @@ -176,7 +176,13 @@ impl OpendalReader { source_ctx .metrics .partition_input_bytes - .with_label_values(&[&actor_id, &source_id, &split_id, &source_name, &fragment_id]) + .with_guarded_label_values(&[ + &actor_id, + &source_id, + &split_id, + &source_name, + &fragment_id, + ]) .inc_by(batch_size as u64); yield batch; } diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index 129b708a61521..cbc06f9ceb117 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -122,7 +122,7 @@ impl S3FileReader { source_ctx .metrics .partition_input_bytes - .with_label_values(&[ + .with_guarded_label_values(&[ &actor_id, &source_id, &split_id, @@ -139,7 +139,13 @@ impl S3FileReader { source_ctx .metrics .partition_input_bytes - .with_label_values(&[&actor_id, &source_id, &split_id, &source_name, &fragment_id]) + .with_guarded_label_values(&[ + &actor_id, + &source_id, + &split_id, + &source_name, + &fragment_id, + ]) .inc_by(batch_size as u64); yield batch; } @@ -240,7 +246,7 @@ impl S3FileReader { self.source_ctx .metrics .partition_input_count - .with_label_values(&[ + .with_guarded_label_values(&[ &actor_id, &source_id, &split_id, diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index d4dddad67bfdf..82dfaf11e5d83 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -361,7 +361,7 @@ impl KafkaSplitEnumerator { self.context .metrics .high_watermark - .with_label_values(&[ + .with_guarded_label_values(&[ &self.context.info.source_id.to_string(), &partition.to_string(), ]) diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index e3c59611d1acc..f2a7dc01d3e8d 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -158,7 +158,7 @@ impl KafkaSplitReader { self.source_ctx .metrics .latest_message_id - .with_label_values(&[ + .with_guarded_label_values(&[ // source name is not available here &self.source_ctx.source_id.to_string(), &self.source_ctx.actor_id.to_string(), diff --git a/src/connector/src/source/kafka/stats.rs b/src/connector/src/source/kafka/stats.rs index 2b94b4c90a853..c255736a239c1 100644 --- a/src/connector/src/source/kafka/stats.rs +++ b/src/connector/src/source/kafka/stats.rs @@ -12,54 +12,35 @@ // See the License for the specific language governing permissions and // limitations under the License. -use prometheus::core::{AtomicU64, GenericGaugeVec}; -use prometheus::{opts, register_int_gauge_vec_with_registry, IntGaugeVec, Registry}; +use prometheus::Registry; use rdkafka::statistics::{Broker, ConsumerGroup, Partition, Topic, Window}; use rdkafka::Statistics; - -type UintGaugeVec = GenericGaugeVec; - -macro_rules! register_gauge_vec { - ($TYPE:ident, $OPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{ - let gauge_vec = $TYPE::new($OPTS, $LABELS_NAMES).unwrap(); - $REGISTRY - .register(Box::new(gauge_vec.clone())) - .map(|_| gauge_vec) - }}; -} - -macro_rules! register_uint_gauge_vec_with_registry { - ($OPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{ - register_gauge_vec!(UintGaugeVec, $OPTS, $LABELS_NAMES, $REGISTRY) - }}; - - ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{ - register_uint_gauge_vec_with_registry!(opts!($NAME, $HELP), $LABELS_NAMES, $REGISTRY) - }}; -} +use risingwave_common::metrics::{ + register_guarded_int_gauge_vec_with_registry, LabelGuardedIntGaugeVec, +}; #[derive(Debug, Clone)] pub struct RdKafkaStats { pub registry: Registry, - pub ts: IntGaugeVec, - pub time: IntGaugeVec, - pub age: IntGaugeVec, - pub replyq: IntGaugeVec, - pub msg_cnt: GenericGaugeVec, - pub msg_size: GenericGaugeVec, - pub msg_max: GenericGaugeVec, - pub msg_size_max: GenericGaugeVec, - pub tx: IntGaugeVec, - pub tx_bytes: IntGaugeVec, - pub rx: IntGaugeVec, - pub rx_bytes: IntGaugeVec, - pub tx_msgs: IntGaugeVec, - pub tx_msgs_bytes: IntGaugeVec, - pub rx_msgs: IntGaugeVec, - pub rx_msgs_bytes: IntGaugeVec, - pub simple_cnt: IntGaugeVec, - pub metadata_cache_cnt: IntGaugeVec, + pub ts: LabelGuardedIntGaugeVec<2>, + pub time: LabelGuardedIntGaugeVec<2>, + pub age: LabelGuardedIntGaugeVec<2>, + pub replyq: LabelGuardedIntGaugeVec<2>, + pub msg_cnt: LabelGuardedIntGaugeVec<2>, + pub msg_size: LabelGuardedIntGaugeVec<2>, + pub msg_max: LabelGuardedIntGaugeVec<2>, + pub msg_size_max: LabelGuardedIntGaugeVec<2>, + pub tx: LabelGuardedIntGaugeVec<2>, + pub tx_bytes: LabelGuardedIntGaugeVec<2>, + pub rx: LabelGuardedIntGaugeVec<2>, + pub rx_bytes: LabelGuardedIntGaugeVec<2>, + pub tx_msgs: LabelGuardedIntGaugeVec<2>, + pub tx_msgs_bytes: LabelGuardedIntGaugeVec<2>, + pub rx_msgs: LabelGuardedIntGaugeVec<2>, + pub rx_msgs_bytes: LabelGuardedIntGaugeVec<2>, + pub simple_cnt: LabelGuardedIntGaugeVec<2>, + pub metadata_cache_cnt: LabelGuardedIntGaugeVec<2>, pub broker_stats: BrokerStats, pub topic_stats: TopicStats, @@ -70,29 +51,29 @@ pub struct RdKafkaStats { pub struct BrokerStats { pub registry: Registry, - pub state_age: IntGaugeVec, - pub outbuf_cnt: IntGaugeVec, - pub outbuf_msg_cnt: IntGaugeVec, - pub waitresp_cnt: IntGaugeVec, - pub waitresp_msg_cnt: IntGaugeVec, - pub tx: GenericGaugeVec, - pub tx_bytes: GenericGaugeVec, - pub tx_errs: GenericGaugeVec, - pub tx_retries: GenericGaugeVec, - pub tx_idle: IntGaugeVec, - pub req_timeouts: GenericGaugeVec, - pub rx: GenericGaugeVec, - pub rx_bytes: GenericGaugeVec, - pub rx_errs: GenericGaugeVec, - pub rx_corriderrs: GenericGaugeVec, - pub rx_partial: GenericGaugeVec, - pub rx_idle: IntGaugeVec, - pub req: IntGaugeVec, - pub zbuf_grow: GenericGaugeVec, - pub buf_grow: GenericGaugeVec, - pub wakeups: GenericGaugeVec, - pub connects: IntGaugeVec, - pub disconnects: IntGaugeVec, + pub state_age: LabelGuardedIntGaugeVec<4>, + pub outbuf_cnt: LabelGuardedIntGaugeVec<4>, + pub outbuf_msg_cnt: LabelGuardedIntGaugeVec<4>, + pub waitresp_cnt: LabelGuardedIntGaugeVec<4>, + pub waitresp_msg_cnt: LabelGuardedIntGaugeVec<4>, + pub tx: LabelGuardedIntGaugeVec<4>, + pub tx_bytes: LabelGuardedIntGaugeVec<4>, + pub tx_errs: LabelGuardedIntGaugeVec<4>, + pub tx_retries: LabelGuardedIntGaugeVec<4>, + pub tx_idle: LabelGuardedIntGaugeVec<4>, + pub req_timeouts: LabelGuardedIntGaugeVec<4>, + pub rx: LabelGuardedIntGaugeVec<4>, + pub rx_bytes: LabelGuardedIntGaugeVec<4>, + pub rx_errs: LabelGuardedIntGaugeVec<4>, + pub rx_corriderrs: LabelGuardedIntGaugeVec<4>, + pub rx_partial: LabelGuardedIntGaugeVec<4>, + pub rx_idle: LabelGuardedIntGaugeVec<4>, + pub req: LabelGuardedIntGaugeVec<5>, + pub zbuf_grow: LabelGuardedIntGaugeVec<4>, + pub buf_grow: LabelGuardedIntGaugeVec<4>, + pub wakeups: LabelGuardedIntGaugeVec<4>, + pub connects: LabelGuardedIntGaugeVec<4>, + pub disconnects: LabelGuardedIntGaugeVec<4>, pub int_latency: StatsWindow, pub outbuf_latency: StatsWindow, pub rtt: StatsWindow, @@ -103,7 +84,7 @@ pub struct BrokerStats { pub struct TopicStats { pub registry: Registry, - pub metadata_age: IntGaugeVec, + pub metadata_age: LabelGuardedIntGaugeVec<3>, pub batch_size: StatsWindow, pub batch_cnt: StatsWindow, pub partitions: PartitionStats, @@ -113,58 +94,58 @@ pub struct TopicStats { pub struct StatsWindow { pub registry: Registry, - pub min: IntGaugeVec, - pub max: IntGaugeVec, - pub avg: IntGaugeVec, - pub sum: IntGaugeVec, - pub cnt: IntGaugeVec, - pub stddev: IntGaugeVec, - pub hdr_size: IntGaugeVec, - pub p50: IntGaugeVec, - pub p75: IntGaugeVec, - pub p90: IntGaugeVec, - pub p95: IntGaugeVec, - pub p99: IntGaugeVec, - pub p99_99: IntGaugeVec, - pub out_of_range: IntGaugeVec, + pub min: LabelGuardedIntGaugeVec<4>, + pub max: LabelGuardedIntGaugeVec<4>, + pub avg: LabelGuardedIntGaugeVec<4>, + pub sum: LabelGuardedIntGaugeVec<4>, + pub cnt: LabelGuardedIntGaugeVec<4>, + pub stddev: LabelGuardedIntGaugeVec<4>, + pub hdr_size: LabelGuardedIntGaugeVec<4>, + pub p50: LabelGuardedIntGaugeVec<4>, + pub p75: LabelGuardedIntGaugeVec<4>, + pub p90: LabelGuardedIntGaugeVec<4>, + pub p95: LabelGuardedIntGaugeVec<4>, + pub p99: LabelGuardedIntGaugeVec<4>, + pub p99_99: LabelGuardedIntGaugeVec<4>, + pub out_of_range: LabelGuardedIntGaugeVec<4>, } #[derive(Debug, Clone)] pub struct ConsumerGroupStats { pub registry: Registry, - pub state_age: IntGaugeVec, - // todo: (do not know value set) join_state: IntGaugeVec, - pub rebalance_age: IntGaugeVec, - pub rebalance_cnt: IntGaugeVec, + pub state_age: LabelGuardedIntGaugeVec<3>, + // todo: (do not know value set) join_state: LabelGuardedIntGaugeVec<2>, + pub rebalance_age: LabelGuardedIntGaugeVec<3>, + pub rebalance_cnt: LabelGuardedIntGaugeVec<3>, // todo: (cannot handle string) rebalance_reason, - pub assignment_size: IntGaugeVec, + pub assignment_size: LabelGuardedIntGaugeVec<3>, } impl ConsumerGroupStats { pub fn new(registry: Registry) -> Self { - let state_age = register_int_gauge_vec_with_registry!( + let state_age = register_guarded_int_gauge_vec_with_registry!( "rdkafka_consumer_group_state_age", "Age of the consumer group state in seconds", &["id", "client_id", "state"], registry ) .unwrap(); - let rebalance_age = register_int_gauge_vec_with_registry!( + let rebalance_age = register_guarded_int_gauge_vec_with_registry!( "rdkafka_consumer_group_rebalance_age", "Age of the last rebalance in seconds", &["id", "client_id", "state"], registry ) .unwrap(); - let rebalance_cnt = register_int_gauge_vec_with_registry!( + let rebalance_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_consumer_group_rebalance_cnt", "Number of rebalances", &["id", "client_id", "state"], registry ) .unwrap(); - let assignment_size = register_int_gauge_vec_with_registry!( + let assignment_size = register_guarded_int_gauge_vec_with_registry!( "rdkafka_consumer_group_assignment_size", "Number of assigned partitions", &["id", "client_id", "state"], @@ -184,16 +165,16 @@ impl ConsumerGroupStats { pub fn report(&self, id: &str, client_id: &str, stats: &ConsumerGroup) { let state = stats.state.as_str(); self.state_age - .with_label_values(&[id, client_id, state]) + .with_guarded_label_values(&[id, client_id, state]) .set(stats.stateage); self.rebalance_age - .with_label_values(&[id, client_id, state]) + .with_guarded_label_values(&[id, client_id, state]) .set(stats.rebalance_age); self.rebalance_cnt - .with_label_values(&[id, client_id, state]) + .with_guarded_label_values(&[id, client_id, state]) .set(stats.rebalance_cnt); self.assignment_size - .with_label_values(&[id, client_id, state]) + .with_guarded_label_values(&[id, client_id, state]) .set(stats.assignment_size as i64); } } @@ -201,98 +182,98 @@ impl ConsumerGroupStats { impl StatsWindow { pub fn new(registry: Registry, path: &str) -> Self { let get_metric_name = |name: &str| format!("rdkafka_{}_{}", path, name); - let min = register_int_gauge_vec_with_registry!( + let min = register_guarded_int_gauge_vec_with_registry!( get_metric_name("min"), "Minimum value", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let max = register_int_gauge_vec_with_registry!( + let max = register_guarded_int_gauge_vec_with_registry!( get_metric_name("max"), "Maximum value", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let avg = register_int_gauge_vec_with_registry!( + let avg = register_guarded_int_gauge_vec_with_registry!( get_metric_name("avg"), "Average value", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let sum = register_int_gauge_vec_with_registry!( + let sum = register_guarded_int_gauge_vec_with_registry!( get_metric_name("sum"), "Sum of values", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let cnt = register_int_gauge_vec_with_registry!( + let cnt = register_guarded_int_gauge_vec_with_registry!( get_metric_name("cnt"), "Count of values", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let stddev = register_int_gauge_vec_with_registry!( + let stddev = register_guarded_int_gauge_vec_with_registry!( get_metric_name("stddev"), "Standard deviation", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let hdr_size = register_int_gauge_vec_with_registry!( + let hdr_size = register_guarded_int_gauge_vec_with_registry!( get_metric_name("hdrsize"), "Size of the histogram header", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let p50 = register_int_gauge_vec_with_registry!( + let p50 = register_guarded_int_gauge_vec_with_registry!( get_metric_name("p50"), "50th percentile", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let p75 = register_int_gauge_vec_with_registry!( + let p75 = register_guarded_int_gauge_vec_with_registry!( get_metric_name("p75"), "75th percentile", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let p90 = register_int_gauge_vec_with_registry!( + let p90 = register_guarded_int_gauge_vec_with_registry!( get_metric_name("p90"), "90th percentile", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let p95 = register_int_gauge_vec_with_registry!( + let p95 = register_guarded_int_gauge_vec_with_registry!( get_metric_name("p95"), "95th percentile", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let p99 = register_int_gauge_vec_with_registry!( + let p99 = register_guarded_int_gauge_vec_with_registry!( get_metric_name("p99"), "99th percentile", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let p99_99 = register_int_gauge_vec_with_registry!( + let p99_99 = register_guarded_int_gauge_vec_with_registry!( get_metric_name("p99_99"), "99.99th percentile", &["id", "client_id", "broker", "topic"], registry ) .unwrap(); - let out_of_range = register_int_gauge_vec_with_registry!( + let out_of_range = register_guarded_int_gauge_vec_with_registry!( get_metric_name("out_of_range"), "Out of range values", &["id", "client_id", "broker", "topic"], @@ -322,26 +303,32 @@ impl StatsWindow { pub fn report(&self, id: &str, client_id: &str, broker: &str, topic: &str, stats: &Window) { let labels = [id, client_id, broker, topic]; - self.min.with_label_values(&labels).set(stats.min); - self.max.with_label_values(&labels).set(stats.max); - self.avg.with_label_values(&labels).set(stats.avg); - self.sum.with_label_values(&labels).set(stats.sum); - self.cnt.with_label_values(&labels).set(stats.cnt); - self.stddev.with_label_values(&labels).set(stats.stddev); - self.hdr_size.with_label_values(&labels).set(stats.hdrsize); - self.p50.with_label_values(&labels).set(stats.p50); - self.p75.with_label_values(&labels).set(stats.p75); - self.p90.with_label_values(&labels).set(stats.p90); - self.p99_99.with_label_values(&labels).set(stats.p99_99); + self.min.with_guarded_label_values(&labels).set(stats.min); + self.max.with_guarded_label_values(&labels).set(stats.max); + self.avg.with_guarded_label_values(&labels).set(stats.avg); + self.sum.with_guarded_label_values(&labels).set(stats.sum); + self.cnt.with_guarded_label_values(&labels).set(stats.cnt); + self.stddev + .with_guarded_label_values(&labels) + .set(stats.stddev); + self.hdr_size + .with_guarded_label_values(&labels) + .set(stats.hdrsize); + self.p50.with_guarded_label_values(&labels).set(stats.p50); + self.p75.with_guarded_label_values(&labels).set(stats.p75); + self.p90.with_guarded_label_values(&labels).set(stats.p90); + self.p99_99 + .with_guarded_label_values(&labels) + .set(stats.p99_99); self.out_of_range - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.outofrange); } } impl TopicStats { pub fn new(registry: Registry) -> Self { - let metadata_age = register_int_gauge_vec_with_registry!( + let metadata_age = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_metadata_age", "Age of the topic metadata in milliseconds", &["id", "client_id", "topic"], @@ -368,7 +355,7 @@ impl TopicStats { fn report_inner(&self, id: &str, client_id: &str, topic: &str, stats: &Topic) { self.metadata_age - .with_label_values(&[id, client_id, topic]) + .with_guarded_label_values(&[id, client_id, topic]) .set(stats.metadata_age); self.batch_size .report(id, client_id, "", topic, &stats.batchsize); @@ -382,212 +369,212 @@ impl TopicStats { pub struct PartitionStats { pub registry: Registry, - pub msgq_cnt: IntGaugeVec, - pub msgq_bytes: GenericGaugeVec, - pub xmit_msgq_cnt: IntGaugeVec, - pub xmit_msgq_bytes: GenericGaugeVec, - pub fetchq_cnt: IntGaugeVec, - pub fetchq_size: GenericGaugeVec, - pub query_offset: IntGaugeVec, - pub next_offset: IntGaugeVec, - pub app_offset: IntGaugeVec, - pub stored_offset: IntGaugeVec, - pub committed_offset: IntGaugeVec, - pub eof_offset: IntGaugeVec, - pub lo_offset: IntGaugeVec, - pub hi_offset: IntGaugeVec, - pub consumer_lag: IntGaugeVec, - pub consumer_lag_store: IntGaugeVec, - pub txmsgs: GenericGaugeVec, - pub txbytes: GenericGaugeVec, - pub rxmsgs: GenericGaugeVec, - pub rxbytes: GenericGaugeVec, - pub msgs: GenericGaugeVec, - pub rx_ver_drops: GenericGaugeVec, - pub msgs_inflight: IntGaugeVec, - pub next_ack_seq: IntGaugeVec, - pub next_err_seq: IntGaugeVec, - pub acked_msgid: GenericGaugeVec, + pub msgq_cnt: LabelGuardedIntGaugeVec<4>, + pub msgq_bytes: LabelGuardedIntGaugeVec<4>, + pub xmit_msgq_cnt: LabelGuardedIntGaugeVec<4>, + pub xmit_msgq_bytes: LabelGuardedIntGaugeVec<4>, + pub fetchq_cnt: LabelGuardedIntGaugeVec<4>, + pub fetchq_size: LabelGuardedIntGaugeVec<4>, + pub query_offset: LabelGuardedIntGaugeVec<4>, + pub next_offset: LabelGuardedIntGaugeVec<4>, + pub app_offset: LabelGuardedIntGaugeVec<4>, + pub stored_offset: LabelGuardedIntGaugeVec<4>, + pub committed_offset: LabelGuardedIntGaugeVec<4>, + pub eof_offset: LabelGuardedIntGaugeVec<4>, + pub lo_offset: LabelGuardedIntGaugeVec<4>, + pub hi_offset: LabelGuardedIntGaugeVec<4>, + pub consumer_lag: LabelGuardedIntGaugeVec<4>, + pub consumer_lag_store: LabelGuardedIntGaugeVec<4>, + pub txmsgs: LabelGuardedIntGaugeVec<4>, + pub txbytes: LabelGuardedIntGaugeVec<4>, + pub rxmsgs: LabelGuardedIntGaugeVec<4>, + pub rxbytes: LabelGuardedIntGaugeVec<4>, + pub msgs: LabelGuardedIntGaugeVec<4>, + pub rx_ver_drops: LabelGuardedIntGaugeVec<4>, + pub msgs_inflight: LabelGuardedIntGaugeVec<4>, + pub next_ack_seq: LabelGuardedIntGaugeVec<4>, + pub next_err_seq: LabelGuardedIntGaugeVec<4>, + pub acked_msgid: LabelGuardedIntGaugeVec<4>, } impl PartitionStats { pub fn new(registry: Registry) -> Self { - let msgq_cnt = register_int_gauge_vec_with_registry!( + let msgq_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_msgq_cnt", "Number of messages in the producer queue", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let msgq_bytes = register_uint_gauge_vec_with_registry!( + let msgq_bytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_msgq_bytes", "Size of messages in the producer queue", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let xmit_msgq_cnt = register_int_gauge_vec_with_registry!( + let xmit_msgq_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_xmit_msgq_cnt", "Number of messages in the transmit queue", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let xmit_msgq_bytes = register_uint_gauge_vec_with_registry!( + let xmit_msgq_bytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_xmit_msgq_bytes", "Size of messages in the transmit queue", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let fetchq_cnt = register_int_gauge_vec_with_registry!( + let fetchq_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_fetchq_cnt", "Number of messages in the fetch queue", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let fetchq_size = register_uint_gauge_vec_with_registry!( + let fetchq_size = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_fetchq_size", "Size of messages in the fetch queue", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let query_offset = register_int_gauge_vec_with_registry!( + let query_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_query_offset", "Current query offset", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let next_offset = register_int_gauge_vec_with_registry!( + let next_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_next_offset", "Next offset to query", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let app_offset = register_int_gauge_vec_with_registry!( + let app_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_app_offset", "Last acknowledged offset", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let stored_offset = register_int_gauge_vec_with_registry!( + let stored_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_stored_offset", "Last stored offset", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let committed_offset = register_int_gauge_vec_with_registry!( + let committed_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_committed_offset", "Last committed offset", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let eof_offset = register_int_gauge_vec_with_registry!( + let eof_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_eof_offset", "Last offset in broker log", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let lo_offset = register_int_gauge_vec_with_registry!( + let lo_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_lo_offset", "Low offset", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let hi_offset = register_int_gauge_vec_with_registry!( + let hi_offset = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_hi_offset", "High offset", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let consumer_lag = register_int_gauge_vec_with_registry!( + let consumer_lag = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_consumer_lag", "Consumer lag", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let consumer_lag_store = register_int_gauge_vec_with_registry!( + let consumer_lag_store = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_consumer_lag_store", "Consumer lag stored", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let txmsgs = register_uint_gauge_vec_with_registry!( + let txmsgs = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_txmsgs", "Number of transmitted messages", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let txbytes = register_uint_gauge_vec_with_registry!( + let txbytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_txbytes", "Number of transmitted bytes", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let rxmsgs = register_uint_gauge_vec_with_registry!( + let rxmsgs = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_rxmsgs", "Number of received messages", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let rxbytes = register_uint_gauge_vec_with_registry!( + let rxbytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_rxbytes", "Number of received bytes", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let msgs = register_uint_gauge_vec_with_registry!( + let msgs = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_msgs", "Number of messages in partition", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let rx_ver_drops = register_uint_gauge_vec_with_registry!( + let rx_ver_drops = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_rx_ver_drops", "Number of received messages dropped due to version mismatch", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let msgs_inflight = register_int_gauge_vec_with_registry!( + let msgs_inflight = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_msgs_inflight", "Number of messages in-flight", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let next_ack_seq = register_int_gauge_vec_with_registry!( + let next_ack_seq = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_next_ack_seq", "Next ack sequence number", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let next_err_seq = register_int_gauge_vec_with_registry!( + let next_err_seq = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_next_err_seq", "Next error sequence number", &["id", "client_id", "topic", "partition"], registry ) .unwrap(); - let acked_msgid = register_uint_gauge_vec_with_registry!( + let acked_msgid = register_guarded_int_gauge_vec_with_registry!( "rdkafka_topic_partition_acked_msgid", "Acknowledged message ID", &["id", "client_id", "topic", "partition"], @@ -635,78 +622,90 @@ impl PartitionStats { fn report_inner(&self, id: &str, client_id: &str, topic: &str, stats: &Partition) { let labels = [id, client_id, topic, &stats.partition.to_string()]; - self.msgq_cnt.with_label_values(&labels).set(stats.msgq_cnt); + self.msgq_cnt + .with_guarded_label_values(&labels) + .set(stats.msgq_cnt); self.msgq_bytes - .with_label_values(&labels) - .set(stats.msgq_bytes); + .with_guarded_label_values(&labels) + .set(stats.msgq_bytes as i64); self.xmit_msgq_cnt - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.xmit_msgq_cnt); self.xmit_msgq_bytes - .with_label_values(&labels) - .set(stats.xmit_msgq_bytes); + .with_guarded_label_values(&labels) + .set(stats.xmit_msgq_bytes as i64); self.fetchq_cnt - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.fetchq_cnt); self.fetchq_size - .with_label_values(&labels) - .set(stats.fetchq_size); + .with_guarded_label_values(&labels) + .set(stats.fetchq_size as i64); self.query_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.query_offset); self.next_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.next_offset); self.app_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.app_offset); self.stored_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.stored_offset); self.committed_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.committed_offset); self.eof_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.eof_offset); self.lo_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.lo_offset); self.hi_offset - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.hi_offset); self.consumer_lag - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.consumer_lag); self.consumer_lag_store - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.consumer_lag_stored); - self.txmsgs.with_label_values(&labels).set(stats.txmsgs); - self.txbytes.with_label_values(&labels).set(stats.txbytes); - self.rxmsgs.with_label_values(&labels).set(stats.rxmsgs); - self.rxbytes.with_label_values(&labels).set(stats.rxbytes); - self.msgs.with_label_values(&labels).set(stats.msgs); + self.txmsgs + .with_guarded_label_values(&labels) + .set(stats.txmsgs as i64); + self.txbytes + .with_guarded_label_values(&labels) + .set(stats.txbytes as i64); + self.rxmsgs + .with_guarded_label_values(&labels) + .set(stats.rxmsgs as i64); + self.rxbytes + .with_guarded_label_values(&labels) + .set(stats.rxbytes as i64); + self.msgs + .with_guarded_label_values(&labels) + .set(stats.msgs as i64); self.rx_ver_drops - .with_label_values(&labels) - .set(stats.rx_ver_drops); + .with_guarded_label_values(&labels) + .set(stats.rx_ver_drops as i64); self.msgs_inflight - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.msgs_inflight); self.next_ack_seq - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.next_ack_seq); self.next_err_seq - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.next_err_seq); self.acked_msgid - .with_label_values(&labels) - .set(stats.acked_msgid); + .with_guarded_label_values(&labels) + .set(stats.acked_msgid as i64); } } impl RdKafkaStats { pub fn new(registry: Registry) -> Self { - let ts = register_int_gauge_vec_with_registry!( + let ts = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_ts", "librdkafka's internal monotonic clock (microseconds)", // we cannot tell whether it is for consumer or producer, @@ -715,119 +714,119 @@ impl RdKafkaStats { registry ) .unwrap(); - let time = register_int_gauge_vec_with_registry!( + let time = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_time", "Wall clock time in seconds since the epoch", &["id", "client_id"], registry ) .unwrap(); - let age = register_int_gauge_vec_with_registry!( + let age = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_age", "Age of the topic metadata in milliseconds", &["id", "client_id"], registry ) .unwrap(); - let replyq = register_int_gauge_vec_with_registry!( + let replyq = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_replyq", "Number of replies waiting to be served", &["id", "client_id"], registry ) .unwrap(); - let msg_cnt = register_uint_gauge_vec_with_registry!( + let msg_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_msg_cnt", "Number of messages in all topics", &["id", "client_id"], registry ) .unwrap(); - let msg_size = register_uint_gauge_vec_with_registry!( + let msg_size = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_msg_size", "Size of messages in all topics", &["id", "client_id"], registry ) .unwrap(); - let msg_max = register_uint_gauge_vec_with_registry!( + let msg_max = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_msg_max", "Maximum message size in all topics", &["id", "client_id"], registry ) .unwrap(); - let msg_size_max = register_uint_gauge_vec_with_registry!( + let msg_size_max = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_msg_size_max", "Maximum message size in all topics", &["id", "client_id"], registry ) .unwrap(); - let tx = register_int_gauge_vec_with_registry!( + let tx = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_tx", "Number of transmitted messages", &["id", "client_id"], registry ) .unwrap(); - let tx_bytes = register_int_gauge_vec_with_registry!( + let tx_bytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_tx_bytes", "Number of transmitted bytes", &["id", "client_id"], registry ) .unwrap(); - let rx = register_int_gauge_vec_with_registry!( + let rx = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_rx", "Number of received messages", &["id", "client_id"], registry ) .unwrap(); - let rx_bytes = register_int_gauge_vec_with_registry!( + let rx_bytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_rx_bytes", "Number of received bytes", &["id", "client_id"], registry ) .unwrap(); - let tx_msgs = register_int_gauge_vec_with_registry!( + let tx_msgs = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_tx_msgs", "Number of transmitted messages", &["id", "client_id"], registry ) .unwrap(); - let tx_msgs_bytes = register_int_gauge_vec_with_registry!( + let tx_msgs_bytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_tx_msgs_bytes", "Number of transmitted bytes", &["id", "client_id"], registry ) .unwrap(); - let rx_msgs = register_int_gauge_vec_with_registry!( + let rx_msgs = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_rx_msgs", "Number of received messages", &["id", "client_id"], registry ) .unwrap(); - let rx_msgs_bytes = register_int_gauge_vec_with_registry!( + let rx_msgs_bytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_rx_msgs_bytes", "Number of received bytes", &["id", "client_id"], registry ) .unwrap(); - let simple_cnt = register_int_gauge_vec_with_registry!( + let simple_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_simple_cnt", "Number of simple consumer queues", &["id", "client_id"], registry ) .unwrap(); - let metadata_cache_cnt = register_int_gauge_vec_with_registry!( + let metadata_cache_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_top_metadata_cache_cnt", "Number of entries in the metadata cache", &["id", "client_id"], @@ -866,51 +865,59 @@ impl RdKafkaStats { pub fn report(&self, id: &str, stats: &Statistics) { let client_id = stats.name.as_str(); - self.ts.with_label_values(&[id, client_id]).set(stats.ts); + self.ts + .with_guarded_label_values(&[id, client_id]) + .set(stats.ts); self.time - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.time); - self.age.with_label_values(&[id, client_id]).set(stats.age); + self.age + .with_guarded_label_values(&[id, client_id]) + .set(stats.age); self.replyq - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.replyq); self.msg_cnt - .with_label_values(&[id, client_id]) - .set(stats.msg_cnt); + .with_guarded_label_values(&[id, client_id]) + .set(stats.msg_cnt as i64); self.msg_size - .with_label_values(&[id, client_id]) - .set(stats.msg_size); + .with_guarded_label_values(&[id, client_id]) + .set(stats.msg_size as i64); self.msg_max - .with_label_values(&[id, client_id]) - .set(stats.msg_max); + .with_guarded_label_values(&[id, client_id]) + .set(stats.msg_max as i64); self.msg_size_max - .with_label_values(&[id, client_id]) - .set(stats.msg_size_max); - self.tx.with_label_values(&[id, client_id]).set(stats.tx); + .with_guarded_label_values(&[id, client_id]) + .set(stats.msg_size_max as i64); + self.tx + .with_guarded_label_values(&[id, client_id]) + .set(stats.tx); self.tx_bytes - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.tx_bytes); - self.rx.with_label_values(&[id, client_id]).set(stats.rx); + self.rx + .with_guarded_label_values(&[id, client_id]) + .set(stats.rx); self.rx_bytes - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.rx_bytes); self.tx_msgs - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.txmsgs); self.tx_msgs_bytes - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.txmsg_bytes); self.rx_msgs - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.rxmsgs); self.rx_msgs_bytes - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.rxmsg_bytes); self.simple_cnt - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.simple_cnt); self.metadata_cache_cnt - .with_label_values(&[id, client_id]) + .with_guarded_label_values(&[id, client_id]) .set(stats.metadata_cache_cnt); self.broker_stats.report(id, client_id, stats); @@ -923,161 +930,161 @@ impl RdKafkaStats { impl BrokerStats { pub fn new(registry: Registry) -> Self { - let state_age = register_int_gauge_vec_with_registry!( + let state_age = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_state_age", "Age of the broker state in seconds", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let outbuf_cnt = register_int_gauge_vec_with_registry!( + let outbuf_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_outbuf_cnt", "Number of messages waiting to be sent to broker", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let outbuf_msg_cnt = register_int_gauge_vec_with_registry!( + let outbuf_msg_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_outbuf_msg_cnt", "Number of messages waiting to be sent to broker", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let waitresp_cnt = register_int_gauge_vec_with_registry!( + let waitresp_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_waitresp_cnt", "Number of requests waiting for response", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let waitresp_msg_cnt = register_int_gauge_vec_with_registry!( + let waitresp_msg_cnt = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_waitresp_msg_cnt", "Number of messages waiting for response", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let tx = register_uint_gauge_vec_with_registry!( + let tx = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_tx", "Number of transmitted messages", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let tx_bytes = register_uint_gauge_vec_with_registry!( + let tx_bytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_tx_bytes", "Number of transmitted bytes", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let tx_errs = register_uint_gauge_vec_with_registry!( + let tx_errs = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_tx_errs", "Number of failed transmitted messages", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let tx_retries = register_uint_gauge_vec_with_registry!( + let tx_retries = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_tx_retries", "Number of message retries", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let tx_idle = register_int_gauge_vec_with_registry!( + let tx_idle = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_tx_idle", "Number of idle transmit connections", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let req_timeouts = register_uint_gauge_vec_with_registry!( + let req_timeouts = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_req_timeouts", "Number of request timeouts", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let rx = register_uint_gauge_vec_with_registry!( + let rx = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_rx", "Number of received messages", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let rx_bytes = register_uint_gauge_vec_with_registry!( + let rx_bytes = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_rx_bytes", "Number of received bytes", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let rx_errs = register_uint_gauge_vec_with_registry!( + let rx_errs = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_rx_errs", "Number of failed received messages", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let rx_corriderrs = register_uint_gauge_vec_with_registry!( + let rx_corriderrs = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_rx_corriderrs", "Number of received messages with invalid correlation id", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let rx_partial = register_uint_gauge_vec_with_registry!( + let rx_partial = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_rx_partial", "Number of partial messages received", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let rx_idle = register_int_gauge_vec_with_registry!( + let rx_idle = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_rx_idle", "Number of idle receive connections", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let req = register_int_gauge_vec_with_registry!( + let req = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_req", "Number of requests in flight", &["id", "client_id", "broker", "state", "type"], registry ) .unwrap(); - let zbuf_grow = register_uint_gauge_vec_with_registry!( + let zbuf_grow = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_zbuf_grow", "Number of times the broker's output buffer has been reallocated", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let buf_grow = register_uint_gauge_vec_with_registry!( + let buf_grow = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_buf_grow", "Number of times the broker's input buffer has been reallocated", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let wakeups = register_uint_gauge_vec_with_registry!( + let wakeups = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_wakeups", "Number of wakeups", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let connects = register_int_gauge_vec_with_registry!( + let connects = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_connects", "Number of connection attempts", &["id", "client_id", "broker", "state"], registry ) .unwrap(); - let disconnects = register_int_gauge_vec_with_registry!( + let disconnects = register_guarded_int_gauge_vec_with_registry!( "rdkafka_broker_disconnects", "Number of disconnects", &["id", "client_id", "broker", "state"], @@ -1133,57 +1140,81 @@ impl BrokerStats { let labels = [id, client_id, broker, state]; self.state_age - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.stateage); self.outbuf_cnt - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.outbuf_cnt); self.outbuf_msg_cnt - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.outbuf_msg_cnt); self.waitresp_cnt - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.waitresp_cnt); self.waitresp_msg_cnt - .with_label_values(&labels) + .with_guarded_label_values(&labels) .set(stats.waitresp_msg_cnt); - self.tx.with_label_values(&labels).set(stats.tx); - self.tx_bytes.with_label_values(&labels).set(stats.txbytes); - self.tx_errs.with_label_values(&labels).set(stats.txerrs); + self.tx + .with_guarded_label_values(&labels) + .set(stats.tx as i64); + self.tx_bytes + .with_guarded_label_values(&labels) + .set(stats.txbytes as i64); + self.tx_errs + .with_guarded_label_values(&labels) + .set(stats.txerrs as i64); self.tx_retries - .with_label_values(&labels) - .set(stats.txretries); - self.tx_idle.with_label_values(&labels).set(stats.txidle); + .with_guarded_label_values(&labels) + .set(stats.txretries as i64); + self.tx_idle + .with_guarded_label_values(&labels) + .set(stats.txidle); self.req_timeouts - .with_label_values(&labels) - .set(stats.req_timeouts); - self.rx.with_label_values(&labels).set(stats.rx); - self.rx_bytes.with_label_values(&labels).set(stats.rxbytes); - self.rx_errs.with_label_values(&labels).set(stats.rxerrs); + .with_guarded_label_values(&labels) + .set(stats.req_timeouts as i64); + self.rx + .with_guarded_label_values(&labels) + .set(stats.rx as i64); + self.rx_bytes + .with_guarded_label_values(&labels) + .set(stats.rxbytes as i64); + self.rx_errs + .with_guarded_label_values(&labels) + .set(stats.rxerrs as i64); self.rx_corriderrs - .with_label_values(&labels) - .set(stats.rxcorriderrs); + .with_guarded_label_values(&labels) + .set(stats.rxcorriderrs as i64); self.rx_partial - .with_label_values(&labels) - .set(stats.rxpartial); - self.rx_idle.with_label_values(&labels).set(stats.rxidle); + .with_guarded_label_values(&labels) + .set(stats.rxpartial as i64); + self.rx_idle + .with_guarded_label_values(&labels) + .set(stats.rxidle); for (req_type, req_cnt) in &stats.req { self.req - .with_label_values(&[id, client_id, broker, state, req_type]) + .with_guarded_label_values(&[id, client_id, broker, state, req_type]) .set(*req_cnt); } self.zbuf_grow - .with_label_values(&labels) - .set(stats.zbuf_grow); - self.buf_grow.with_label_values(&labels).set(stats.buf_grow); + .with_guarded_label_values(&labels) + .set(stats.zbuf_grow as i64); + self.buf_grow + .with_guarded_label_values(&labels) + .set(stats.buf_grow as i64); if let Some(wakeups) = stats.wakeups { - self.wakeups.with_label_values(&labels).set(wakeups); + self.wakeups + .with_guarded_label_values(&labels) + .set(wakeups as i64); } if let Some(connects) = stats.connects { - self.connects.with_label_values(&labels).set(connects); + self.connects + .with_guarded_label_values(&labels) + .set(connects); } if let Some(disconnects) = stats.disconnects { - self.disconnects.with_label_values(&labels).set(disconnects); + self.disconnects + .with_guarded_label_values(&labels) + .set(disconnects); } if let Some(int_latency) = &stats.int_latency { self.int_latency diff --git a/src/connector/src/source/monitor/metrics.rs b/src/connector/src/source/monitor/metrics.rs index bf43ca44212ae..0b5313dfcda46 100644 --- a/src/connector/src/source/monitor/metrics.rs +++ b/src/connector/src/source/monitor/metrics.rs @@ -14,11 +14,11 @@ use std::sync::{Arc, LazyLock}; -use prometheus::core::{AtomicI64, AtomicU64, GenericCounterVec, GenericGaugeVec}; -use prometheus::{ - exponential_buckets, histogram_opts, register_histogram_vec_with_registry, - register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, HistogramVec, - Registry, +use prometheus::{exponential_buckets, histogram_opts, Registry}; +use risingwave_common::metrics::{ + register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry, + register_guarded_int_gauge_vec_with_registry, LabelGuardedHistogramVec, + LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec, }; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; @@ -26,7 +26,7 @@ use crate::source::kafka::stats::RdKafkaStats; #[derive(Debug, Clone)] pub struct EnumeratorMetrics { - pub high_watermark: GenericGaugeVec, + pub high_watermark: LabelGuardedIntGaugeVec<2>, } pub static GLOBAL_ENUMERATOR_METRICS: LazyLock = @@ -34,7 +34,7 @@ pub static GLOBAL_ENUMERATOR_METRICS: LazyLock = impl EnumeratorMetrics { fn new(registry: &Registry) -> Self { - let high_watermark = register_int_gauge_vec_with_registry!( + let high_watermark = register_guarded_int_gauge_vec_with_registry!( "source_kafka_high_watermark", "High watermark for a exec per partition", &["source_id", "partition"], @@ -57,18 +57,18 @@ impl Default for EnumeratorMetrics { #[derive(Debug, Clone)] pub struct SourceMetrics { - pub partition_input_count: GenericCounterVec, + pub partition_input_count: LabelGuardedIntCounterVec<5>, // **Note**: for normal messages, the metric is the message's payload size. // For messages from load generator, the metric is the size of stream chunk. - pub partition_input_bytes: GenericCounterVec, + pub partition_input_bytes: LabelGuardedIntCounterVec<5>, /// Report latest message id - pub latest_message_id: GenericGaugeVec, + pub latest_message_id: LabelGuardedIntGaugeVec<3>, pub rdkafka_native_metric: Arc, - pub connector_source_rows_received: GenericCounterVec, + pub connector_source_rows_received: LabelGuardedIntCounterVec<2>, - pub direct_cdc_event_lag_latency: HistogramVec, + pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec<1>, } pub static GLOBAL_SOURCE_METRICS: LazyLock = @@ -76,7 +76,7 @@ pub static GLOBAL_SOURCE_METRICS: LazyLock = impl SourceMetrics { fn new(registry: &Registry) -> Self { - let partition_input_count = register_int_counter_vec_with_registry!( + let partition_input_count = register_guarded_int_counter_vec_with_registry!( "source_partition_input_count", "Total number of rows that have been input from specific partition", &[ @@ -89,7 +89,7 @@ impl SourceMetrics { registry ) .unwrap(); - let partition_input_bytes = register_int_counter_vec_with_registry!( + let partition_input_bytes = register_guarded_int_counter_vec_with_registry!( "source_partition_input_bytes", "Total bytes that have been input from specific partition", &[ @@ -102,7 +102,7 @@ impl SourceMetrics { registry ) .unwrap(); - let latest_message_id = register_int_gauge_vec_with_registry!( + let latest_message_id = register_guarded_int_gauge_vec_with_registry!( "source_latest_message_id", "Latest message id for a exec per partition", &["source_id", "actor_id", "partition"], @@ -110,7 +110,7 @@ impl SourceMetrics { ) .unwrap(); - let connector_source_rows_received = register_int_counter_vec_with_registry!( + let connector_source_rows_received = register_guarded_int_counter_vec_with_registry!( "source_rows_received", "Number of rows received by source", &["source_type", "source_id"], @@ -124,7 +124,7 @@ impl SourceMetrics { exponential_buckets(1.0, 2.0, 21).unwrap(), // max 1048s ); let direct_cdc_event_lag_latency = - register_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap(); + register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap(); let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone())); SourceMetrics { diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs index 6441baa154ae4..6d01d9223bb46 100644 --- a/src/connector/src/source/nexmark/source/reader.rs +++ b/src/connector/src/source/nexmark/source/reader.rs @@ -121,7 +121,7 @@ impl SplitReader for NexmarkSplitReader { .inspect_ok(move |chunk: &StreamChunk| { metrics .partition_input_count - .with_label_values(&[ + .with_guarded_label_values(&[ &actor_id, &source_id, &split_id, @@ -131,7 +131,7 @@ impl SplitReader for NexmarkSplitReader { .inc_by(chunk.cardinality() as u64); metrics .partition_input_bytes - .with_label_values(&[ + .with_guarded_label_values(&[ &actor_id, &source_id, &split_id,