Skip to content

Commit

Permalink
feat(over window): add metrics to display over window range cache mis…
Browse files Browse the repository at this point in the history
…s rate, etc. (#13950)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Dec 13, 2023
1 parent dc23808 commit 263b0c6
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 5 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

30 changes: 28 additions & 2 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,19 @@ def section_streaming_actors(outer_panels):
f"rate({table_metric('stream_over_window_cache_miss_count')}[$__rate_interval])",
"cache miss count - table {{table_id}} actor {{actor_id}}",
),

panels.target(
f"sum(rate({table_metric('stream_over_window_range_cache_lookup_count')}[$__rate_interval])) by (table_id, fragment_id)",
"partition range cache lookup count - table {{table_id}} fragment {{fragment_id}}",
),
panels.target(
f"sum(rate({table_metric('stream_over_window_range_cache_left_miss_count')}[$__rate_interval])) by (table_id, fragment_id)",
"partition range cache left miss count - table {{table_id}} fragment {{fragment_id}}",
),
panels.target(
f"sum(rate({table_metric('stream_over_window_range_cache_right_miss_count')}[$__rate_interval])) by (table_id, fragment_id)",
"partition range cache right miss count - table {{table_id}} fragment {{fragment_id}}",
),
],
),
panels.timeseries_percentage(
Expand All @@ -1132,7 +1145,7 @@ def section_streaming_actors(outer_panels):
[
panels.target(
f"(sum(rate({metric('stream_join_lookup_miss_count')}[$__rate_interval])) by (side, join_table_id, degree_table_id, fragment_id) ) / (sum(rate({metric('stream_join_lookup_total_count')}[$__rate_interval])) by (side, join_table_id, degree_table_id, fragment_id))",
"join executor cache miss ratio - - {{side}} side, join_table_id {{join_table_id}} degree_table_id {{degree_table_id}} fragment {{fragment_id}}",
"Join executor cache miss ratio - - {{side}} side, join_table_id {{join_table_id}} degree_table_id {{degree_table_id}} fragment {{fragment_id}}",
),
panels.target(
f"(sum(rate({metric('stream_agg_lookup_miss_count')}[$__rate_interval])) by (table_id, fragment_id) ) / (sum(rate({metric('stream_agg_lookup_total_count')}[$__rate_interval])) by (table_id, fragment_id))",
Expand Down Expand Up @@ -1160,13 +1173,21 @@ def section_streaming_actors(outer_panels):
),
panels.target(
f"1 - (sum(rate({metric('stream_materialize_cache_hit_count')}[$__rate_interval])) by (table_id, fragment_id) ) / (sum(rate({metric('stream_materialize_cache_total_count')}[$__rate_interval])) by (table_id, fragment_id))",
"materialize executor cache miss ratio - table {{table_id}} fragment {{fragment_id}} {{%s}}"
"Materialize executor cache miss ratio - table {{table_id}} fragment {{fragment_id}} {{%s}}"
% NODE_LABEL,
),
panels.target(
f"(sum(rate({metric('stream_over_window_cache_miss_count')}[$__rate_interval])) by (table_id, fragment_id) ) / (sum(rate({metric('stream_over_window_cache_lookup_count')}[$__rate_interval])) by (table_id, fragment_id))",
"Over window cache miss ratio - table {{table_id}} fragment {{fragment_id}} ",
),
panels.target(
f"(sum(rate({metric('stream_over_window_range_cache_left_miss_count')}[$__rate_interval])) by (table_id, fragment_id) ) / (sum(rate({metric('stream_over_window_range_cache_lookup_count')}[$__rate_interval])) by (table_id, fragment_id))",
"Over window partition range cache left miss ratio - table {{table_id}} fragment {{fragment_id}} ",
),
panels.target(
f"(sum(rate({metric('stream_over_window_range_cache_right_miss_count')}[$__rate_interval])) by (table_id, fragment_id) ) / (sum(rate({metric('stream_over_window_range_cache_lookup_count')}[$__rate_interval])) by (table_id, fragment_id))",
"Over window partition range cache right miss ratio - table {{table_id}} fragment {{fragment_id}} ",
),
],
),
panels.timeseries_actor_latency(
Expand Down Expand Up @@ -1382,6 +1403,11 @@ def section_streaming_actors(outer_panels):
f"{metric('stream_over_window_cached_entry_count')}",
"over window cached count | table {{table_id}} actor {{actor_id}}",
),

