Skip to content

Commit

Permalink
over window
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh committed Nov 2, 2023
1 parent dd66545 commit 1b4081d
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
6 changes: 3 additions & 3 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,23 +689,23 @@ impl StreamingMetrics {
let over_window_cached_entry_count = register_int_gauge_vec_with_registry!(
"stream_over_window_cached_entry_count",
"Total entry (partition) count in over window executor cache",
&["table_id", "actor_id"],
&["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();

let over_window_cache_lookup_count = register_int_counter_vec_with_registry!(
"stream_over_window_cache_lookup_count",
"Over window executor cache lookup count",
&["table_id", "actor_id"],
&["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();

let over_window_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_over_window_cache_miss_count",
"Over window executor cache miss count",
&["table_id", "actor_id"],
&["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
Expand Down
7 changes: 4 additions & 3 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,18 +613,19 @@ impl<S: StateStore> OverWindowExecutor<S> {
{
// update metrics
let actor_id_str = this.actor_ctx.id.to_string();
let fragment_id_str = this.actor_ctx.fragment_id.to_string();
let table_id_str = this.state_table.table_id().to_string();
this.metrics
.over_window_cached_entry_count
.with_label_values(&[&table_id_str, &actor_id_str])
.with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(vars.cached_partitions.len() as _);
this.metrics
.over_window_cache_lookup_count
.with_label_values(&[&table_id_str, &actor_id_str])
.with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc_by(std::mem::take(&mut vars.stats.cache_lookup));
this.metrics
.over_window_cache_miss_count
.with_label_values(&[&table_id_str, &actor_id_str])
.with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc_by(std::mem::take(&mut vars.stats.cache_miss));
}

Expand Down

0 comments on commit 1b4081d

Please sign in to comment.