Skip to content

Commit

Permalink
refactor(metrics): use counter instead of histogram for barrier align…
Browse files Browse the repository at this point in the history
… duration (#17594)
  • Loading branch information
lmatz authored Jul 8, 2024
1 parent 99fcd9a commit 0e873a0
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 28 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

20 changes: 8 additions & 12 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1232,21 +1232,17 @@ def section_streaming_actors(outer_panels):
),
],
),
panels.timeseries_actor_latency(
"Executor Barrier Align",
panels.timeseries_percentage(
"Executor Barrier Align Per Second",
"",
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('stream_barrier_align_duration_bucket')}[$__rate_interval])) by (le, executor, fragment_id, wait_side, {COMPONENT_LABEL}))",
f"p{legend} - executor {{{{executor}}}} fragment {{{{fragment_id}}}} {{{{wait_side}}}} - {{{{{COMPONENT_LABEL}}}}}",
),
[90, 99, 999, "max"],
),
panels.target(
f"sum by(le, executor, fragment_id, wait_side, job)(rate({metric('stream_barrier_align_duration_sum')}[$__rate_interval])) / sum by(le,executor,fragment_id,wait_side,{COMPONENT_LABEL}) (rate({metric('stream_barrier_align_duration_count')}[$__rate_interval])) > 0",
"avg - executor {{executor}} fragment {{fragment_id}} {{wait_side}} - {{%s}}"
% COMPONENT_LABEL,
f"avg(rate({metric('stream_barrier_align_duration_ns')}[$__rate_interval]) / 1000000000) by (fragment_id,wait_side, executor)",
"fragment {{fragment_id}} {{wait_side}} {{executor}}",
),
panels.target_hidden(
f"rate({metric('stream_barrier_align_duration_ns')}[$__rate_interval]) / 1000000000",
"actor {{actor_id}} fragment {{fragment_id}} {{wait_side}} {{executor}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions src/stream/src/executor/barrier_align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub async fn barrier_align(
Message::Barrier(barrier) => {
yield AlignedMessage::Barrier(barrier);
right_barrier_align_duration
.observe(start_time.elapsed().as_secs_f64());
.inc_by(start_time.elapsed().as_nanos() as u64);
break;
}
}
Expand All @@ -144,7 +144,8 @@ pub async fn barrier_align(
Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
Message::Barrier(barrier) => {
yield AlignedMessage::Barrier(barrier);
left_barrier_align_duration.observe(start_time.elapsed().as_secs_f64());
left_barrier_align_duration
.inc_by(start_time.elapsed().as_nanos() as u64);
break;
}
}
Expand Down
14 changes: 5 additions & 9 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::config::MetricLevel;
use risingwave_common::metrics::{
LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec,
RelabeledGuardedHistogramVec,
RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
Expand Down Expand Up @@ -96,7 +96,7 @@ pub struct StreamingMetrics {
pub join_matched_join_keys: RelabeledGuardedHistogramVec<3>,

// Streaming Join, Streaming Dynamic Filter and Streaming Union
pub barrier_align_duration: RelabeledGuardedHistogramVec<4>,
pub barrier_align_duration: RelabeledGuardedIntCounterVec<4>,

// Streaming Aggregation
agg_lookup_miss_count: LabelGuardedIntCounterVec<3>,
Expand Down Expand Up @@ -465,19 +465,15 @@ impl StreamingMetrics {
)
.unwrap();

let opts = histogram_opts!(
"stream_barrier_align_duration",
let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
"stream_barrier_align_duration_ns",
"Duration of join align barrier",
exponential_buckets(0.0001, 2.0, 21).unwrap() // max 104s
);
let barrier_align_duration = register_guarded_histogram_vec_with_registry!(
opts,
&["actor_id", "fragment_id", "wait_side", "executor"],
registry
)
.unwrap();

let barrier_align_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
let barrier_align_duration = RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
MetricLevel::Debug,
barrier_align_duration,
level,
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async fn merge(
None => {
assert!(active.is_terminated());
let barrier = current_barrier.take().unwrap();
barrier_align.observe(start_time.elapsed().as_secs_f64());
barrier_align.inc_by(start_time.elapsed().as_nanos() as u64);

let upstreams = std::mem::take(&mut blocked);
active.extend(upstreams.into_iter().map(|upstream| upstream.into_future()));
Expand Down

0 comments on commit 0e873a0

Please sign in to comment.