Skip to content

Commit

Permalink
feat(metrics): add internal latency of actors, MVs and sinks (#19639)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Dec 3, 2024
1 parent c3750eb commit a41866f
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 27 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion grafana/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ def timeseries_latency_small(self, title, description, targets):
**self.common_options,
)

def timeseries_id(self, title, description, targets):
def timeseries_epoch(self, title, description, targets):
gridPos = self.layout.next_half_width_graph()
return TimeSeries(
title=title,
Expand Down Expand Up @@ -527,3 +527,8 @@ def quantile(f, percentiles):
return list(
map(lambda p: f(quantile_map[str(p)][0], quantile_map[str(p)][1]), percentiles)
)


def epoch_to_unix_millis(epoch_expr):
    """Build a PromQL expression string that maps an epoch expression to Unix milliseconds.

    Args:
        epoch_expr: PromQL expression (usually a metric selector) yielding an epoch value.
            It is interpolated into the result using its default string formatting.

    Returns:
        A parenthesized PromQL expression of the form
        ``(1617235200000+(<epoch_expr> != 0)/65536)``.
    """
    # 1617235200000 ms is 2021-04-01T00:00:00Z. The original note labels it
    # UNIX_RISINGWAVE_DATE_SEC, i.e. presumably the start of the epoch clock
    # expressed in milliseconds -- confirm against the Rust constant.
    base_millis = 1617235200000
    # `!= 0` is a PromQL filter: samples whose value is 0 are dropped before the
    # division. Dividing by 65536 presumably strips the low 16 bits of the epoch
    # that do not carry time -- TODO confirm.
    scaled_epoch = "({} != 0)/65536".format(epoch_expr)
    return "(" + str(base_millis) + "+" + scaled_epoch + ")"
30 changes: 27 additions & 3 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,18 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_epoch(
"Current Epoch of Materialize Views",
"The current epoch that the Materialize Executors are processing. If an MV's epoch is far behind the others, "
"it's very likely to be the performance bottleneck",
[
panels.target(
# We use `min` here, but it makes little difference: any of the sampled `current_epoch` values makes sense.
f"min({metric('stream_mview_current_epoch')} != 0) by (table_id) * on(table_id) group_left(table_name) group({metric('table_info')}) by (table_id, table_name)",
"{{table_id}} {{table_name}}",
),
]
),
panels.timeseries_latency(
"Snapshot Backfill Lag",
"",
Expand Down Expand Up @@ -1718,6 +1730,18 @@ def section_streaming_actors(outer_panels: Panels):
),
],
),
panels.timeseries_epoch(
"Current Epoch of Actors",
"The current epoch that the actors are processing. If an actor's epoch is far behind the others, "
"it's very likely to be the performance bottleneck",
[
panels.target(
# We use `min` here, but it makes little difference: any of the sampled epochs makes sense.
f"min({metric('stream_actor_current_epoch')} != 0) by (fragment_id)",
"fragment {{fragment_id}}",
),
]
),
],
)
]
Expand Down Expand Up @@ -3253,7 +3277,7 @@ def section_hummock_manager(outer_panels):
),
],
),
panels.timeseries_id(
panels.timeseries_epoch(
"Version Id",
"",
[
Expand All @@ -3275,7 +3299,7 @@ def section_hummock_manager(outer_panels):
),
],
),
panels.timeseries_id(
panels.timeseries_epoch(
"Epoch",
"",
[
Expand Down Expand Up @@ -4338,7 +4362,7 @@ def section_sink_metrics(outer_panels):
),
],
),
panels.timeseries_id(
panels.timeseries_epoch(
"Log Store Read/Write Epoch",
"",
[
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions grafana/risingwave-user-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,26 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_latency(
"Latency of Materialize Views & Sinks",
"The current epoch that the Materialize Executors or Sink Executor are processing. If an MV/Sink's epoch is far behind the others, "
"it's very likely to be the performance bottleneck",
[
panels.target(
# We use `max` here, but it makes little difference: any of the sampled `current_epoch` values makes sense.
f"max(timestamp({metric('stream_mview_current_epoch')}) - {epoch_to_unix_millis(metric('stream_mview_current_epoch'))}/1000) by (table_id) * on(table_id) group_left(table_name) group({metric('table_info')}) by (table_id, table_name)",
"{{table_id}} {{table_name}}",
),
panels.target(
f"max(timestamp({metric('log_store_latest_read_epoch')}) - {epoch_to_unix_millis(metric('log_store_latest_read_epoch'))}/1000) by (sink_id, sink_name)",
"{{sink_id}} {{sink_name}} (output)",
),
panels.target(
f"max(timestamp({metric('log_store_latest_write_epoch')}) - {epoch_to_unix_millis(metric('log_store_latest_write_epoch'))}/1000) by (sink_id, sink_name)",
"{{sink_id}} {{sink_name}} (enqueue)",
),
]
),
],
)
]
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

