Skip to content

Commit

Permalink
feat(metrics): add Barrier pending time metrics (cherrypick #15938) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Mar 29, 2024
1 parent 020a36c commit 8ee52d8
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 34 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

70 changes: 40 additions & 30 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,46 @@ def section_streaming(outer_panels):
outer_panels.row_collapsed(
"Streaming",
[
panels.timeseries_count(
"Barrier Number",
"The number of barriers that have been ingested but not completely processed. This metric reflects the "
"current level of congestion within the system.",
[
panels.target(f"{metric('all_barrier_nums')}", "all_barrier"),
panels.target(
f"{metric('in_flight_barrier_nums')}", "in_flight_barrier"
),
],
),
panels.timeseries_latency(
"Barrier Latency",
"The time that the data between two consecutive barriers gets fully processed, i.e. the computation "
"results are made durable into materialized views or sink to external systems. This metric shows to users "
"the freshness of materialized views.",
quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('meta_barrier_duration_seconds_bucket')}[$__rate_interval])) by (le))",
f"barrier_latency_p{legend}",
),
[50, 90, 99, 999, "max"],
)
+ [
panels.target(
f"rate({metric('meta_barrier_duration_seconds_sum')}[$__rate_interval]) / rate({metric('meta_barrier_duration_seconds_count')}[$__rate_interval])",
"barrier_latency_avg",
),
],
),
panels.timeseries(
"Barrier pending time (secs)",
"The duration from the last committed barrier's epoch time to the current time. This metric reflects the "
"data freshness of the system. During this time, no new data has been committed.",
[
panels.target(
f"timestamp({metric('last_committed_barrier_time')}) - {metric('last_committed_barrier_time')}", "barrier_pending_time"
)
],
),
panels.timeseries_rowsps(
"Source Throughput(rows/s)",
"The figure shows the number of rows read by each source per second.",
Expand Down Expand Up @@ -824,17 +864,6 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_count(
"Barrier Number",
"The number of barriers that have been ingested but not completely processed. This metric reflects the "
"current level of congestion within the system.",
[
panels.target(f"{metric('all_barrier_nums')}", "all_barrier"),
panels.target(
f"{metric('in_flight_barrier_nums')}", "in_flight_barrier"
),
],
),
panels.timeseries_latency(
"Barrier Send Latency",
"The duration between the time point when the scheduled barrier needs to be sent and the time point when "
Expand All @@ -854,25 +883,6 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_latency(
"Barrier Latency",
"The time that the data between two consecutive barriers gets fully processed, i.e. the computation "
"results are made durable into materialized views or sink to external systems. This metric shows to users "
"the freshness of materialized views.",
quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('meta_barrier_duration_seconds_bucket')}[$__rate_interval])) by (le))",
f"barrier_latency_p{legend}",
),
[50, 90, 99, 999, "max"],
)
+ [
panels.target(
f"rate({metric('meta_barrier_duration_seconds_sum')}[$__rate_interval]) / rate({metric('meta_barrier_duration_seconds_count')}[$__rate_interval])",
"barrier_latency_avg",
),
],
),
panels.timeseries_latency(
"Barrier In-Flight Latency",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/common/src/util/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl Epoch {
next_epoch
}

/// milliseconds since the RisingWave epoch
pub fn physical_time(&self) -> u64 {
self.0 >> EPOCH_PHYSICAL_SHIFT_BITS
}
Expand All @@ -87,6 +88,10 @@ impl Epoch {
UNIX_RISINGWAVE_DATE_SEC * 1000 + self.physical_time()
}

pub fn as_unix_secs(&self) -> u64 {
UNIX_RISINGWAVE_DATE_SEC + self.physical_time() / 1000
}

/// Returns the epoch in a Timestamptz.
pub fn as_timestamptz(&self) -> Timestamptz {
Timestamptz::from_millis(self.as_unix_millis() as i64).expect("epoch is out of range")
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,9 @@ impl GlobalBarrierManagerContext {
let duration_sec = enqueue_time.stop_and_record();
self.report_complete_event(duration_sec, &command_ctx);
wait_commit_timer.observe_duration();
self.metrics
.last_committed_barrier_time
.set(command_ctx.curr_epoch.value().as_unix_secs() as i64);
Ok(BarrierCompleteOutput {
command_ctx,
require_next_checkpoint: has_remaining,
Expand Down
9 changes: 9 additions & 0 deletions src/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub struct MetaMetrics {
pub all_barrier_nums: IntGauge,
/// The number of in-flight barriers
pub in_flight_barrier_nums: IntGauge,
/// The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.
pub last_committed_barrier_time: IntGauge,

/// ********************************** Recovery ************************************
pub recovery_failure_cnt: IntCounter,
Expand Down Expand Up @@ -225,6 +227,12 @@ impl MetaMetrics {
registry
)
.unwrap();
let last_committed_barrier_time = register_int_gauge_with_registry!(
"last_committed_barrier_time",
"The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
registry
)
.unwrap();

let max_committed_epoch = register_int_gauge_with_registry!(
"storage_max_committed_epoch",
Expand Down Expand Up @@ -626,6 +634,7 @@ impl MetaMetrics {
barrier_send_latency,
all_barrier_nums,
in_flight_barrier_nums,
last_committed_barrier_time,
recovery_failure_cnt,
recovery_latency,

Expand Down

0 comments on commit 8ee52d8

Please sign in to comment.