Skip to content

Commit

Permalink
feat(metrics): support reference counting in metrics label
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Oct 16, 2023
1 parent 7d54cdd commit 8548bbc
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 218 deletions.
1 change: 1 addition & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#![feature(negative_impls)]
#![feature(async_fn_in_trait)]
#![feature(bound_map)]
#![feature(array_methods)]

#[macro_use]
pub mod jemalloc;
Expand Down
141 changes: 104 additions & 37 deletions src/common/src/metrics/guarded_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::type_name;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::ops::Deref;
use std::sync::Arc;

use itertools::Itertools;
use parking_lot::Mutex;
use prometheus::core::{
Atomic, AtomicI64, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge,
GenericGaugeVec, MetricVec, MetricVecBuilder,
Expand All @@ -42,11 +46,15 @@ pub fn __extract_histogram_builder(vec: HistogramVec) -> MetricVec<VecBuilderOfH
#[macro_export]
macro_rules! register_guarded_histogram_vec_with_registry {
($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
let result = prometheus::register_histogram_vec_with_registry!(
prometheus::histogram_opts!($NAME, $HELP),
$crate::register_guarded_histogram_vec_with_registry! {
{prometheus::histogram_opts!($NAME, $HELP)},
$LABELS_NAMES,
$REGISTRY
);
}
}};
($HOPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
let result =
prometheus::register_histogram_vec_with_registry!($HOPTS, $LABELS_NAMES, $REGISTRY);
result.map(|inner| {
let inner = $crate::metrics::__extract_histogram_builder(inner);
$crate::metrics::LabelGuardedHistogramVec::new(inner, { $LABELS_NAMES })
Expand Down Expand Up @@ -94,40 +102,60 @@ pub type LabelGuardedIntCounterVec<const N: usize> =
pub type LabelGuardedIntGaugeVec<const N: usize> =
LabelGuardedMetricVec<VecBuilderOfGauge<AtomicI64>, N>;

pub type LabelGuardedHistogram = LabelGuardedMetric<VecBuilderOfHistogram>;
pub type LabelGuardedIntCounter = LabelGuardedMetric<VecBuilderOfCounter<AtomicU64>>;
pub type LabelGuardedIntGauge = LabelGuardedMetric<VecBuilderOfGauge<AtomicI64>>;
pub type LabelGuardedHistogram<const N: usize> = LabelGuardedMetric<VecBuilderOfHistogram, N>;
pub type LabelGuardedIntCounter<const N: usize> =
LabelGuardedMetric<VecBuilderOfCounter<AtomicU64>, N>;
pub type LabelGuardedIntGauge<const N: usize> = LabelGuardedMetric<VecBuilderOfGauge<AtomicI64>, N>;

fn gen_test_label<const N: usize>() -> [&'static str; N] {
vec!["test"; N].try_into().unwrap()
const TEST_LABELS: [&str; 5] = ["test1", "test2", "test3", "test4", "test5"];
(0..N)
.map(|i| TEST_LABELS[i])
.collect_vec()
.try_into()
.unwrap()
}

#[derive(Clone)]
pub struct LabelGuardedMetricVec<T: MetricVecBuilder, const N: usize> {
inner: MetricVec<T>,
_labels: [&'static str; N],
labeled_metrics_count: Arc<Mutex<HashMap<[String; N], usize>>>,
labels: [&'static str; N],
}

impl<T: MetricVecBuilder, const N: usize> Debug for LabelGuardedMetricVec<T, N> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct(format!("LabelGuardedMetricVec<{}, {}>", type_name::<T>(), N).as_str())
.field("label", &self.labels)
.finish()
}
}

impl<T: MetricVecBuilder, const N: usize> LabelGuardedMetricVec<T, N> {
pub fn new(inner: MetricVec<T>, labels: &[&'static str; N]) -> Self {
Self {
inner,
_labels: *labels,
labeled_metrics_count: Default::default(),
labels: *labels,
}
}

pub fn with_label_values(&self, labels: &[&str; N]) -> LabelGuardedMetric<T> {
pub fn with_label_values(&self, labels: &[&str; N]) -> LabelGuardedMetric<T, N> {
let mut count_guard = self.labeled_metrics_count.lock();
let label_string = labels.map(|str| str.to_string());
*count_guard.entry(label_string).or_insert(0) += 1;
let inner = self.inner.with_label_values(labels);
LabelGuardedMetric {
inner: Arc::new(LabelGuardedMetricInner {
inner,
labels: labels.iter().map(|s| s.to_string()).collect(),
labels: labels.map(|str| str.to_string()),
vec: self.inner.clone(),
labeled_metrics_count: self.labeled_metrics_count.clone(),
}),
}
}

pub fn with_test_label(&self) -> LabelGuardedMetric<T> {
pub fn with_test_label(&self) -> LabelGuardedMetric<T, N> {
let labels: [&'static str; N] = gen_test_label::<N>();
self.with_label_values(&labels)
}
Expand Down Expand Up @@ -173,58 +201,97 @@ impl<const N: usize> LabelGuardedHistogramVec<N> {
}

#[derive(Clone)]
struct LabelGuardedMetricInner<T: MetricVecBuilder> {
struct LabelGuardedMetricInner<T: MetricVecBuilder, const N: usize> {
inner: T::M,
labels: Vec<String>,
labels: [String; N],
vec: MetricVec<T>,
labeled_metrics_count: Arc<Mutex<HashMap<[String; N], usize>>>,
}

impl<T: MetricVecBuilder> Drop for LabelGuardedMetricInner<T> {
impl<T: MetricVecBuilder, const N: usize> Drop for LabelGuardedMetricInner<T, N> {
fn drop(&mut self) {
if let Err(e) = self.vec.remove_label_values(
self.labels
.iter()
.map(|s| s.as_str())
.collect_vec()
.as_slice(),
) {
warn!(
"err when delete metrics of {:?} of labels {:?}. Err {:?}",
self.vec.desc().first().expect("should have desc").fq_name,
self.labels,
e,
);
let mut count_guard = self.labeled_metrics_count.lock();
let count = count_guard.get_mut(&self.labels).expect(
"should exist because the current existing dropping one means the count is not zero",
);
*count -= 1;
if *count == 0 {
count_guard.remove(&self.labels).expect("should exist");
if let Err(e) = self
.vec
.remove_label_values(&self.labels.each_ref().map(|s| s.as_str()))
{
warn!(
"err when delete metrics of {:?} of labels {:?}. Err {:?}",
self.vec.desc().first().expect("should have desc").fq_name,
self.labels,
e,
);
}
}
}
}

#[derive(Clone)]
pub struct LabelGuardedMetric<T: MetricVecBuilder> {
inner: Arc<LabelGuardedMetricInner<T>>,
pub struct LabelGuardedMetric<T: MetricVecBuilder, const N: usize> {
inner: Arc<LabelGuardedMetricInner<T, N>>,
}

impl<T: MetricVecBuilder> Deref for LabelGuardedMetric<T> {
impl<T: MetricVecBuilder, const N: usize> Debug for LabelGuardedMetric<T, N> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LabelGuardedMetric").finish()
}
}

impl<T: MetricVecBuilder, const N: usize> Deref for LabelGuardedMetric<T, N> {
type Target = T::M;

fn deref(&self) -> &Self::Target {
&self.inner.inner
}
}

impl LabelGuardedHistogram {
impl<const N: usize> LabelGuardedHistogram<N> {
pub fn test_histogram() -> Self {
LabelGuardedHistogramVec::<1>::test_histogram_vec().with_test_label()
LabelGuardedHistogramVec::<N>::test_histogram_vec().with_test_label()
}
}

impl LabelGuardedIntCounter {
impl<const N: usize> LabelGuardedIntCounter<N> {
pub fn test_int_counter() -> Self {
LabelGuardedIntCounterVec::<1>::test_int_counter_vec().with_test_label()
LabelGuardedIntCounterVec::<N>::test_int_counter_vec().with_test_label()
}
}

impl LabelGuardedIntGauge {
impl<const N: usize> LabelGuardedIntGauge<N> {
pub fn test_int_gauge() -> Self {
LabelGuardedIntGaugeVec::<1>::test_int_gauge_vec().with_test_label()
LabelGuardedIntGaugeVec::<N>::test_int_gauge_vec().with_test_label()
}
}

#[cfg(test)]
mod tests {
use prometheus::core::Collector;

use crate::metrics::LabelGuardedIntCounterVec;

#[test]
fn test_label_guarded_metrics_drop() {
let vec = LabelGuardedIntCounterVec::<3>::test_int_counter_vec();
let m1_1 = vec.with_label_values(&["1", "2", "3"]);
assert_eq!(1, vec.inner.collect().pop().unwrap().get_metric().len());
let m1_2 = vec.with_label_values(&["1", "2", "3"]);
let m1_3 = m1_2.clone();
assert_eq!(1, vec.inner.collect().pop().unwrap().get_metric().len());
let m2 = vec.with_label_values(&["2", "2", "3"]);
assert_eq!(2, vec.inner.collect().pop().unwrap().get_metric().len());
drop(m1_3);
assert_eq!(2, vec.inner.collect().pop().unwrap().get_metric().len());
drop(m2);
assert_eq!(1, vec.inner.collect().pop().unwrap().get_metric().len());
drop(m1_1);
assert_eq!(1, vec.inner.collect().pop().unwrap().get_metric().len());
drop(m1_2);
assert_eq!(0, vec.inner.collect().pop().unwrap().get_metric().len());
}
}
70 changes: 24 additions & 46 deletions src/common/src/metrics/relabeled_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
use prometheus::{Histogram, HistogramVec};
use prometheus::core::{MetricVec, MetricVecBuilder};
use prometheus::{HistogramVec, IntCounterVec};

use crate::config::MetricLevel;
use crate::metrics::{
LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedMetric, LabelGuardedMetricVec,
};

/// For all `Relabeled*Vec` below,
/// - when `metric_level` <= `relabel_threshold`, they behaves exactly the same as their inner
Expand All @@ -28,10 +31,10 @@ use crate::config::MetricLevel;
/// than specializing them one by one. However, that's undoable because prometheus crate doesn't
/// export `MetricVecBuilder` implementation like `HistogramVecBuilder`.
#[derive(Clone, Debug)]
pub struct RelabeledHistogramVec {
pub struct RelabeledMetricVec<M> {
relabel_threshold: MetricLevel,
metric_level: MetricLevel,
metric: HistogramVec,
metric: M,

/// The first `relabel_num` labels will be relabeled to empty string
///
Expand All @@ -41,10 +44,10 @@ pub struct RelabeledHistogramVec {
relabel_num: usize,
}

impl RelabeledHistogramVec {
impl<M> RelabeledMetricVec<M> {
pub fn with_metric_level(
metric_level: MetricLevel,
metric: HistogramVec,
metric: M,
relabel_threshold: MetricLevel,
) -> Self {
Self {
Expand All @@ -57,7 +60,7 @@ impl RelabeledHistogramVec {

pub fn with_metric_level_relabel_n(
metric_level: MetricLevel,
metric: HistogramVec,
metric: M,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> Self {
Expand All @@ -68,8 +71,10 @@ impl RelabeledHistogramVec {
relabel_num,
}
}
}

pub fn with_label_values(&self, vals: &[&str]) -> Histogram {
impl<T: MetricVecBuilder> RelabeledMetricVec<MetricVec<T>> {
pub fn with_label_values(&self, vals: &[&str]) -> T::M {
if self.metric_level > self.relabel_threshold {
// relabel first n labels to empty string
let mut relabeled_vals = vals.to_vec();
Expand All @@ -82,46 +87,11 @@ impl RelabeledHistogramVec {
}
}

#[derive(Clone, Debug)]
pub struct RelabeledCounterVec {
relabel_threshold: MetricLevel,
metric_level: MetricLevel,
metric: GenericCounterVec<AtomicU64>,
relabel_num: usize,
}

impl RelabeledCounterVec {
pub fn with_metric_level(
metric_level: MetricLevel,
metric: GenericCounterVec<AtomicU64>,
relabel_threshold: MetricLevel,
) -> Self {
Self {
relabel_threshold,
metric_level,
metric,
relabel_num: usize::MAX,
}
}

pub fn with_metric_level_relabel_n(
metric_level: MetricLevel,
metric: GenericCounterVec<AtomicU64>,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> Self {
Self {
relabel_threshold,
metric_level,
metric,
relabel_num,
}
}

pub fn with_label_values(&self, vals: &[&str]) -> GenericCounter<AtomicU64> {
impl<T: MetricVecBuilder, const N: usize> RelabeledMetricVec<LabelGuardedMetricVec<T, N>> {
pub fn with_label_values(&self, vals: &[&str; N]) -> LabelGuardedMetric<T, N> {
if self.metric_level > self.relabel_threshold {
// relabel first n labels to empty string
let mut relabeled_vals = vals.to_vec();
let mut relabeled_vals = *vals;
for label in relabeled_vals.iter_mut().take(self.relabel_num) {
*label = "";
}
Expand All @@ -130,3 +100,11 @@ impl RelabeledCounterVec {
self.metric.with_label_values(vals)
}
}

pub type RelabeledCounterVec = RelabeledMetricVec<IntCounterVec>;
pub type RelabeledHistogramVec = RelabeledMetricVec<HistogramVec>;

pub type RelabeledGuardedHistogramVec<const N: usize> =
RelabeledMetricVec<LabelGuardedHistogramVec<N>>;
pub type RelabeledGuardedIntCounterVec<const N: usize> =
RelabeledMetricVec<LabelGuardedIntCounterVec<N>>;
Loading

0 comments on commit 8548bbc

Please sign in to comment.