19 changes: 16 additions & 3 deletions src/common/metrics/src/relabeled_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,25 @@ use crate::{
};

/// For all `Relabeled*Vec` below,
/// - when `metric_level` <= `relabel_threshold`, they behaves exactly the same as their inner
/// - when `metric_level` <= `relabel_threshold`, they behave exactly the same as their inner
/// metric.
/// - when `metric_level` > `relabel_threshold`, all their input label values are rewrite to "" when
/// - when `metric_level` > `relabel_threshold`, the first `relabel_num` labels are rewritten to "" when
/// calling `with_label_values`. That means the metric vec is aggregated into a single metric.
///
///
/// These wrapper classes add a `metric_level` field to the corresponding metric.
/// We could have used one single struct to represent all `MetricVec<T: MetricVecBuilder>`, rather
/// than specializing them one by one. However, that's not feasible because the prometheus crate
/// doesn't export `MetricVecBuilder` implementations like `HistogramVecBuilder`.
///
/// ## Note
///
/// CAUTION! Relabelling might cause unexpected results!
///
/// For counters (including histogram because it uses counters internally), it's usually natural
/// to sum up the count from multiple labels.
///
/// For the rest (such as Gauge), the semantics becomes "any/last of the recorded value". Please be
/// cautious.
#[derive(Clone, Debug)]
pub struct RelabeledMetricVec<M> {
relabel_threshold: MetricLevel,
Expand Down Expand Up @@ -160,5 +169,9 @@ pub type RelabeledGuardedHistogramVec<const N: usize> =
RelabeledMetricVec<LabelGuardedHistogramVec<N>>;
pub type RelabeledGuardedIntCounterVec<const N: usize> =
RelabeledMetricVec<LabelGuardedIntCounterVec<N>>;

/// CAUTION! Relabelling a Gauge might cause unexpected results!
///
/// See [`RelabeledMetricVec`] for details.
pub type RelabeledGuardedIntGaugeVec<const N: usize> =
RelabeledMetricVec<LabelGuardedIntGaugeVec<N>>;
2 changes: 1 addition & 1 deletion src/risedevtool/connector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ script = '''
set -euo pipefail
if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}')" =~ ^(11|17|19|20|21|22) ]]); then
if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}' | cut -d. -f1)" -ge 11 ]]); then
echo "JDK 11+ is not installed. Please install JDK 11+ first."
exit 1
fi
Expand Down
15 changes: 12 additions & 3 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ pub struct Actor<C> {
/// The subtasks to execute concurrently.
subtasks: Vec<SubtaskHandle>,

_metrics: Arc<StreamingMetrics>,
pub actor_context: ActorContextRef,
expr_context: ExprContext,
barrier_manager: LocalBarrierManager,
Expand All @@ -169,15 +168,14 @@ where
pub fn new(
consumer: C,
subtasks: Vec<SubtaskHandle>,
metrics: Arc<StreamingMetrics>,
_metrics: Arc<StreamingMetrics>,
actor_context: ActorContextRef,
expr_context: ExprContext,
barrier_manager: LocalBarrierManager,
) -> Self {
Self {
consumer,
subtasks,
_metrics: metrics,
actor_context,
expr_context,
barrier_manager,
Expand Down Expand Up @@ -236,6 +234,15 @@ where
.with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
let _actor_count_guard = actor_count.inc_guard();

let current_epoch = self
.actor_context
.streaming_metrics
.actor_current_epoch
.with_guarded_label_values(&[
&self.actor_context.id.to_string(),
&self.actor_context.fragment_id.to_string(),
]);

let mut last_epoch: Option<EpochPair> = None;
let mut stream = Box::pin(Box::new(self.consumer).execute());

Expand Down Expand Up @@ -265,6 +272,8 @@ where
break Ok(barrier);
}

current_epoch.set(barrier.epoch.curr as i64);

// Collect barriers to local barrier manager
self.barrier_manager.collect(id, &barrier);

Expand Down
38 changes: 26 additions & 12 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ pub struct StreamingMetrics {

// Streaming actor
pub actor_count: LabelGuardedIntGaugeVec<1>,
#[expect(dead_code)]
actor_memory_usage: LabelGuardedIntGaugeVec<2>,
actor_in_record_cnt: RelabeledGuardedIntCounterVec<3>,
pub actor_out_record_cnt: RelabeledGuardedIntCounterVec<2>,
pub actor_current_epoch: RelabeledGuardedIntGaugeVec<2>,

// Source
pub source_output_row_count: LabelGuardedIntCounterVec<4>,
Expand Down Expand Up @@ -191,6 +190,7 @@ pub struct StreamingMetrics {
materialize_cache_hit_count: RelabeledGuardedIntCounterVec<3>,
materialize_cache_total_count: RelabeledGuardedIntCounterVec<3>,
materialize_input_row_count: RelabeledGuardedIntCounterVec<3>,
materialize_current_epoch: RelabeledGuardedIntGaugeVec<3>,
}

pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
Expand Down Expand Up @@ -253,6 +253,15 @@ impl StreamingMetrics {
.unwrap()
.relabel_debug_1(level);

let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
"stream_mview_current_epoch",
"The current epoch of the materialized executor",
&["actor_id", "table_id", "fragment_id"],
registry
)
.unwrap()
.relabel_debug_1(level);

let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
"stream_sink_chunk_buffer_size",
"Total size of chunks buffered in a barrier",
Expand Down Expand Up @@ -395,6 +404,15 @@ impl StreamingMetrics {
.unwrap()
.relabel_debug_1(level);

let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
"stream_actor_current_epoch",
"Current epoch of actor",
&["actor_id", "fragment_id"],
registry
)
.unwrap()
.relabel_debug_1(level);

let actor_count = register_guarded_int_gauge_vec_with_registry!(
"stream_actor_count",
"Total number of actors (parallelism)",
Expand All @@ -403,15 +421,6 @@ impl StreamingMetrics {
)
.unwrap();

// dead code
let actor_memory_usage = register_guarded_int_gauge_vec_with_registry!(
"actor_memory_usage",
"Memory usage (bytes)",
&["actor_id", "fragment_id"],
registry,
)
.unwrap();

let opts = histogram_opts!(
"stream_merge_barrier_align_duration",
"Duration of merge align barrier",
Expand Down Expand Up @@ -1049,9 +1058,9 @@ impl StreamingMetrics {
actor_idle_duration,
actor_idle_cnt,
actor_count,
actor_memory_usage,
actor_in_record_cnt,
actor_out_record_cnt,
actor_current_epoch,
source_output_row_count,
source_split_change_count,
source_backfill_row_count,
Expand Down Expand Up @@ -1135,6 +1144,7 @@ impl StreamingMetrics {
materialize_cache_hit_count,
materialize_cache_total_count,
materialize_input_row_count,
materialize_current_epoch,
}
}

Expand Down Expand Up @@ -1494,6 +1504,9 @@ impl StreamingMetrics {
materialize_input_row_count: self
.materialize_input_row_count
.with_guarded_label_values(label_list),
materialize_current_epoch: self
.materialize_current_epoch
.with_guarded_label_values(label_list),
}
}
}
Expand Down Expand Up @@ -1527,6 +1540,7 @@ pub struct MaterializeMetrics {
pub materialize_cache_hit_count: LabelGuardedIntCounter<3>,
pub materialize_cache_total_count: LabelGuardedIntCounter<3>,
pub materialize_input_row_count: LabelGuardedIntCounter<3>,
pub materialize_current_epoch: LabelGuardedIntGauge<3>,
}

pub struct GroupTopNMetrics {
Expand Down
5 changes: 5 additions & 0 deletions src/stream/src/executor/mview/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
self.materialize_cache.data.clear();
}
}

self.metrics
.materialize_current_epoch
.set(b.epoch.curr as i64);

Message::Barrier(b)
}
}
Expand Down

0 comments on commit a41866f

Please sign in to comment.