Skip to content

Commit

Permalink
feat(metrics): introduce slow op histogram
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx committed Jan 5, 2024
1 parent 788d6cd commit 2adeeac
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ use crate::monitor::GLOBAL_METRICS_REGISTRY;
mod error_metrics;
mod guarded_metrics;
mod relabeled_metric;
mod slow_op_histogram;

pub use error_metrics::*;
pub use guarded_metrics::*;
pub use relabeled_metric::*;
pub use slow_op_histogram::*;

#[derive(Debug)]
pub struct TrAdderAtomic(TrAdder<i64>);
Expand Down
57 changes: 56 additions & 1 deletion src/common/src/metrics/guarded_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use prometheus::core::{
};
use prometheus::local::{LocalHistogram, LocalIntCounter};
use prometheus::proto::MetricFamily;
use prometheus::{Gauge, Histogram, HistogramVec, IntCounter, IntGauge};
use prometheus::{Gauge, Histogram, HistogramTimer, HistogramVec, IntCounter, IntGauge};
use thiserror_ext::AsReport;
use tracing::warn;

Expand Down Expand Up @@ -324,6 +324,61 @@ impl<T, const N: usize> Deref for LabelGuardedMetric<T, N> {
}
}

#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
#[derive(Debug)]
pub struct ThresholdHistogramTimer {
histogram: Histogram,
inner: Option<HistogramTimer>,
threshold: f64,
}

impl Deref for ThresholdHistogramTimer {
type Target = HistogramTimer;

fn deref(&self) -> &Self::Target {
self.inner.as_ref().unwrap()
}
}

impl Drop for ThresholdHistogramTimer {
fn drop(&mut self) {
let inner = self.inner.take().unwrap();
let v = inner.stop_and_discard();
if v >= self.threshold {
self.histogram.observe(v);
}
}
}

impl<const N: usize> LabelGuardedHistogram<N> {
pub fn observe_with_threshold(&self, v: f64, threshold: f64) {
if v >= threshold {
self.inner.observe(v);
}
}

pub fn start_timer_with_threshold(&self, threshold: f64) -> ThresholdHistogramTimer {
let histogram = self.inner.clone();
let inner = Some(self.inner.start_timer());
ThresholdHistogramTimer {
histogram,
inner,
threshold,
}
}

/// Observe execution time of a closure, in second.
pub fn observe_closure_duration_with_threshold<F, T>(&self, f: F, threshold: f64) -> T
where
F: FnOnce() -> T,
{
let timer = self.start_timer_with_threshold(threshold);
let res = f();
drop(timer);
res
}
}

impl<const N: usize> LabelGuardedHistogram<N> {
pub fn test_histogram() -> Self {
LabelGuardedHistogramVec::<N>::test_histogram_vec().with_test_label()
Expand Down
120 changes: 120 additions & 0 deletions src/common/src/metrics/slow_op_histogram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use itertools::Itertools;
use parking_lot::Mutex;
use prometheus::core::Collector;
use prometheus::{HistogramOpts, HistogramVec};

#[derive(Debug, Default)]
pub struct SlowOpHistogramInfo<const N: usize> {
starts: HashMap<Instant, [String; N]>,
to_collect: Vec<([String; N], Duration)>,
labels_count: HashMap<[String; N], usize>,
}

#[derive(Debug, Clone)]
pub struct SlowOpHistogramVec<const N: usize> {
vec: HistogramVec,

threshold: Duration,

info: Arc<Mutex<SlowOpHistogramInfo<N>>>,

_labels: [&'static str; N],
}

impl<const N: usize> SlowOpHistogramVec<N> {
pub fn new(opts: HistogramOpts, labels: &[&'static str; N], threshold: Duration) -> Self {
let vec = HistogramVec::new(opts, labels).unwrap();
Self {
vec,
threshold,
info: Arc::new(Mutex::new(SlowOpHistogramInfo::default())),
_labels: *labels,
}
}

pub fn monitor(&self, label_values: &[&str; N]) -> LabeledSlowOpHistogramVecGuard<N> {
let label_values = label_values.map(|str| str.to_string());

let mut guard = self.info.lock();
*guard.labels_count.entry(label_values).or_default() += 1;

LabeledSlowOpHistogramVecGuard {
owner: self.clone(),
start: Instant::now(),
}
}
}

impl<const N: usize> Collector for SlowOpHistogramVec<N> {
fn desc(&self) -> Vec<&prometheus::core::Desc> {
self.vec.desc()
}

fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
let mut guard = self.info.lock();
let mut to_collect = vec![];
std::mem::swap(&mut guard.to_collect, &mut to_collect);
let dropped = to_collect.len();

for (start, label_values) in &guard.starts {
to_collect.push((label_values.clone(), start.elapsed()));
}
for (index, (label_values, duration)) in to_collect.into_iter().enumerate() {
let label_values_str = label_values.iter().map(|s| s.as_str()).collect_vec();
if duration >= self.threshold {
self.vec
.with_label_values(&label_values_str)
.observe(duration.as_secs_f64());
}
if index < dropped {
let count = guard.labels_count.get_mut(&label_values).unwrap();
*count -= 1;
if *count == 0 {
self.vec
.remove_label_values(&label_values_str)
.expect("should exist");
}
guard
.labels_count
.remove(&label_values)
.expect("should exist");
}
}
drop(guard);

self.vec.collect()
}
}

pub struct LabeledSlowOpHistogramVecGuard<const N: usize> {
owner: SlowOpHistogramVec<N>,
start: Instant,
}

impl<const N: usize> Drop for LabeledSlowOpHistogramVecGuard<N> {
fn drop(&mut self) {
let mut guard = self.owner.info.lock();
if let Some(label_values) = guard.starts.remove(&self.start) {
let duration = self.start.elapsed();
guard.to_collect.push((label_values, duration));
}
}
}

0 comments on commit 2adeeac

Please sign in to comment.