panels.target(
f"sum({metric('stream_over_window_range_cache_entry_count')}) by (table_id, fragment_id)",
"over window partition range cache entry count | table {{table_id}} fragment {{fragment_id}}",
),
],
),
panels.timeseries_actor_rowsps(
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ pub struct StreamingMetrics {
pub over_window_cached_entry_count: GenericGaugeVec<AtomicI64>,
pub over_window_cache_lookup_count: GenericCounterVec<AtomicU64>,
pub over_window_cache_miss_count: GenericCounterVec<AtomicU64>,
pub over_window_range_cache_entry_count: GenericGaugeVec<AtomicI64>,
pub over_window_range_cache_lookup_count: GenericCounterVec<AtomicU64>,
pub over_window_range_cache_left_miss_count: GenericCounterVec<AtomicU64>,
pub over_window_range_cache_right_miss_count: GenericCounterVec<AtomicU64>,

/// The duration from receipt of barrier to all actors collection.
/// And the max of all node `barrier_inflight_latency` is the latency for a barrier
Expand Down Expand Up @@ -728,6 +732,38 @@ impl StreamingMetrics {
)
.unwrap();

let over_window_range_cache_entry_count = register_int_gauge_vec_with_registry!(
"stream_over_window_range_cache_entry_count",
"Over window partition range cache entry count",
&["table_id", "actor_id", "fragment_id"],
registry,
)
.unwrap();

let over_window_range_cache_lookup_count = register_int_counter_vec_with_registry!(
"stream_over_window_range_cache_lookup_count",
"Over window partition range cache lookup count",
&["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();

let over_window_range_cache_left_miss_count = register_int_counter_vec_with_registry!(
"stream_over_window_range_cache_left_miss_count",
"Over window partition range cache left miss count",
&["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();

let over_window_range_cache_right_miss_count = register_int_counter_vec_with_registry!(
"stream_over_window_range_cache_right_miss_count",
"Over window partition range cache right miss count",
&["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();

let opts = histogram_opts!(
"stream_barrier_inflight_duration_seconds",
"barrier_inflight_latency",
Expand Down Expand Up @@ -1005,6 +1041,10 @@ impl StreamingMetrics {
over_window_cached_entry_count,
over_window_cache_lookup_count,
over_window_cache_miss_count,
over_window_range_cache_entry_count,
over_window_range_cache_lookup_count,
over_window_range_cache_left_miss_count,
over_window_range_cache_right_miss_count,
barrier_inflight_latency,
barrier_sync_latency,
barrier_manager_progress,
Expand Down
25 changes: 25 additions & 0 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ impl<S: StateStore> OverWindowExecutor<S> {
let mut chunk_builder =
StreamChunkBuilder::new(this.chunk_size, this.info.schema.data_types());

// Prepare things needed by metrics.
let actor_id = this.actor_ctx.id.to_string();
let fragment_id = this.actor_ctx.fragment_id.to_string();
let table_id = this.state_table.table_id().to_string();

// Build final changes partition by partition.
for (part_key, delta) in deltas {
vars.stats.cache_lookup += 1;
Expand Down Expand Up @@ -375,6 +380,26 @@ impl<S: StateStore> OverWindowExecutor<S> {
partition.write_record(&mut this.state_table, key, record);
}

// Report partition range cache metrics for the partition that was just processed.
// `cache_len` has to be read first because `summarize()` takes `partition` by value.
let cache_len = partition.cache_real_len();
let stats = partition.summarize();
// NOTE(review): this clone and the four label lookups below are repeated for every
// partition even though the label values are computed once before the loop; consider
// resolving the labeled metrics once outside the loop.
let metrics = this.actor_ctx.streaming_metrics.clone();
// NOTE(review): the gauge is `set` once per partition under the same
// (table_id, actor_id, fragment_id) labels, so it appears to end up holding the cache
// length of whichever partition was processed last rather than a total across
// partitions - confirm this is the intended meaning of the metric.
metrics
.over_window_range_cache_entry_count
.with_label_values(&[&table_id, &actor_id, &fragment_id])
.set(cache_len as i64);
// The counters below accumulate the statistics gathered by this partition.
metrics
.over_window_range_cache_lookup_count
.with_label_values(&[&table_id, &actor_id, &fragment_id])
.inc_by(stats.lookup_count);
metrics
.over_window_range_cache_left_miss_count
.with_label_values(&[&table_id, &actor_id, &fragment_id])
.inc_by(stats.left_miss_count);
metrics
.over_window_range_cache_right_miss_count
.with_label_values(&[&table_id, &actor_id, &fragment_id])
.inc_by(stats.right_miss_count);

// Update recently accessed range for later shrinking cache.
if !this.cache_policy.is_full()
&& let Some(accessed_range) = accessed_range
Expand Down
24 changes: 23 additions & 1 deletion src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ pub(super) fn shrink_partition_cache(
}
}

/// Statistics collected by an [`OverPartition`] while it works on its range cache.
///
/// Every counter starts at zero (via `Default`) when the [`OverPartition`] is created and is
/// handed back to the caller by [`OverPartition::summarize`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub(super) struct OverPartitionStats {
    /// Number of times the affected ranges were looked up in the partition range cache.
    pub lookup_count: u64,
    /// Number of lookups that reached the left sentinel of the cache and therefore
    /// triggered a leftward cache extension.
    pub left_miss_count: u64,
    /// Number of lookups that reached the right sentinel of the cache and therefore
    /// triggered a rightward cache extension.
    pub right_miss_count: u64,
}

/// A wrapper of [`PartitionCache`] that provides helper methods to manipulate the cache.
/// By putting this type inside `private` module, we can avoid misuse of the internal fields and
/// methods.
Expand All @@ -231,6 +238,9 @@ pub(super) struct OverPartition<'a, S: StateStore> {
order_key_indices: &'a [usize],
input_pk_indices: &'a [usize],
state_key_to_table_sub_pk_proj: Vec<usize>,

stats: OverPartitionStats,

_phantom: PhantomData<S>,
}

Expand Down Expand Up @@ -273,10 +283,20 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
order_key_indices,
input_pk_indices,
state_key_to_table_sub_pk_proj: projection,

stats: Default::default(),

_phantom: PhantomData,
}
}

/// Consume this [`OverPartition`] and return the statistics it has accumulated since it
/// was created (range cache lookups and left/right misses).
///
/// Because the partition is taken by value, call this only after all other work on the
/// partition for the current round is finished.
#[must_use]
pub fn summarize(self) -> OverPartitionStats {
    // Only the counters are returned for now; further summary information can be
    // added here in the future.
    self.stats
}

/// Get the number of cached entries ignoring sentinels.
pub fn cache_real_len(&self) -> usize {
let len = self.range_cache.inner().len();
Expand Down Expand Up @@ -402,6 +422,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
let cache_inner = unsafe { &*(self.range_cache.inner() as *const _) };
let ranges =
self::find_affected_ranges(self.calls, DeltaBTreeMap::new(cache_inner, delta));
self.stats.lookup_count += 1;

if ranges.is_empty() {
// no ranges affected, we're done
Expand All @@ -420,12 +441,13 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
};

if left_reached_sentinel {
// TODO(rc): should count cache miss for this, and also the below
self.stats.left_miss_count += 1;
tracing::trace!(partition=?self.this_partition_key, "partition cache left extension triggered");
let left_most = self.cache_real_first_key().unwrap_or(delta_first).clone();
self.extend_cache_leftward_by_n(table, &left_most).await?;
}
if right_reached_sentinel {
self.stats.right_miss_count += 1;
tracing::trace!(partition=?self.this_partition_key, "partition cache right extension triggered");
let right_most = self.cache_real_last_key().unwrap_or(delta_last).clone();
self.extend_cache_rightward_by_n(table, &right_most).await?;
Expand Down

0 comments on commit 263b0c6

Please sign in to comment.