Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): fragment level streaming metrics (part 2) #13196

Merged
merged 6 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions grafana/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ def target(self, expr, legendFormat, hide=False):
datasource=self.datasource,
hide=hide)

def target_hidden(self, expr, legendFormat):
return Target(expr=expr,
legendFormat=legendFormat,
datasource=self.datasource,
hide=True)

def table_target(self, expr, hide=False):
return Target(expr=expr,
datasource=self.datasource,
Expand Down
333 changes: 169 additions & 164 deletions grafana/risingwave-dev-dashboard.dashboard.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

16 changes: 9 additions & 7 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ pub struct StreamingMetrics {

// Streaming actor metrics from tokio (disabled by default)
pub actor_execution_time: GenericGaugeVec<AtomicF64>,
pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounterVec<3>,
pub actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounterVec<3>,
pub actor_scheduled_duration: GenericGaugeVec<AtomicF64>,
pub actor_scheduled_cnt: GenericGaugeVec<AtomicI64>,
pub actor_fast_poll_duration: GenericGaugeVec<AtomicF64>,
Expand Down Expand Up @@ -72,6 +70,10 @@ pub struct StreamingMetrics {
// Exchange (see also `compute::ExchangeServiceMetrics`)
pub exchange_frag_recv_size: GenericCounterVec<AtomicU64>,

// Backpressure
pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounterVec<3>,
pub actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounterVec<3>,

// Streaming Join
pub join_lookup_miss_count: LabelGuardedIntCounterVec<5>,
pub join_lookup_total_count: LabelGuardedIntCounterVec<5>,
Expand Down Expand Up @@ -687,23 +689,23 @@ impl StreamingMetrics {
let over_window_cached_entry_count = register_int_gauge_vec_with_registry!(
"stream_over_window_cached_entry_count",
"Total entry (partition) count in over window executor cache",
&["table_id", "actor_id"],
&["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();

let over_window_cache_lookup_count = register_int_counter_vec_with_registry!(
"stream_over_window_cache_lookup_count",
"Over window executor cache lookup count",
&["table_id", "actor_id"],
&["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();

let over_window_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_over_window_cache_miss_count",
"Over window executor cache miss count",
&["table_id", "actor_id"],
&["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
Expand Down Expand Up @@ -931,8 +933,6 @@ impl StreamingMetrics {
level,
executor_row_count,
actor_execution_time,
actor_output_buffer_blocking_duration_ns,
actor_input_buffer_blocking_duration_ns,
actor_scheduled_duration,
actor_scheduled_cnt,
actor_fast_poll_duration,
Expand All @@ -952,6 +952,8 @@ impl StreamingMetrics {
sink_input_row_count,
mview_input_row_count,
exchange_frag_recv_size,
actor_output_buffer_blocking_duration_ns,
actor_input_buffer_blocking_duration_ns,
join_lookup_miss_count,
join_lookup_total_count,
join_insert_cache_miss_count,
Expand Down
7 changes: 4 additions & 3 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,18 +613,19 @@ impl<S: StateStore> OverWindowExecutor<S> {
{
// update metrics
let actor_id_str = this.actor_ctx.id.to_string();
let fragment_id_str = this.actor_ctx.fragment_id.to_string();
let table_id_str = this.state_table.table_id().to_string();
this.metrics
.over_window_cached_entry_count
.with_label_values(&[&table_id_str, &actor_id_str])
.with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(vars.cached_partitions.len() as _);
this.metrics
.over_window_cache_lookup_count
.with_label_values(&[&table_id_str, &actor_id_str])
.with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc_by(std::mem::take(&mut vars.stats.cache_lookup));
this.metrics
.over_window_cache_miss_count
.with_label_values(&[&table_id_str, &actor_id_str])
.with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc_by(std::mem::take(&mut vars.stats.cache_miss));
}

Expand Down