diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 86d29213374a0..a2b1d22f59d9b 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -46,6 +46,7 @@ #![feature(negative_impls)] #![feature(async_fn_in_trait)] #![feature(bound_map)] +#![feature(array_methods)] #[macro_use] pub mod jemalloc; diff --git a/src/common/src/metrics/guarded_metrics.rs b/src/common/src/metrics/guarded_metrics.rs index f6a2ab71627fb..7cc166aa5998b 100644 --- a/src/common/src/metrics/guarded_metrics.rs +++ b/src/common/src/metrics/guarded_metrics.rs @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::type_name; +use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; use std::ops::Deref; use std::sync::Arc; use itertools::Itertools; +use parking_lot::Mutex; use prometheus::core::{ Atomic, AtomicI64, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec, MetricVec, MetricVecBuilder, @@ -42,11 +46,15 @@ pub fn __extract_histogram_builder(vec: HistogramVec) -> MetricVec {{ - let result = prometheus::register_histogram_vec_with_registry!( - prometheus::histogram_opts!($NAME, $HELP), + $crate::register_guarded_histogram_vec_with_registry! { + {prometheus::histogram_opts!($NAME, $HELP)}, $LABELS_NAMES, $REGISTRY - ); + } + }}; + ($HOPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{ + let result = + prometheus::register_histogram_vec_with_registry!($HOPTS, $LABELS_NAMES, $REGISTRY); result.map(|inner| { let inner = $crate::metrics::__extract_histogram_builder(inner); $crate::metrics::LabelGuardedHistogramVec::new(inner, { $LABELS_NAMES }) @@ -94,40 +102,60 @@ pub type LabelGuardedIntCounterVec = pub type LabelGuardedIntGaugeVec = LabelGuardedMetricVec, N>; -pub type LabelGuardedHistogram = LabelGuardedMetric; -pub type LabelGuardedIntCounter = LabelGuardedMetric>; -pub type LabelGuardedIntGauge = LabelGuardedMetric>; +pub type LabelGuardedHistogram = LabelGuardedMetric; +pub type LabelGuardedIntCounter = + LabelGuardedMetric, N>; +pub type LabelGuardedIntGauge = LabelGuardedMetric, N>; fn gen_test_label() -> [&'static str; N] { - vec!["test"; N].try_into().unwrap() + const TEST_LABELS: [&str; 5] = ["test1", "test2", "test3", "test4", "test5"]; + (0..N) + .map(|i| TEST_LABELS[i]) + .collect_vec() + .try_into() + .unwrap() } #[derive(Clone)] pub struct LabelGuardedMetricVec { inner: MetricVec, - _labels: [&'static str; N], + labeled_metrics_count: Arc>>, + labels: [&'static str; N], +} + +impl Debug for LabelGuardedMetricVec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct(format!("LabelGuardedMetricVec<{}, {}>", type_name::(), N).as_str()) + .field("label", &self.labels) + .finish() + } } impl LabelGuardedMetricVec { pub fn new(inner: MetricVec, labels: &[&'static str; N]) -> Self { Self { inner, - _labels: *labels, + labeled_metrics_count: Default::default(), + labels: *labels, } } - pub fn with_label_values(&self, labels: &[&str; N]) -> LabelGuardedMetric { + pub fn with_label_values(&self, labels: &[&str; N]) -> LabelGuardedMetric { + let mut count_guard = self.labeled_metrics_count.lock(); + let label_string = labels.map(|str| str.to_string()); + *count_guard.entry(label_string).or_insert(0) += 1; let inner = self.inner.with_label_values(labels); LabelGuardedMetric { inner: Arc::new(LabelGuardedMetricInner { inner, - labels: labels.iter().map(|s| s.to_string()).collect(), + labels: labels.map(|str| str.to_string()), vec: self.inner.clone(), + labeled_metrics_count: self.labeled_metrics_count.clone(), }), } } - pub fn with_test_label(&self) -> LabelGuardedMetric { + pub fn with_test_label(&self) -> LabelGuardedMetric { let labels: [&'static str; N] = gen_test_label::(); self.with_label_values(&labels) } @@ -173,37 +201,49 @@ impl LabelGuardedHistogramVec { } #[derive(Clone)] -struct LabelGuardedMetricInner { +struct LabelGuardedMetricInner { inner: T::M, - labels: Vec, + labels: [String; N], vec: MetricVec, + labeled_metrics_count: Arc>>, } -impl Drop for LabelGuardedMetricInner { +impl Drop for LabelGuardedMetricInner { fn drop(&mut self) { - if let Err(e) = self.vec.remove_label_values( - self.labels - .iter() - .map(|s| s.as_str()) - .collect_vec() - .as_slice(), - ) { - warn!( - "err when delete metrics of {:?} of labels {:?}. Err {:?}", - self.vec.desc().first().expect("should have desc").fq_name, - self.labels, - e, - ); + let mut count_guard = self.labeled_metrics_count.lock(); + let count = count_guard.get_mut(&self.labels).expect( + "should exist because the current existing dropping one means the count is not zero", + ); + *count -= 1; + if *count == 0 { + count_guard.remove(&self.labels).expect("should exist"); + if let Err(e) = self + .vec + .remove_label_values(&self.labels.each_ref().map(|s| s.as_str())) + { + warn!( + "err when delete metrics of {:?} of labels {:?}. Err {:?}", + self.vec.desc().first().expect("should have desc").fq_name, + self.labels, + e, + ); + } } } } #[derive(Clone)] -pub struct LabelGuardedMetric { - inner: Arc>, +pub struct LabelGuardedMetric { + inner: Arc>, } -impl Deref for LabelGuardedMetric { +impl Debug for LabelGuardedMetric { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LabelGuardedMetric").finish() + } +} + +impl Deref for LabelGuardedMetric { type Target = T::M; fn deref(&self) -> &Self::Target { @@ -211,20 +251,47 @@ impl Deref for LabelGuardedMetric { } } -impl LabelGuardedHistogram { +impl LabelGuardedHistogram { pub fn test_histogram() -> Self { - LabelGuardedHistogramVec::<1>::test_histogram_vec().with_test_label() + LabelGuardedHistogramVec::::test_histogram_vec().with_test_label() } } -impl LabelGuardedIntCounter { +impl LabelGuardedIntCounter { pub fn test_int_counter() -> Self { - LabelGuardedIntCounterVec::<1>::test_int_counter_vec().with_test_label() + LabelGuardedIntCounterVec::::test_int_counter_vec().with_test_label() } } -impl LabelGuardedIntGauge { +impl LabelGuardedIntGauge { pub fn test_int_gauge() -> Self { - LabelGuardedIntGaugeVec::<1>::test_int_gauge_vec().with_test_label() + LabelGuardedIntGaugeVec::::test_int_gauge_vec().with_test_label() + } +} + +#[cfg(test)] +mod tests { + use prometheus::core::Collector; + + use crate::metrics::LabelGuardedIntCounterVec; + + #[test] + fn test_label_guarded_metrics_drop() { + let vec = LabelGuardedIntCounterVec::<3>::test_int_counter_vec(); + let m1_1 = vec.with_label_values(&["1", "2", "3"]); + assert_eq!(1, vec.inner.collect().pop().unwrap().get_metric().len()); + let m1_2 = vec.with_label_values(&["1", "2", "3"]); + let m1_3 = m1_2.clone(); + assert_eq!(1, vec.inner.collect().pop().unwrap().get_metric().len()); + let m2 = vec.with_label_values(&["2", "2", "3"]); + assert_eq!(2, vec.inner.collect().pop().unwrap().get_metric().len()); + drop(m1_3); + assert_eq!(2, vec.inner.collect().pop().unwrap().get_metric().len()); + drop(m2); + assert_eq!(1, vec.inner.collect().pop().unwrap().get_metric().len()); + drop(m1_1); + assert_eq!(1, vec.inner.collect().pop().unwrap().get_metric().len()); + drop(m1_2); + assert_eq!(0, vec.inner.collect().pop().unwrap().get_metric().len()); } } diff --git a/src/common/src/metrics/relabeled_metric.rs b/src/common/src/metrics/relabeled_metric.rs index e620ee30ba844..4eb7fa73665d6 100644 --- a/src/common/src/metrics/relabeled_metric.rs +++ b/src/common/src/metrics/relabeled_metric.rs @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec}; -use prometheus::{Histogram, HistogramVec}; +use prometheus::core::{MetricVec, MetricVecBuilder}; +use prometheus::{HistogramVec, IntCounterVec}; use crate::config::MetricLevel; +use crate::metrics::{ + LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedMetric, LabelGuardedMetricVec, +}; /// For all `Relabeled*Vec` below, /// - when `metric_level` <= `relabel_threshold`, they behaves exactly the same as their inner @@ -28,10 +31,10 @@ use crate::config::MetricLevel; /// than specializing them one by one. However, that's undoable because prometheus crate doesn't /// export `MetricVecBuilder` implementation like `HistogramVecBuilder`. #[derive(Clone, Debug)] -pub struct RelabeledHistogramVec { +pub struct RelabeledMetricVec { relabel_threshold: MetricLevel, metric_level: MetricLevel, - metric: HistogramVec, + metric: M, /// The first `relabel_num` labels will be relabeled to empty string /// @@ -41,10 +44,10 @@ pub struct RelabeledHistogramVec { relabel_num: usize, } -impl RelabeledHistogramVec { +impl RelabeledMetricVec { pub fn with_metric_level( metric_level: MetricLevel, - metric: HistogramVec, + metric: M, relabel_threshold: MetricLevel, ) -> Self { Self { @@ -57,7 +60,7 @@ impl RelabeledHistogramVec { pub fn with_metric_level_relabel_n( metric_level: MetricLevel, - metric: HistogramVec, + metric: M, relabel_threshold: MetricLevel, relabel_num: usize, ) -> Self { @@ -68,8 +71,10 @@ impl RelabeledHistogramVec { relabel_num, } } +} - pub fn with_label_values(&self, vals: &[&str]) -> Histogram { +impl RelabeledMetricVec> { + pub fn with_label_values(&self, vals: &[&str]) -> T::M { if self.metric_level > self.relabel_threshold { // relabel first n labels to empty string let mut relabeled_vals = vals.to_vec(); @@ -82,46 +87,11 @@ impl RelabeledHistogramVec { } } -#[derive(Clone, Debug)] -pub struct RelabeledCounterVec { - relabel_threshold: MetricLevel, - metric_level: MetricLevel, - metric: GenericCounterVec, - relabel_num: usize, -} - -impl RelabeledCounterVec { - pub fn with_metric_level( - metric_level: MetricLevel, - metric: GenericCounterVec, - relabel_threshold: MetricLevel, - ) -> Self { - Self { - relabel_threshold, - metric_level, - metric, - relabel_num: usize::MAX, - } - } - - pub fn with_metric_level_relabel_n( - metric_level: MetricLevel, - metric: GenericCounterVec, - relabel_threshold: MetricLevel, - relabel_num: usize, - ) -> Self { - Self { - relabel_threshold, - metric_level, - metric, - relabel_num, - } - } - - pub fn with_label_values(&self, vals: &[&str]) -> GenericCounter { +impl RelabeledMetricVec> { + pub fn with_label_values(&self, vals: &[&str; N]) -> LabelGuardedMetric { if self.metric_level > self.relabel_threshold { // relabel first n labels to empty string - let mut relabeled_vals = vals.to_vec(); + let mut relabeled_vals = *vals; for label in relabeled_vals.iter_mut().take(self.relabel_num) { *label = ""; } @@ -130,3 +100,11 @@ impl RelabeledCounterVec { self.metric.with_label_values(vals) } } + +pub type RelabeledCounterVec = RelabeledMetricVec; +pub type RelabeledHistogramVec = RelabeledMetricVec; + +pub type RelabeledGuardedHistogramVec = + RelabeledMetricVec>; +pub type RelabeledGuardedIntCounterVec = + RelabeledMetricVec>; diff --git a/src/common/src/monitor/connection.rs b/src/common/src/monitor/connection.rs index 0c153a3931098..4a0242dec1bcd 100644 --- a/src/common/src/monitor/connection.rs +++ b/src/common/src/monitor/connection.rs @@ -27,17 +27,19 @@ use hyper::client::connect::Connection; use hyper::client::HttpConnector; use hyper::service::Service; use pin_project_lite::pin_project; -use prometheus::core::{ - AtomicI64, AtomicU64, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec, -}; -use prometheus::{ - register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, Registry, -}; +use prometheus::Registry; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tonic::transport::{Channel, Endpoint}; use tracing::{info, warn}; +use crate::metrics::{ + LabelGuardedIntCounter, LabelGuardedIntCounterVec, LabelGuardedIntGauge, + LabelGuardedIntGaugeVec, +}; use crate::monitor::GLOBAL_METRICS_REGISTRY; +use crate::{ + register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry, +}; pub trait MonitorAsyncReadWrite { fn on_read(&mut self, _size: usize) {} @@ -274,17 +276,17 @@ where #[derive(Clone)] pub struct ConnectionMetrics { - connection_count: GenericGaugeVec, - connection_create_rate: GenericCounterVec, - connection_err_rate: GenericCounterVec, + connection_count: LabelGuardedIntGaugeVec<2>, + connection_create_rate: LabelGuardedIntCounterVec<2>, + connection_err_rate: LabelGuardedIntCounterVec<2>, - read_rate: GenericCounterVec, - reader_count: GenericGaugeVec, + read_rate: LabelGuardedIntCounterVec<2>, + reader_count: LabelGuardedIntGaugeVec<2>, - write_rate: GenericCounterVec, - writer_count: GenericGaugeVec, + write_rate: LabelGuardedIntCounterVec<2>, + writer_count: LabelGuardedIntGaugeVec<2>, - io_err_rate: GenericCounterVec, + io_err_rate: LabelGuardedIntCounterVec<4>, } pub static GLOBAL_CONNECTION_METRICS: LazyLock = @@ -293,7 +295,7 @@ pub static GLOBAL_CONNECTION_METRICS: LazyLock = impl ConnectionMetrics { pub fn new(registry: &Registry) -> Self { let labels = ["connection_type", "uri"]; - let connection_count = register_int_gauge_vec_with_registry!( + let connection_count = register_guarded_int_gauge_vec_with_registry!( "connection_count", "The number of current existing connection", &labels, @@ -301,7 +303,7 @@ impl ConnectionMetrics { ) .unwrap(); - let connection_create_rate = register_int_counter_vec_with_registry!( + let connection_create_rate = register_guarded_int_counter_vec_with_registry!( "connection_create_rate", "Rate on creating new connection", &labels, @@ -309,7 +311,7 @@ impl ConnectionMetrics { ) .unwrap(); - let connection_err_rate = register_int_counter_vec_with_registry!( + let connection_err_rate = register_guarded_int_counter_vec_with_registry!( "connection_err_rate", "Error rate on creating new connection", &labels, @@ -317,7 +319,7 @@ impl ConnectionMetrics { ) .unwrap(); - let read_rate = register_int_counter_vec_with_registry!( + let read_rate = register_guarded_int_counter_vec_with_registry!( "connection_read_rate", "Read rate of a connection", &labels, @@ -325,7 +327,7 @@ impl ConnectionMetrics { ) .unwrap(); - let reader_count = register_int_gauge_vec_with_registry!( + let reader_count = register_guarded_int_gauge_vec_with_registry!( "connection_reader_count", "The number of current existing reader", &labels, @@ -333,7 +335,7 @@ impl ConnectionMetrics { ) .unwrap(); - let write_rate = register_int_counter_vec_with_registry!( + let write_rate = register_guarded_int_counter_vec_with_registry!( "connection_write_rate", "Write rate of a connection", &labels, @@ -341,7 +343,7 @@ impl ConnectionMetrics { ) .unwrap(); - let writer_count = register_int_gauge_vec_with_registry!( + let writer_count = register_guarded_int_gauge_vec_with_registry!( "connection_writer_count", "The number of current existing writer", &labels, @@ -349,7 +351,7 @@ impl ConnectionMetrics { ) .unwrap(); - let io_err_rate = register_int_counter_vec_with_registry!( + let io_err_rate = register_guarded_int_counter_vec_with_registry!( "connection_io_err_rate", "IO err rate of a connection", &["connection_type", "uri", "op_type", "error_kind"], @@ -564,27 +566,27 @@ pub struct MonitorAsyncReadWriteImpl { connection_type: String, unreported_read_rate: u64, - read_rate: GenericCounter, - reader_count_guard: GenericGauge, + read_rate: LabelGuardedIntCounter<2>, + reader_count_guard: LabelGuardedIntGauge<2>, is_eof: bool, unreported_write_rate: u64, - write_rate: GenericCounter, - writer_count_guard: GenericGauge, + write_rate: LabelGuardedIntCounter<2>, + writer_count_guard: LabelGuardedIntGauge<2>, is_shutdown: bool, - connection_count_guard: GenericGauge, + connection_count_guard: LabelGuardedIntGauge<2>, } impl MonitorAsyncReadWriteImpl { pub fn new( endpoint: String, connection_type: String, - read_rate: GenericCounter, - reader_count: GenericGauge, - write_rate: GenericCounter, - writer_count: GenericGauge, - connection_count: GenericGauge, + read_rate: LabelGuardedIntCounter<2>, + reader_count: LabelGuardedIntGauge<2>, + write_rate: LabelGuardedIntCounter<2>, + writer_count: LabelGuardedIntGauge<2>, + connection_count: LabelGuardedIntGauge<2>, ) -> Self { reader_count.inc(); writer_count.inc(); diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 317f1042b263d..053467d9f125d 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -216,13 +216,13 @@ impl From for SinkParam { #[derive(Clone)] pub struct SinkMetrics { - pub sink_commit_duration_metrics: LabelGuardedHistogram, - pub connector_sink_rows_received: LabelGuardedIntCounter, - pub log_store_first_write_epoch: LabelGuardedIntGauge, - pub log_store_latest_write_epoch: LabelGuardedIntGauge, - pub log_store_write_rows: LabelGuardedIntCounter, - pub log_store_latest_read_epoch: LabelGuardedIntGauge, - pub log_store_read_rows: LabelGuardedIntCounter, + pub sink_commit_duration_metrics: LabelGuardedHistogram<3>, + pub connector_sink_rows_received: LabelGuardedIntCounter<2>, + pub log_store_first_write_epoch: LabelGuardedIntGauge<3>, + pub log_store_latest_write_epoch: LabelGuardedIntGauge<3>, + pub log_store_write_rows: LabelGuardedIntCounter<3>, + pub log_store_latest_read_epoch: LabelGuardedIntGauge<3>, + pub log_store_read_rows: LabelGuardedIntCounter<3>, } impl SinkMetrics { diff --git a/src/storage/src/monitor/hummock_state_store_metrics.rs b/src/storage/src/monitor/hummock_state_store_metrics.rs index 2e114ac766de1..1b4894256f11c 100644 --- a/src/storage/src/monitor/hummock_state_store_metrics.rs +++ b/src/storage/src/monitor/hummock_state_store_metrics.rs @@ -21,8 +21,14 @@ use prometheus::{ register_int_gauge_with_registry, Gauge, IntGauge, IntGaugeVec, Opts, Registry, }; use risingwave_common::config::MetricLevel; -use risingwave_common::metrics::{RelabeledCounterVec, RelabeledHistogramVec}; +use risingwave_common::metrics::{ + RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, + RelabeledHistogramVec, +}; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; +use risingwave_common::{ + register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry, +}; use tracing::warn; /// [`HummockStateStoreMetrics`] stores the performance and IO metrics of `XXXStore` such as @@ -35,17 +41,17 @@ pub struct HummockStateStoreMetrics { pub bloom_filter_true_negative_counts: RelabeledCounterVec, pub bloom_filter_check_counts: RelabeledCounterVec, pub iter_merge_sstable_counts: RelabeledHistogramVec, - pub sst_store_block_request_counts: RelabeledCounterVec, - pub iter_scan_key_counts: RelabeledCounterVec, + pub sst_store_block_request_counts: RelabeledGuardedIntCounterVec<2>, + pub iter_scan_key_counts: RelabeledGuardedIntCounterVec<2>, pub get_shared_buffer_hit_counts: RelabeledCounterVec, pub remote_read_time: RelabeledHistogramVec, - pub iter_fetch_meta_duration: RelabeledHistogramVec, + pub iter_fetch_meta_duration: RelabeledGuardedHistogramVec<1>, pub iter_fetch_meta_cache_unhits: IntGauge, pub iter_slow_fetch_meta_cache_unhits: IntGauge, - pub read_req_bloom_filter_positive_counts: RelabeledCounterVec, - pub read_req_positive_but_non_exist_counts: RelabeledCounterVec, - pub read_req_check_bloom_filter_counts: RelabeledCounterVec, + pub read_req_bloom_filter_positive_counts: RelabeledGuardedIntCounterVec<2>, + pub read_req_positive_but_non_exist_counts: RelabeledGuardedIntCounterVec<2>, + pub read_req_check_bloom_filter_counts: RelabeledGuardedIntCounterVec<2>, pub write_batch_tuple_counts: RelabeledCounterVec, pub write_batch_duration: RelabeledHistogramVec, @@ -130,27 +136,27 @@ impl HummockStateStoreMetrics { ); // ----- sst store ----- - let sst_store_block_request_counts = register_int_counter_vec_with_registry!( + let sst_store_block_request_counts = register_guarded_int_counter_vec_with_registry!( "state_store_sst_store_block_request_counts", "Total number of sst block requests that have been issued to sst store", &["table_id", "type"], registry ) .unwrap(); - let sst_store_block_request_counts = RelabeledCounterVec::with_metric_level( + let sst_store_block_request_counts = RelabeledGuardedIntCounterVec::with_metric_level( MetricLevel::Critical, sst_store_block_request_counts, metric_level, ); - let iter_scan_key_counts = register_int_counter_vec_with_registry!( + let iter_scan_key_counts = register_guarded_int_counter_vec_with_registry!( "state_store_iter_scan_key_counts", "Total number of keys read by iterator", &["table_id", "type"], registry ) .unwrap(); - let iter_scan_key_counts = RelabeledCounterVec::with_metric_level( + let iter_scan_key_counts = RelabeledGuardedIntCounterVec::with_metric_level( MetricLevel::Info, iter_scan_key_counts, metric_level, @@ -188,8 +194,8 @@ impl HummockStateStoreMetrics { state_store_read_time_buckets.clone(), ); let iter_fetch_meta_duration = - register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap(); - let iter_fetch_meta_duration = RelabeledHistogramVec::with_metric_level( + register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap(); + let iter_fetch_meta_duration = RelabeledGuardedHistogramVec::with_metric_level( MetricLevel::Info, iter_fetch_meta_duration, metric_level, @@ -310,33 +316,35 @@ impl HummockStateStoreMetrics { .register(Box::new(uploader_uploading_task_size.clone())) .unwrap(); - let read_req_bloom_filter_positive_counts = register_int_counter_vec_with_registry!( + let read_req_bloom_filter_positive_counts = register_guarded_int_counter_vec_with_registry!( "state_store_read_req_bloom_filter_positive_counts", "Total number of read request with at least one SST bloom filter check returns positive", &["table_id", "type"], registry ) .unwrap(); - let read_req_bloom_filter_positive_counts = RelabeledCounterVec::with_metric_level( - MetricLevel::Info, - read_req_bloom_filter_positive_counts, - metric_level, - ); + let read_req_bloom_filter_positive_counts = + RelabeledGuardedIntCounterVec::with_metric_level( + MetricLevel::Info, + read_req_bloom_filter_positive_counts, + metric_level, + ); - let read_req_positive_but_non_exist_counts = register_int_counter_vec_with_registry!( + let read_req_positive_but_non_exist_counts = register_guarded_int_counter_vec_with_registry!( "state_store_read_req_positive_but_non_exist_counts", "Total number of read request on non-existent key/prefix with at least one SST bloom filter check returns positive", &["table_id", "type"], registry ) .unwrap(); - let read_req_positive_but_non_exist_counts = RelabeledCounterVec::with_metric_level( - MetricLevel::Info, - read_req_positive_but_non_exist_counts, - metric_level, - ); + let read_req_positive_but_non_exist_counts = + RelabeledGuardedIntCounterVec::with_metric_level( + MetricLevel::Info, + read_req_positive_but_non_exist_counts, + metric_level, + ); - let read_req_check_bloom_filter_counts = register_int_counter_vec_with_registry!( + let read_req_check_bloom_filter_counts = register_guarded_int_counter_vec_with_registry!( "state_store_read_req_check_bloom_filter_counts", "Total number of read request that checks bloom filter with a prefix hint", &["table_id", "type"], @@ -344,7 +352,7 @@ impl HummockStateStoreMetrics { ) .unwrap(); - let read_req_check_bloom_filter_counts = RelabeledCounterVec::with_metric_level( + let read_req_check_bloom_filter_counts = RelabeledGuardedIntCounterVec::with_metric_level( MetricLevel::Info, read_req_check_bloom_filter_counts, metric_level, diff --git a/src/storage/src/monitor/monitored_storage_metrics.rs b/src/storage/src/monitor/monitored_storage_metrics.rs index bf2bb181df1fc..1a33a8bcb6ac1 100644 --- a/src/storage/src/monitor/monitored_storage_metrics.rs +++ b/src/storage/src/monitor/monitored_storage_metrics.rs @@ -19,20 +19,23 @@ use prometheus::{ register_histogram_with_registry, register_int_counter_vec_with_registry, Histogram, Registry, }; use risingwave_common::config::MetricLevel; -use risingwave_common::metrics::{RelabeledCounterVec, RelabeledHistogramVec}; +use risingwave_common::metrics::{ + RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledHistogramVec, +}; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; +use risingwave_common::register_guarded_histogram_vec_with_registry; /// [`MonitoredStorageMetrics`] stores the performance and IO metrics of Storage. #[derive(Debug, Clone)] pub struct MonitoredStorageMetrics { - pub get_duration: RelabeledHistogramVec, + pub get_duration: RelabeledGuardedHistogramVec<1>, pub get_key_size: RelabeledHistogramVec, pub get_value_size: RelabeledHistogramVec, pub iter_size: RelabeledHistogramVec, pub iter_item: RelabeledHistogramVec, - pub iter_init_duration: RelabeledHistogramVec, - pub iter_scan_duration: RelabeledHistogramVec, + pub iter_init_duration: RelabeledGuardedHistogramVec<1>, + pub iter_scan_duration: RelabeledGuardedHistogramVec<1>, pub may_exist_duration: RelabeledHistogramVec, pub iter_in_process_counts: RelabeledCounterVec, @@ -97,10 +100,13 @@ impl MonitoredStorageMetrics { "Total latency of get that have been issued to state store", state_store_read_time_buckets.clone(), ); - let get_duration = - register_histogram_vec_with_registry!(get_duration_opts, &["table_id"], registry) - .unwrap(); - let get_duration = RelabeledHistogramVec::with_metric_level( + let get_duration = register_guarded_histogram_vec_with_registry!( + get_duration_opts, + &["table_id"], + registry + ) + .unwrap(); + let get_duration = RelabeledGuardedHistogramVec::with_metric_level( MetricLevel::Critical, get_duration, metric_level, @@ -132,8 +138,8 @@ impl MonitoredStorageMetrics { state_store_read_time_buckets.clone(), ); let iter_init_duration = - register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap(); - let iter_init_duration = RelabeledHistogramVec::with_metric_level( + register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap(); + let iter_init_duration = RelabeledGuardedHistogramVec::with_metric_level( MetricLevel::Critical, iter_init_duration, metric_level, @@ -145,8 +151,8 @@ impl MonitoredStorageMetrics { state_store_read_time_buckets.clone(), ); let iter_scan_duration = - register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap(); - let iter_scan_duration = RelabeledHistogramVec::with_metric_level( + register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap(); + let iter_scan_duration = RelabeledGuardedHistogramVec::with_metric_level( MetricLevel::Critical, iter_scan_duration, metric_level, diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 3f30b3753b37c..4256da4ca9325 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -47,8 +47,8 @@ type ReaderTruncationOffsetType = (u64, Option); #[derive(Clone)] pub(crate) struct KvLogStoreReadMetrics { - pub storage_read_count: LabelGuardedIntCounter, - pub storage_read_size: LabelGuardedIntCounter, + pub storage_read_count: LabelGuardedIntCounter<4>, + pub storage_read_size: LabelGuardedIntCounter<4>, } impl KvLogStoreReadMetrics { @@ -63,8 +63,8 @@ impl KvLogStoreReadMetrics { #[derive(Clone)] pub(crate) struct KvLogStoreMetrics { - pub storage_write_count: LabelGuardedIntCounter, - pub storage_write_size: LabelGuardedIntCounter, + pub storage_write_count: LabelGuardedIntCounter<3>, + pub storage_write_size: LabelGuardedIntCounter<3>, pub persistent_log_read_metrics: KvLogStoreReadMetrics, pub flushed_buffer_read_metrics: KvLogStoreReadMetrics, } diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs index d06360f10b80d..459e9cbaa439c 100644 --- a/src/stream/src/executor/monitor/streaming_stats.rs +++ b/src/stream/src/executor/monitor/streaming_stats.rs @@ -17,15 +17,14 @@ use std::sync::OnceLock; use prometheus::core::{AtomicF64, AtomicI64, AtomicU64, GenericCounterVec, GenericGaugeVec}; use prometheus::{ exponential_buckets, histogram_opts, register_gauge_vec_with_registry, - register_histogram_vec_with_registry, register_histogram_with_registry, - register_int_counter_vec_with_registry, register_int_counter_with_registry, - register_int_gauge_vec_with_registry, register_int_gauge_with_registry, Histogram, IntCounter, - IntGauge, Registry, + register_histogram_with_registry, register_int_counter_vec_with_registry, + register_int_counter_with_registry, register_int_gauge_vec_with_registry, + register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry, }; use risingwave_common::config::MetricLevel; use risingwave_common::metrics::{ LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec, - RelabeledHistogramVec, + RelabeledGuardedHistogramVec, }; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use risingwave_common::{ @@ -43,8 +42,8 @@ pub struct StreamingMetrics { // Streaming actor metrics from tokio (disabled by default) pub actor_execution_time: GenericGaugeVec, - pub actor_output_buffer_blocking_duration_ns: GenericCounterVec, - pub actor_input_buffer_blocking_duration_ns: GenericCounterVec, + pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounterVec<3>, + pub actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounterVec<3>, pub actor_scheduled_duration: GenericGaugeVec, pub actor_scheduled_cnt: GenericGaugeVec, pub actor_fast_poll_duration: GenericGaugeVec, @@ -58,8 +57,8 @@ pub struct StreamingMetrics { // Streaming actor pub actor_memory_usage: GenericGaugeVec, - pub actor_in_record_cnt: GenericCounterVec, - pub actor_out_record_cnt: GenericCounterVec, + pub actor_in_record_cnt: LabelGuardedIntCounterVec<2>, + pub actor_out_record_cnt: LabelGuardedIntCounterVec<2>, // Source pub source_output_row_count: GenericCounterVec, @@ -74,14 +73,14 @@ pub struct StreamingMetrics { pub exchange_frag_recv_size: GenericCounterVec, // Streaming Join - pub join_lookup_miss_count: GenericCounterVec, - pub join_lookup_total_count: GenericCounterVec, - pub join_insert_cache_miss_count: GenericCounterVec, - pub join_actor_input_waiting_duration_ns: GenericCounterVec, - pub join_match_duration_ns: GenericCounterVec, - pub join_barrier_align_duration: RelabeledHistogramVec, - pub join_cached_entry_count: GenericGaugeVec, - pub join_matched_join_keys: RelabeledHistogramVec, + pub join_lookup_miss_count: LabelGuardedIntCounterVec<5>, + pub join_lookup_total_count: LabelGuardedIntCounterVec<5>, + pub join_insert_cache_miss_count: LabelGuardedIntCounterVec<5>, + pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec<2>, + pub join_match_duration_ns: LabelGuardedIntCounterVec<3>, + pub join_barrier_align_duration: RelabeledGuardedHistogramVec<3>, + pub join_cached_entry_count: LabelGuardedIntGaugeVec<3>, + pub join_matched_join_keys: RelabeledGuardedHistogramVec<3>, // Streaming Aggregation pub agg_lookup_miss_count: GenericCounterVec, @@ -238,21 +237,23 @@ impl StreamingMetrics { ) .unwrap(); - let actor_output_buffer_blocking_duration_ns = register_int_counter_vec_with_registry!( - "stream_actor_output_buffer_blocking_duration_ns", - "Total blocking duration (ns) of output buffer", - &["actor_id", "fragment_id", "downstream_fragment_id"], - registry - ) - .unwrap(); + let actor_output_buffer_blocking_duration_ns = + register_guarded_int_counter_vec_with_registry!( + "stream_actor_output_buffer_blocking_duration_ns", + "Total blocking duration (ns) of output buffer", + &["actor_id", "fragment_id", "downstream_fragment_id"], + registry + ) + .unwrap(); - let actor_input_buffer_blocking_duration_ns = register_int_counter_vec_with_registry!( - "stream_actor_input_buffer_blocking_duration_ns", - "Total blocking duration (ns) of input buffer", - &["actor_id", "fragment_id", "upstream_fragment_id"], - registry - ) - .unwrap(); + let actor_input_buffer_blocking_duration_ns = + register_guarded_int_counter_vec_with_registry!( + "stream_actor_input_buffer_blocking_duration_ns", + "Total blocking duration (ns) of input buffer", + &["actor_id", "fragment_id", "upstream_fragment_id"], + registry + ) + .unwrap(); let exchange_frag_recv_size = register_int_counter_vec_with_registry!( "stream_exchange_frag_recv_size", @@ -342,7 +343,7 @@ impl StreamingMetrics { ) .unwrap(); - let actor_in_record_cnt = register_int_counter_vec_with_registry!( + let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!( "stream_actor_in_record_cnt", "Total number of rows actor received", &["actor_id", "fragment_id"], @@ -350,7 +351,7 @@ impl StreamingMetrics { ) .unwrap(); - let actor_out_record_cnt = register_int_counter_vec_with_registry!( + let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!( "stream_actor_out_record_cnt", "Total number of rows actor sent", &["actor_id", "fragment_id"], @@ -366,7 +367,7 @@ impl StreamingMetrics { ) .unwrap(); - let join_lookup_miss_count = register_int_counter_vec_with_registry!( + let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!( "stream_join_lookup_miss_count", "Join executor lookup miss duration", &[ @@ -380,7 +381,7 @@ impl StreamingMetrics { ) .unwrap(); - let join_lookup_total_count = register_int_counter_vec_with_registry!( + let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!( "stream_join_lookup_total_count", "Join executor lookup total operation", &[ @@ -394,7 +395,7 @@ impl StreamingMetrics { ) .unwrap(); - let join_insert_cache_miss_count = register_int_counter_vec_with_registry!( + let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!( "stream_join_insert_cache_miss_count", "Join executor cache miss when insert operation", &[ @@ -408,7 +409,7 @@ impl StreamingMetrics { ) .unwrap(); - let join_actor_input_waiting_duration_ns = register_int_counter_vec_with_registry!( + let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!( "stream_join_actor_input_waiting_duration_ns", "Total waiting duration (ns) of input buffer of join actor", &["actor_id", "fragment_id"], @@ -416,7 +417,7 @@ impl StreamingMetrics { ) .unwrap(); - let join_match_duration_ns = register_int_counter_vec_with_registry!( + let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!( "stream_join_match_duration_ns", "Matching duration for each side", &["actor_id", "fragment_id", "side"], @@ -429,21 +430,21 @@ impl StreamingMetrics { "Duration of join align barrier", exponential_buckets(0.0001, 2.0, 21).unwrap() // max 104s ); - let join_barrier_align_duration = register_histogram_vec_with_registry!( + let join_barrier_align_duration = register_guarded_histogram_vec_with_registry!( opts, &["actor_id", "fragment_id", "wait_side"], registry ) .unwrap(); - let join_barrier_align_duration = RelabeledHistogramVec::with_metric_level_relabel_n( + let join_barrier_align_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n( MetricLevel::Debug, join_barrier_align_duration, level, 1, ); - let join_cached_entry_count = register_int_gauge_vec_with_registry!( + let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!( "stream_join_cached_entry_count", "Number of cached entries in streaming join operators", &["actor_id", "fragment_id", "side"], @@ -457,14 +458,14 @@ impl StreamingMetrics { exponential_buckets(16.0, 2.0, 28).unwrap() // max 2^31 ); - let join_matched_join_keys = register_histogram_vec_with_registry!( + let join_matched_join_keys = register_guarded_histogram_vec_with_registry!( join_matched_join_keys_opts, &["actor_id", "fragment_id", "table_id"], registry ) .unwrap(); - let join_matched_join_keys = RelabeledHistogramVec::with_metric_level_relabel_n( + let join_matched_join_keys = RelabeledGuardedHistogramVec::with_metric_level_relabel_n( MetricLevel::Debug, join_matched_join_keys, level,