Skip to content

Commit

Permalink
refactor(metrics): refine relabelled metrics interfaces (#18331)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Aug 30, 2024
1 parent 29db1d9 commit 5055cc9
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 59 deletions.
44 changes: 42 additions & 2 deletions src/common/metrics/src/relabeled_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,47 @@ impl<M> RelabeledMetricVec<M> {
}
}

#[easy_ext::ext(MetricVecRelabelExt)]
impl<M> M
where
M: Sized,
{
/// Equivalent to [`RelabeledMetricVec::with_metric_level`].
pub fn relabel(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M> {
RelabeledMetricVec::with_metric_level(metric_level, self, relabel_threshold)
}

/// Equivalent to [`RelabeledMetricVec::with_metric_level_relabel_n`].
pub fn relabel_n(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> RelabeledMetricVec<M> {
RelabeledMetricVec::with_metric_level_relabel_n(
metric_level,
self,
relabel_threshold,
relabel_num,
)
}

/// Equivalent to [`RelabeledMetricVec::with_metric_level_relabel_n`] with `metric_level` set to
/// `MetricLevel::Debug` and `relabel_num` set to 1.
pub fn relabel_debug_1(self, relabel_threshold: MetricLevel) -> RelabeledMetricVec<M> {
RelabeledMetricVec::with_metric_level_relabel_n(
MetricLevel::Debug,
self,
relabel_threshold,
1,
)
}
}

impl<T: MetricVecBuilder> RelabeledMetricVec<MetricVec<T>> {
pub fn with_label_values(&self, vals: &[&str]) -> T::M {
if self.metric_level > self.relabel_threshold {
Expand All @@ -89,8 +130,7 @@ impl<T: MetricVecBuilder> RelabeledMetricVec<MetricVec<T>> {
}

impl<T: MetricVecBuilder, const N: usize> RelabeledMetricVec<LabelGuardedMetricVec<T, N>> {
// TODO: shall we rename this to `with_guarded_label_values`?
pub fn with_label_values(&self, vals: &[&str; N]) -> LabelGuardedMetric<T::M, N> {
pub fn with_guarded_label_values(&self, vals: &[&str; N]) -> LabelGuardedMetric<T::M, N> {
if self.metric_level > self.relabel_threshold {
// relabel first n labels to empty string
let mut relabeled_vals = *vals;
Expand Down
22 changes: 11 additions & 11 deletions src/storage/src/monitor/local_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,30 +256,30 @@ impl LocalStoreMetrics {
pub fn new(metrics: &HummockStateStoreMetrics, table_id_label: &str) -> Self {
let cache_data_block_total = metrics
.sst_store_block_request_counts
.with_label_values(&[table_id_label, "data_total"])
.with_guarded_label_values(&[table_id_label, "data_total"])
.local();

let cache_data_block_miss = metrics
.sst_store_block_request_counts
.with_label_values(&[table_id_label, "data_miss"])
.with_guarded_label_values(&[table_id_label, "data_miss"])
.local();

let cache_meta_block_total = metrics
.sst_store_block_request_counts
.with_label_values(&[table_id_label, "meta_total"])
.with_guarded_label_values(&[table_id_label, "meta_total"])
.local();
let cache_data_prefetch_count = metrics
.sst_store_block_request_counts
.with_label_values(&[table_id_label, "prefetch_count"])
.with_guarded_label_values(&[table_id_label, "prefetch_count"])
.local();
let cache_data_prefetch_block_count = metrics
.sst_store_block_request_counts
.with_label_values(&[table_id_label, "prefetch_data_count"])
.with_guarded_label_values(&[table_id_label, "prefetch_data_count"])
.local();

let cache_meta_block_miss = metrics
.sst_store_block_request_counts
.with_label_values(&[table_id_label, "meta_miss"])
.with_guarded_label_values(&[table_id_label, "meta_miss"])
.local();

let remote_io_time = metrics
Expand All @@ -289,22 +289,22 @@ impl LocalStoreMetrics {

let processed_key_count = metrics
.iter_scan_key_counts
.with_label_values(&[table_id_label, "processed"])
.with_guarded_label_values(&[table_id_label, "processed"])
.local();

let skip_multi_version_key_count = metrics
.iter_scan_key_counts
.with_label_values(&[table_id_label, "skip_multi_version"])
.with_guarded_label_values(&[table_id_label, "skip_multi_version"])
.local();

let skip_delete_key_count = metrics
.iter_scan_key_counts
.with_label_values(&[table_id_label, "skip_delete"])
.with_guarded_label_values(&[table_id_label, "skip_delete"])
.local();

let total_key_count = metrics
.iter_scan_key_counts
.with_label_values(&[table_id_label, "total"])
.with_guarded_label_values(&[table_id_label, "total"])
.local();

let get_shared_buffer_hit_counts = metrics
Expand Down Expand Up @@ -477,7 +477,7 @@ macro_rules! define_bloom_filter_metrics {
pub fn new(metrics: &HummockStateStoreMetrics, table_id_label: &str, oper_type: &str) -> Self {
// checks SST bloom filters
Self {
$($x: metrics.$x.with_label_values(&[table_id_label, oper_type]).local(),)*
$($x: metrics.$x.with_guarded_label_values(&[table_id_label, oper_type]).local(),)*
}
}

Expand Down
24 changes: 15 additions & 9 deletions src/storage/src/monitor/monitored_storage_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,27 +287,27 @@ impl MonitoredStorageMetrics {
) -> LocalIterMetricsInner {
let iter_init_duration = self
.iter_init_duration
.with_label_values(&[table_label, iter_type])
.with_guarded_label_values(&[table_label, iter_type])
.local();
let iter_counts = self
.iter_counts
.with_label_values(&[table_label, iter_type])
.with_guarded_label_values(&[table_label, iter_type])
.local();
let iter_scan_duration = self
.iter_scan_duration
.with_label_values(&[table_label, iter_type])
.with_guarded_label_values(&[table_label, iter_type])
.local();
let iter_item = self
.iter_item
.with_label_values(&[table_label, iter_type])
.with_guarded_label_values(&[table_label, iter_type])
.local();
let iter_size = self
.iter_size
.with_label_values(&[table_label, iter_type])
.with_guarded_label_values(&[table_label, iter_type])
.local();
let iter_in_progress_counts = self
.iter_in_progress_counts
.with_label_values(&[table_label, iter_type]);
.with_guarded_label_values(&[table_label, iter_type]);

LocalIterMetricsInner {
iter_init_duration,
Expand Down Expand Up @@ -343,11 +343,17 @@ impl MonitoredStorageMetrics {
}

fn local_get_metrics(&self, table_label: &str) -> LocalGetMetrics {
let get_duration = self.get_duration.with_label_values(&[table_label]).local();
let get_key_size = self.get_key_size.with_label_values(&[table_label]).local();
let get_duration = self
.get_duration
.with_guarded_label_values(&[table_label])
.local();
let get_key_size = self
.get_key_size
.with_guarded_label_values(&[table_label])
.local();
let get_value_size = self
.get_value_size
.with_label_values(&[table_label])
.with_guarded_label_values(&[table_label])
.local();

LocalGetMetrics {
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/barrier_align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ pub async fn barrier_align(
) {
let actor_id = actor_id.to_string();
let fragment_id = fragment_id.to_string();
let left_barrier_align_duration = metrics.barrier_align_duration.with_label_values(&[
let left_barrier_align_duration = metrics.barrier_align_duration.with_guarded_label_values(&[
&actor_id,
&fragment_id,
"left",
executor_name,
]);
let right_barrier_align_duration = metrics.barrier_align_duration.with_label_values(&[
let right_barrier_align_duration = metrics.barrier_align_duration.with_guarded_label_values(&[
&actor_id,
&fragment_id,
"right",
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl DispatchExecutorMetrics {
actor_output_buffer_blocking_duration_ns: self
.metrics
.actor_output_buffer_blocking_duration_ns
.with_label_values(&[
.with_guarded_label_values(&[
&self.actor_id_str,
&self.fragment_id_str,
dispatcher.dispatcher_id_str(),
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
let join_matched_join_keys = ctx
.streaming_metrics
.join_matched_join_keys
.with_label_values(&[
.with_guarded_label_values(&[
&ctx.id.to_string(),
&ctx.fragment_id.to_string(),
&side_update.ht.table_id().to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl MergeExecutor {
Some(
self.metrics
.merge_barrier_align_duration
.with_label_values(&[
.with_guarded_label_values(&[
&self.actor_context.id.to_string(),
&self.actor_context.fragment_id.to_string(),
]),
Expand Down
41 changes: 10 additions & 31 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::config::MetricLevel;
use risingwave_common::metrics::{
LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec,
LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
Expand Down Expand Up @@ -288,14 +288,9 @@ impl StreamingMetrics {
&["actor_id", "fragment_id", "downstream_fragment_id"],
registry
)
.unwrap();
let actor_output_buffer_blocking_duration_ns =
RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
MetricLevel::Debug,
actor_output_buffer_blocking_duration_ns,
level,
1, // mask the first label `actor_id` if the level is less verbose than `Debug`
);
.unwrap()
// mask the first label `actor_id` if the level is less verbose than `Debug`
.relabel_debug_1(level);

let actor_input_buffer_blocking_duration_ns =
register_guarded_int_counter_vec_with_registry!(
Expand Down Expand Up @@ -436,14 +431,8 @@ impl StreamingMetrics {
&["actor_id", "fragment_id"],
registry
)
.unwrap();
let merge_barrier_align_duration =
RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
MetricLevel::Debug,
merge_barrier_align_duration,
level,
1,
);
.unwrap()
.relabel_debug_1(level);

let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
"stream_join_lookup_miss_count",
Expand Down Expand Up @@ -491,13 +480,8 @@ impl StreamingMetrics {
&["actor_id", "fragment_id", "wait_side", "executor"],
registry
)
.unwrap();
let barrier_align_duration = RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
MetricLevel::Debug,
barrier_align_duration,
level,
1,
);
.unwrap()
.relabel_debug_1(level);

let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
"stream_join_cached_entry_count",
Expand All @@ -518,13 +502,8 @@ impl StreamingMetrics {
&["actor_id", "fragment_id", "table_id"],
registry
)
.unwrap();
let join_matched_join_keys = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
MetricLevel::Debug,
join_matched_join_keys,
level,
1,
);
.unwrap()
.relabel_debug_1(level);

let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
"stream_agg_lookup_miss_count",
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async fn merge(
let mut watermark_buffers = BTreeMap::<usize, BufferedWatermarks<usize>>::new();

let mut start_time = Instant::now();
let barrier_align = metrics.barrier_align_duration.with_label_values(&[
let barrier_align = metrics.barrier_align_duration.with_guarded_label_values(&[
&actor_id.to_string(),
&fragment_id.to_string(),
"",
Expand Down

0 comments on commit 5055cc9

Please sign in to comment.