Skip to content

Commit

Permalink
fix: cherry pick fix on metric to release-1.5 (#14059) (#14189)
Browse files Browse the repository at this point in the history
Co-authored-by: Eric Fu <[email protected]>
  • Loading branch information
wenym1 and fuyufjh authored Dec 25, 2023
1 parent 24ae3bc commit af650d7
Show file tree
Hide file tree
Showing 21 changed files with 401 additions and 239 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -994,22 +994,22 @@ def section_streaming_actors(outer_panels):
),
],
),
panels.timeseries_row(
"Actor Input Rows",
panels.timeseries_rowsps(
"Actor Input Throughput (rows/s)",
"",
[
panels.target(
f"sum(rate({metric('stream_actor_in_record_cnt')}[$__rate_interval])) by (fragment_id)",
"fragment {{fragment_id}}",
f"sum(rate({metric('stream_actor_in_record_cnt')}[$__rate_interval])) by (fragment_id, upstream_fragment_id)",
"fragment {{fragment_id}}<-{{upstream_fragment_id}}",
),
panels.target_hidden(
f"rate({metric('stream_actor_in_record_cnt')}[$__rate_interval])",
"actor {{actor_id}}",
),
],
),
panels.timeseries_row(
"Actor Output Rows",
panels.timeseries_rowsps(
"Actor Output Throughput (rows/s)",
"",
[
panels.target(
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExec
metrics
.executor_metrics()
.exchange_recv_row_number
.with_label_values(&[
.with_guarded_label_values(&[
source_id.query_id.as_str(),
format!("{}", source_id.stage_id).as_str(),
format!("{}", source_id.task_id).as_str(),
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
metrics
.executor_metrics()
.row_seq_scan_next_duration
.with_label_values(&metrics.executor_labels(&identity))
.with_guarded_label_values(&metrics.executor_labels(&identity))
});

if ordered {
Expand Down
59 changes: 46 additions & 13 deletions src/batch/src/monitor/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use std::sync::{Arc, LazyLock};

use prometheus::{IntGauge, Registry};
use risingwave_common::metrics::{
LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounterVec,
LabelGuardedIntGaugeVec, TrAdderGauge,
LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounterVec,
LabelGuardedIntGauge, LabelGuardedIntGaugeVec, TrAdderGauge,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;

Expand Down Expand Up @@ -171,10 +171,18 @@ pub type BatchMetricsWithTaskLabels = Arc<BatchMetricsWithTaskLabelsInner>;
/// labels.
pub struct BatchMetricsWithTaskLabelsInner {
batch_manager_metrics: Arc<BatchManagerMetrics>,
task_metrics: Arc<BatchTaskMetrics>,
executor_metrics: Arc<BatchExecutorMetrics>,
task_id: TaskId,
task_labels: [String; 3],

// From BatchTaskMetrics
pub task_first_poll_delay: LabelGuardedGauge<3>,
pub task_fast_poll_duration: LabelGuardedGauge<3>,
pub task_idle_duration: LabelGuardedGauge<3>,
pub task_poll_duration: LabelGuardedGauge<3>,
pub task_scheduled_duration: LabelGuardedGauge<3>,
pub task_slow_poll_duration: LabelGuardedGauge<3>,
pub task_mem_usage: LabelGuardedIntGauge<3>,
}

impl BatchMetricsWithTaskLabelsInner {
Expand All @@ -184,27 +192,52 @@ impl BatchMetricsWithTaskLabelsInner {
executor_metrics: Arc<BatchExecutorMetrics>,
id: TaskId,
) -> Self {
let task_labels = [
id.query_id.clone(),
id.stage_id.to_string(),
id.task_id.to_string(),
];
let labels: &[&str; 3] = &task_labels.each_ref().map(|s| s.as_str());
let task_first_poll_delay = task_metrics
.task_first_poll_delay
.with_guarded_label_values(labels);
let task_fast_poll_duration = task_metrics
.task_fast_poll_duration
.with_guarded_label_values(labels);
let task_idle_duration = task_metrics
.task_idle_duration
.with_guarded_label_values(labels);
let task_poll_duration = task_metrics
.task_poll_duration
.with_guarded_label_values(labels);
let task_scheduled_duration = task_metrics
.task_scheduled_duration
.with_guarded_label_values(labels);
let task_slow_poll_duration = task_metrics
.task_slow_poll_duration
.with_guarded_label_values(labels);
let task_mem_usage = task_metrics
.task_mem_usage
.with_guarded_label_values(labels);
Self {
batch_manager_metrics,
task_metrics,
executor_metrics,
task_id: id.clone(),
task_labels: [id.query_id, id.stage_id.to_string(), id.task_id.to_string()],
task_labels,
task_first_poll_delay,
task_fast_poll_duration,
task_idle_duration,
task_poll_duration,
task_scheduled_duration,
task_slow_poll_duration,
task_mem_usage,
}
}

pub fn task_labels(&self) -> [&str; 3] {
self.task_labels.each_ref().map(String::as_str)
}

pub fn task_id(&self) -> TaskId {
self.task_id.clone()
}

pub fn get_task_metrics(&self) -> &Arc<BatchTaskMetrics> {
&self.task_metrics
}

pub fn executor_metrics(&self) -> &BatchExecutorMetrics {
&self.executor_metrics
}
Expand Down
7 changes: 2 additions & 5 deletions src/batch/src/task/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl BatchTaskContext for ComputeNodeContext {
let executor_mem_usage = metrics
.executor_metrics()
.mem_usage
.with_label_values(&metrics.executor_labels(executor_id));
.with_guarded_label_values(&metrics.executor_labels(executor_id));
MemoryContext::new(Some(self.mem_context.clone()), executor_mem_usage)
} else {
MemoryContext::none()
Expand Down Expand Up @@ -174,10 +174,7 @@ impl ComputeNodeContext {
));
let mem_context = MemoryContext::new(
Some(batch_mem_context),
batch_metrics
.get_task_metrics()
.task_mem_usage
.with_label_values(&batch_metrics.task_labels()),
batch_metrics.task_mem_usage.clone(),
);
Self {
env,
Expand Down
20 changes: 6 additions & 14 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,31 +485,23 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
error!("Batch task {:?} panic: {:?}", task_id, error);
}
let cumulative = monitor.cumulative();
let labels = &batch_metrics.task_labels();
let task_metrics = batch_metrics.get_task_metrics();
task_metrics
batch_metrics
.task_first_poll_delay
.with_label_values(labels)
.set(cumulative.total_first_poll_delay.as_secs_f64());
task_metrics
batch_metrics
.task_fast_poll_duration
.with_label_values(labels)
.set(cumulative.total_fast_poll_duration.as_secs_f64());
task_metrics
batch_metrics
.task_idle_duration
.with_label_values(labels)
.set(cumulative.total_idle_duration.as_secs_f64());
task_metrics
batch_metrics
.task_poll_duration
.with_label_values(labels)
.set(cumulative.total_poll_duration.as_secs_f64());
task_metrics
batch_metrics
.task_scheduled_duration
.with_label_values(labels)
.set(cumulative.total_scheduled_duration.as_secs_f64());
task_metrics
batch_metrics
.task_slow_poll_duration
.with_label_values(labels)
.set(cumulative.total_slow_poll_duration.as_secs_f64());
} else if let Err(error) = AssertUnwindSafe(task(task_id.clone()))
.rw_catch_unwind()
Expand Down
20 changes: 15 additions & 5 deletions src/common/src/metrics/guarded_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,17 @@ impl<T: MetricVecBuilder, const N: usize> LabelGuardedMetricVec<T, N> {
}
}

pub fn with_label_values(&self, labels: &[&str; N]) -> LabelGuardedMetric<T::M, N> {
/// This is similar to `with_label_values` on the raw metrics vec.
/// Take special care: unless there is a specific reason to do so, do not drop
/// the returned `LabelGuardedMetric` immediately after using it, as in
/// `metrics.with_guarded_label_values(...).inc();`. Once it is dropped, the
/// labels are regarded as no longer in use, and the internal raw metrics are
/// removed and reset.
///
/// Instead, store the returned `LabelGuardedMetric` in a scope with a longer
/// lifetime, so that the labels are regarded as in use for its whole lifetime.
/// This is also the recommended way to use the raw metrics vec.
pub fn with_guarded_label_values(&self, labels: &[&str; N]) -> LabelGuardedMetric<T::M, N> {
let guard = LabelGuardedMetricsInfo::register_new_label(&self.info, labels);
let inner = self.inner.with_label_values(labels);
LabelGuardedMetric {
Expand All @@ -219,7 +229,7 @@ impl<T: MetricVecBuilder, const N: usize> LabelGuardedMetricVec<T, N> {

pub fn with_test_label(&self) -> LabelGuardedMetric<T::M, N> {
let labels: [&'static str; N] = gen_test_label::<N>();
self.with_label_values(&labels)
self.with_guarded_label_values(&labels)
}
}

Expand Down Expand Up @@ -376,12 +386,12 @@ mod tests {
#[test]
fn test_label_guarded_metrics_drop() {
let vec = LabelGuardedIntCounterVec::<3>::test_int_counter_vec();
let m1_1 = vec.with_label_values(&["1", "2", "3"]);
let m1_1 = vec.with_guarded_label_values(&["1", "2", "3"]);
assert_eq!(1, vec.collect().pop().unwrap().get_metric().len());
let m1_2 = vec.with_label_values(&["1", "2", "3"]);
let m1_2 = vec.with_guarded_label_values(&["1", "2", "3"]);
let m1_3 = m1_2.clone();
assert_eq!(1, vec.collect().pop().unwrap().get_metric().len());
let m2 = vec.with_label_values(&["2", "2", "3"]);
let m2 = vec.with_guarded_label_values(&["2", "2", "3"]);
assert_eq!(2, vec.collect().pop().unwrap().get_metric().len());
drop(m1_3);
assert_eq!(2, vec.collect().pop().unwrap().get_metric().len());
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/metrics/relabeled_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ impl<T: MetricVecBuilder, const N: usize> RelabeledMetricVec<LabelGuardedMetricV
for label in relabeled_vals.iter_mut().take(self.relabel_num) {
*label = "";
}
return self.metric.with_label_values(&relabeled_vals);
return self.metric.with_guarded_label_values(&relabeled_vals);
}
self.metric.with_label_values(vals)
self.metric.with_guarded_label_values(vals)
}
}

Expand Down
Loading

0 comments on commit af650d7

Please sign in to comment.