feat(metrics): add internal latency of actors, MVs and sinks #19639

Merged · 8 commits · Dec 3, 2024
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion grafana/common.py
@@ -456,7 +456,7 @@ def timeseries_latency_small(self, title, description, targets):
**self.common_options,
)

def timeseries_id(self, title, description, targets):
def timeseries_epoch(self, title, description, targets):
gridPos = self.layout.next_half_width_graph()
return TimeSeries(
title=title,
@@ -527,3 +527,8 @@ def quantile(f, percentiles):
return list(
map(lambda p: f(quantile_map[str(p)][0], quantile_map[str(p)][1]), percentiles)
)


def epoch_to_unix_millis(epoch_expr):
# UNIX_RISINGWAVE_DATE_SEC
return f"(1617235200000+({epoch_expr} != 0)/65536)"
30 changes: 27 additions & 3 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -1130,6 +1130,18 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_epoch(
"Current Epoch of Materialize Views",
"The current epoch that the Materialize Executors are processing. If an MV's epoch is far behind the others, "
"it's very likely to be the performance bottleneck",
Comment on lines +1135 to +1136
@xxchan (Member), Dec 2, 2024:
BTW, if an upstream MV is blocked, then all downstream MVs will have epoch lag? (Therefore we can't locate the root cause from this metric alone) 🤔

Besides, backpressure may also affect upstream MV's epoch...?

Member:
I found you once raised the idea of showing the epoch on the DAG. 🤔

#13481 (comment)

[
panels.target(
# Here we use `min`, but actually there's not much difference. Any of the sampled `current_epoch` values makes sense.
f"min({metric('stream_mview_current_epoch')} != 0) by (table_id) * on(table_id) group_left(table_name) group({metric('table_info')}) by (table_id, table_name)",
"{{table_id}} {{table_name}}",
),
]
),
panels.timeseries_latency(
"Snapshot Backfill Lag",
"",
@@ -1718,6 +1730,18 @@ def section_streaming_actors(outer_panels: Panels):
),
],
),
panels.timeseries_epoch(
"Current Epoch of Actors",
"The current epoch that the actors are processing. If an actor's epoch is far behind the others, "
"it's very likely to be the performance bottleneck",
[
panels.target(
# Here we use `min`, but actually there's not much difference. Any of the sampled epochs makes sense.
f"min({metric('stream_actor_current_epoch')} != 0) by (fragment_id)",
"fragment {{fragment_id}}",
),
]
),
],
)
]
@@ -3253,7 +3277,7 @@ def section_hummock_manager(outer_panels):
),
],
),
panels.timeseries_id(
panels.timeseries_epoch(
"Version Id",
"",
[
@@ -3275,7 +3299,7 @@
),
],
),
panels.timeseries_id(
panels.timeseries_epoch(
"Epoch",
"",
[
@@ -4338,7 +4362,7 @@ def section_sink_metrics(outer_panels):
),
],
),
panels.timeseries_id(
panels.timeseries_epoch(
"Log Store Read/Write Epoch",
"",
[
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions grafana/risingwave-user-dashboard.dashboard.py
@@ -744,6 +744,26 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_latency(
"Latency of Materialize Views & Sinks",
Member:
This seems to be easier to interpret than the epoch one. Should we add it to the dev dashboard as well? (Actually, I've never checked the user dashboard before... 🤪)

@fuyufjh (Member Author), Dec 3, 2024:
The dev dashboard shows the epoch instead of latency. The latency is actually calculated as timestamp(<epoch_metrics>) - <epoch_metrics>, which is not very accurate. Thus I think that, for someone with a deeper understanding of RisingWave, showing the epoch would be better.

"The current epoch that the Materialize Executors or Sink Executor are processing. If an MV/Sink's epoch is far behind the others, "
"it's very likely to be the performance bottleneck",
[
panels.target(
# Here we use `max`, but actually there's not much difference. Any of the sampled `current_epoch` values makes sense.
f"max(timestamp({metric('stream_mview_current_epoch')}) - {epoch_to_unix_millis(metric('stream_mview_current_epoch'))}/1000) by (table_id) * on(table_id) group_left(table_name) group({metric('table_info')}) by (table_id, table_name)",
"{{table_id}} {{table_name}}",
),
panels.target(
f"max(timestamp({metric('log_store_latest_read_epoch')}) - {epoch_to_unix_millis(metric('log_store_latest_read_epoch'))}/1000) by (sink_id, sink_name)",
"{{sink_id}} {{sink_name}} (output)",
),
panels.target(
f"max(timestamp({metric('log_store_latest_write_epoch')}) - {epoch_to_unix_millis(metric('log_store_latest_write_epoch'))}/1000) by (sink_id, sink_name)",
"{{sink_id}} {{sink_name}} (enqueue)",
),
]
),
],
)
]
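
To make the calculation described in the review thread above concrete, here is a minimal offline sketch of the same idea (wall-clock time now minus the physical time encoded in the epoch), assuming the epoch layout used by epoch_to_unix_millis(); the helper names are illustrative and not part of this PR:

    import time

    UNIX_RISINGWAVE_DATE_MILLIS = 1_617_235_200_000  # 2021-04-01 00:00:00 UTC, in milliseconds

    def mv_latency_seconds(current_epoch: int) -> float:
        # A zero epoch means the gauge has not been set yet; the PromQL query filters these out.
        if current_epoch == 0:
            raise ValueError("epoch not initialized yet")
        epoch_unix_ms = UNIX_RISINGWAVE_DATE_MILLIS + (current_epoch >> 16)
        return (time.time() * 1000 - epoch_unix_ms) / 1000.0

As noted in the comment above, this is only an approximation of end-to-end latency, since it compares the wall clock at scrape time against the physical time carried by the epoch.
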
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

19 changes: 16 additions & 3 deletions src/common/metrics/src/relabeled_metric.rs
@@ -21,16 +21,25 @@ use crate::{
};

/// For all `Relabeled*Vec` below,
/// - when `metric_level` <= `relabel_threshold`, they behaves exactly the same as their inner
/// - when `metric_level` <= `relabel_threshold`, they behave exactly the same as their inner
/// metric.
/// - when `metric_level` > `relabel_threshold`, all their input label values are rewrite to "" when
/// - when `metric_level` > `relabel_threshold`, the first `relabel_num` labels are rewritten to "" when
/// calling `with_label_values`. That means the metric vec is aggregated into a single metric.
///
///
/// These wrapper classes add a `metric_level` field to corresponding metric.
/// We could have use one single struct to represent all `MetricVec<T: MetricVecBuilder>`, rather
/// than specializing them one by one. However, that's undoable because prometheus crate doesn't
/// export `MetricVecBuilder` implementation like `HistogramVecBuilder`.
///
/// ## Note
///
/// CAUTION! Relabelling might cause unexpected results!
///
/// For counters (including histogram because it uses counters internally), it's usually natural
/// to sum up the count from multiple labels.
///
/// For the rest (such as Gauge), the semantics becomes "any/last of the recorded value". Please be
/// cautious.
#[derive(Clone, Debug)]
pub struct RelabeledMetricVec<M> {
relabel_threshold: MetricLevel,
@@ -160,5 +169,9 @@ pub type RelabeledGuardedHistogramVec<const N: usize> =
RelabeledMetricVec<LabelGuardedHistogramVec<N>>;
pub type RelabeledGuardedIntCounterVec<const N: usize> =
RelabeledMetricVec<LabelGuardedIntCounterVec<N>>;

/// CAUTION! Relabelling a Gauge might cause unexpected results!
///
/// See [`RelabeledMetricVec`] for details.
pub type RelabeledGuardedIntGaugeVec<const N: usize> =
RelabeledMetricVec<LabelGuardedIntGaugeVec<N>>;
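
To illustrate the caveat in the doc comment above, here is a toy, non-Prometheus sketch of what collapsing labels does to the two metric kinds (purely illustrative; none of these names exist in the codebase):

    # When label values are rewritten to "", all original series write into one slot.
    # Counter-like metrics still sum to a meaningful total; gauge-like metrics end up
    # holding whichever series wrote last.
    aggregated_counter = 0
    aggregated_gauge = 0

    def counter_inc(amount: int) -> None:
        global aggregated_counter
        aggregated_counter += amount   # increments from all actors still add up

    def gauge_set(value: int) -> None:
        global aggregated_gauge
        aggregated_gauge = value       # last writer wins

    for actor_id, epoch in [(1, 100), (2, 90), (3, 120)]:
        counter_inc(1)
        gauge_set(epoch)

    assert aggregated_counter == 3     # still meaningful after relabelling
    assert aggregated_gauge == 120     # reflects only the last actor sampled
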
2 changes: 1 addition & 1 deletion src/risedevtool/connector.toml
@@ -17,7 +17,7 @@ script = '''

set -euo pipefail

if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}')" =~ ^(11|17|19|20|21|22) ]]); then
if !(command -v javac &> /dev/null && [[ "$(javac -version 2>&1 | awk '{print $2}' | cut -d. -f1)" -ge 11 ]]); then
echo "JDK 11+ is not installed. Please install JDK 11+ first."
exit 1
fi
15 changes: 12 additions & 3 deletions src/stream/src/executor/actor.rs
@@ -156,7 +156,6 @@ pub struct Actor<C> {
/// The subtasks to execute concurrently.
subtasks: Vec<SubtaskHandle>,

_metrics: Arc<StreamingMetrics>,
pub actor_context: ActorContextRef,
expr_context: ExprContext,
barrier_manager: LocalBarrierManager,
@@ -169,15 +168,14 @@
pub fn new(
consumer: C,
subtasks: Vec<SubtaskHandle>,
metrics: Arc<StreamingMetrics>,
_metrics: Arc<StreamingMetrics>,
actor_context: ActorContextRef,
expr_context: ExprContext,
barrier_manager: LocalBarrierManager,
) -> Self {
Self {
consumer,
subtasks,
_metrics: metrics,
actor_context,
expr_context,
barrier_manager,
@@ -236,6 +234,15 @@
.with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
let _actor_count_guard = actor_count.inc_guard();

let current_epoch = self
.actor_context
.streaming_metrics
.actor_current_epoch
.with_guarded_label_values(&[
&self.actor_context.id.to_string(),
&self.actor_context.fragment_id.to_string(),
]);

let mut last_epoch: Option<EpochPair> = None;
let mut stream = Box::pin(Box::new(self.consumer).execute());

@@ -265,6 +272,8 @@
break Ok(barrier);
}

current_epoch.set(barrier.epoch.curr as i64);

// Collect barriers to local barrier manager
self.barrier_manager.collect(id, &barrier);

38 changes: 26 additions & 12 deletions src/stream/src/executor/monitor/streaming_stats.rs
@@ -61,10 +61,9 @@ pub struct StreamingMetrics {

// Streaming actor
pub actor_count: LabelGuardedIntGaugeVec<1>,
#[expect(dead_code)]
actor_memory_usage: LabelGuardedIntGaugeVec<2>,
actor_in_record_cnt: RelabeledGuardedIntCounterVec<3>,
pub actor_out_record_cnt: RelabeledGuardedIntCounterVec<2>,
pub actor_current_epoch: RelabeledGuardedIntGaugeVec<2>,

// Source
pub source_output_row_count: LabelGuardedIntCounterVec<4>,
@@ -191,6 +190,7 @@ pub struct StreamingMetrics {
materialize_cache_hit_count: RelabeledGuardedIntCounterVec<3>,
materialize_cache_total_count: RelabeledGuardedIntCounterVec<3>,
materialize_input_row_count: RelabeledGuardedIntCounterVec<3>,
materialize_current_epoch: RelabeledGuardedIntGaugeVec<3>,
}

pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
@@ -253,6 +253,15 @@ impl StreamingMetrics {
.unwrap()
.relabel_debug_1(level);

let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
"stream_mview_current_epoch",
"The current epoch of the materialized executor",
&["actor_id", "table_id", "fragment_id"],
registry
)
.unwrap()
.relabel_debug_1(level);

let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
"stream_sink_chunk_buffer_size",
"Total size of chunks buffered in a barrier",
@@ -395,6 +404,15 @@ impl StreamingMetrics {
.unwrap()
.relabel_debug_1(level);

let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
"stream_actor_current_epoch",
"Current epoch of actor",
&["actor_id", "fragment_id"],
registry
)
.unwrap()
.relabel_debug_1(level);

let actor_count = register_guarded_int_gauge_vec_with_registry!(
"stream_actor_count",
"Total number of actors (parallelism)",
@@ -403,15 +421,6 @@
)
.unwrap();

// dead code
let actor_memory_usage = register_guarded_int_gauge_vec_with_registry!(
"actor_memory_usage",
"Memory usage (bytes)",
&["actor_id", "fragment_id"],
registry,
)
.unwrap();

let opts = histogram_opts!(
"stream_merge_barrier_align_duration",
"Duration of merge align barrier",
@@ -1049,9 +1058,9 @@ impl StreamingMetrics {
actor_idle_duration,
actor_idle_cnt,
actor_count,
actor_memory_usage,
actor_in_record_cnt,
actor_out_record_cnt,
actor_current_epoch,
source_output_row_count,
source_split_change_count,
source_backfill_row_count,
@@ -1135,6 +1144,7 @@
materialize_cache_hit_count,
materialize_cache_total_count,
materialize_input_row_count,
materialize_current_epoch,
}
}

@@ -1494,6 +1504,9 @@ impl StreamingMetrics {
materialize_input_row_count: self
.materialize_input_row_count
.with_guarded_label_values(label_list),
materialize_current_epoch: self
.materialize_current_epoch
.with_guarded_label_values(label_list),
}
}
}
@@ -1527,6 +1540,7 @@ pub struct MaterializeMetrics {
pub materialize_cache_hit_count: LabelGuardedIntCounter<3>,
pub materialize_cache_total_count: LabelGuardedIntCounter<3>,
pub materialize_input_row_count: LabelGuardedIntCounter<3>,
pub materialize_current_epoch: LabelGuardedIntGauge<3>,
}

pub struct GroupTopNMetrics {
5 changes: 5 additions & 0 deletions src/stream/src/executor/mview/materialize.rs
@@ -297,6 +297,11 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
self.materialize_cache.data.clear();
}
}

self.metrics
.materialize_current_epoch
.set(b.epoch.curr as i64);

Message::Barrier(b)
}
}