Skip to content

Commit

Permalink
feat(over window): add over_window_accessed_entry_count to metric for better observability (#18943)
Browse files Browse the repository at this point in the history

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Oct 21, 2024
1 parent 4dec2ea commit 1ca2ea7
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 20 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1321,16 +1321,20 @@ def section_streaming_actors(outer_panels: Panels):
],
),
panels.timeseries_actor_ops(
"Over Window Executor Compute Count",
"Over Window Executor State Computation",
"",
[
panels.target(
f"sum(rate({table_metric('stream_over_window_accessed_entry_count')}[$__rate_interval])) by (table_id, fragment_id)",
"accessed entry count - table {{table_id}} fragment {{fragment_id}}",
),
panels.target(
f"sum(rate({table_metric('stream_over_window_compute_count')}[$__rate_interval])) by (table_id, fragment_id)",
"compute count - table {{table_id}} fragment {{fragment_id}}",
),
panels.target(
f"sum(rate({table_metric('stream_over_window_same_result_count')}[$__rate_interval])) by (table_id, fragment_id)",
"same result count - table {{table_id}} fragment {{fragment_id}}",
f"sum(rate({table_metric('stream_over_window_same_output_count')}[$__rate_interval])) by (table_id, fragment_id)",
"same output count - table {{table_id}} fragment {{fragment_id}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

30 changes: 22 additions & 8 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ pub struct StreamingMetrics {
over_window_range_cache_lookup_count: LabelGuardedIntCounterVec<3>,
over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec<3>,
over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec<3>,
over_window_accessed_entry_count: LabelGuardedIntCounterVec<3>,
over_window_compute_count: LabelGuardedIntCounterVec<3>,
over_window_same_result_count: LabelGuardedIntCounterVec<3>,
over_window_same_output_count: LabelGuardedIntCounterVec<3>,

/// The duration from the receipt of a barrier to its collection by all actors.
/// And the max of all node `barrier_inflight_latency` is the latency for a barrier
Expand Down Expand Up @@ -790,6 +791,14 @@ impl StreamingMetrics {
)
.unwrap();

let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
"stream_over_window_accessed_entry_count",
"Over window accessed entry count",
&["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();

let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
"stream_over_window_compute_count",
"Over window compute count",
Expand All @@ -798,9 +807,9 @@ impl StreamingMetrics {
)
.unwrap();

let over_window_same_result_count = register_guarded_int_counter_vec_with_registry!(
"stream_over_window_same_result_count",
"Over window same result count",
let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
"stream_over_window_same_output_count",
"Over window same output count",
&["table_id", "actor_id", "fragment_id"],
registry
)
Expand Down Expand Up @@ -1096,8 +1105,9 @@ impl StreamingMetrics {
over_window_range_cache_lookup_count,
over_window_range_cache_left_miss_count,
over_window_range_cache_right_miss_count,
over_window_accessed_entry_count,
over_window_compute_count,
over_window_same_result_count,
over_window_same_output_count,
barrier_inflight_latency,
barrier_sync_latency,
barrier_manager_progress,
Expand Down Expand Up @@ -1451,11 +1461,14 @@ impl StreamingMetrics {
over_window_range_cache_right_miss_count: self
.over_window_range_cache_right_miss_count
.with_guarded_label_values(label_list),
over_window_accessed_entry_count: self
.over_window_accessed_entry_count
.with_guarded_label_values(label_list),
over_window_compute_count: self
.over_window_compute_count
.with_guarded_label_values(label_list),
over_window_same_result_count: self
.over_window_same_result_count
over_window_same_output_count: self
.over_window_same_output_count
.with_guarded_label_values(label_list),
}
}
Expand Down Expand Up @@ -1570,6 +1583,7 @@ pub struct OverWindowMetrics {
pub over_window_range_cache_lookup_count: LabelGuardedIntCounter<3>,
pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter<3>,
pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter<3>,
pub over_window_accessed_entry_count: LabelGuardedIntCounter<3>,
pub over_window_compute_count: LabelGuardedIntCounter<3>,
pub over_window_same_result_count: LabelGuardedIntCounter<3>,
pub over_window_same_output_count: LabelGuardedIntCounter<3>,
}
7 changes: 5 additions & 2 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,12 +415,15 @@ impl<S: StateStore> OverWindowExecutor<S> {
metrics
.over_window_range_cache_right_miss_count
.inc_by(stats.right_miss_count);
metrics
.over_window_accessed_entry_count
.inc_by(stats.accessed_entry_count);
metrics
.over_window_compute_count
.inc_by(stats.compute_count);
metrics
.over_window_same_result_count
.inc_by(stats.same_result_count);
.over_window_same_output_count
.inc_by(stats.same_output_count);

// Update recently accessed range for later shrinking cache.
if !this.cache_policy.is_full()
Expand Down
14 changes: 9 additions & 5 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,9 @@ pub(super) struct OverPartitionStats {
pub right_miss_count: u64,

// stats for window function state computation
pub accessed_entry_count: u64,
pub compute_count: u64,
pub same_result_count: u64,
pub same_output_count: u64,
}

/// [`AffectedRange`] represents a range of keys that are affected by a delta.
Expand Down Expand Up @@ -425,8 +426,9 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
)> {
let input_schema_len = table.get_data_types().len() - calls.len();
let mut part_changes = BTreeMap::new();
let mut accessed_entry_count = 0;
let mut compute_count = 0;
let mut same_result_count = 0;
let mut same_output_count = 0;

// Find affected ranges. This also ensures that all rows in the affected ranges are loaded
// into the cache.
Expand Down Expand Up @@ -507,6 +509,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
.into(),
);
}
accessed_entry_count += 1;
cursor.move_next();

key != last_frame_end
Expand All @@ -528,11 +531,11 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
.key_value()
.expect("cursor must be valid until `last_curr_key`");
let output = states.slide_no_evict_hint()?;

compute_count += 1;

let old_output = &row.as_inner()[input_schema_len..];
if !old_output.is_empty() && old_output == output {
same_result_count += 1;
same_output_count += 1;
}

let new_row = OwnedRow::new(
Expand Down Expand Up @@ -569,8 +572,9 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
} {}
}

self.stats.accessed_entry_count += accessed_entry_count;
self.stats.compute_count += compute_count;
self.stats.same_result_count += same_result_count;
self.stats.same_output_count += same_output_count;

Ok((part_changes, accessed_range))
}
Expand Down

0 comments on commit 1ca2ea7

Please sign in to comment.