diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs index b13e99a4c4be3..47bd9d5f22697 100644 --- a/src/stream/src/executor/monitor/streaming_stats.rs +++ b/src/stream/src/executor/monitor/streaming_stats.rs @@ -689,7 +689,7 @@ impl StreamingMetrics { let over_window_cached_entry_count = register_int_gauge_vec_with_registry!( "stream_over_window_cached_entry_count", "Total entry (partition) count in over window executor cache", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -697,7 +697,7 @@ impl StreamingMetrics { let over_window_cache_lookup_count = register_int_counter_vec_with_registry!( "stream_over_window_cache_lookup_count", "Over window executor cache lookup count", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); @@ -705,7 +705,7 @@ impl StreamingMetrics { let over_window_cache_miss_count = register_int_counter_vec_with_registry!( "stream_over_window_cache_miss_count", "Over window executor cache miss count", - &["table_id", "actor_id"], + &["table_id", "actor_id", "fragment_id"], registry ) .unwrap(); diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index 38c959039bf56..4b105e2994f65 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -613,18 +613,19 @@ impl OverWindowExecutor { { // update metrics let actor_id_str = this.actor_ctx.id.to_string(); + let fragment_id_str = this.actor_ctx.fragment_id.to_string(); let table_id_str = this.state_table.table_id().to_string(); this.metrics .over_window_cached_entry_count - .with_label_values(&[&table_id_str, &actor_id_str]) + .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .set(vars.cached_partitions.len() as _); this.metrics .over_window_cache_lookup_count - .with_label_values(&[&table_id_str, &actor_id_str]) + .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .inc_by(std::mem::take(&mut vars.stats.cache_lookup)); this.metrics .over_window_cache_miss_count - .with_label_values(&[&table_id_str, &actor_id_str]) + .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .inc_by(std::mem::take(&mut vars.stats.cache_miss)); }