Skip to content

Commit

Permalink
add cache refill metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Oct 24, 2024
1 parent c4e2dbe commit e0de688
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/stream/src/executor/aggregation/agg_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,11 +507,13 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
pub struct AggStateCacheStats {
pub agg_state_cache_lookup_count: u64,
pub agg_state_cache_miss_count: u64,
pub agg_state_cache_refill_duration_secs: f64,
}

impl AggStateCacheStats {
fn merge(&mut self, other: Self) {
self.agg_state_cache_lookup_count += other.agg_state_cache_lookup_count;
self.agg_state_cache_miss_count += other.agg_state_cache_miss_count;
self.agg_state_cache_refill_duration_secs += other.agg_state_cache_refill_duration_secs;
}
}
3 changes: 2 additions & 1 deletion src/stream/src/executor/aggregation/minput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ impl MaterializedInputState {
}
cache_filler.finish();

let duration = start.elapsed().as_secs();
let duration = start.elapsed().as_secs_f64();
stats.agg_state_cache_refill_duration_secs += duration;
}
assert!(self.cache.is_synced());

Expand Down
7 changes: 7 additions & 0 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,14 @@ struct ExecutionStats {
/// Agg state cache stats.
agg_state_cache_lookup_count: u64,
agg_state_cache_miss_count: u64,
agg_state_cache_refill_duration_secs: f64,
}

impl ExecutionStats {
fn merge_state_cache_stats(&mut self, other: AggStateCacheStats) {
self.agg_state_cache_lookup_count += other.agg_state_cache_lookup_count;
self.agg_state_cache_miss_count += other.agg_state_cache_miss_count;
self.agg_state_cache_refill_duration_secs += other.agg_state_cache_refill_duration_secs;
}
}

Expand Down Expand Up @@ -525,6 +527,11 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
vars.metrics
.agg_state_cache_miss_count
.inc_by(std::mem::take(&mut vars.stats.agg_state_cache_miss_count));
vars.metrics
.agg_state_cache_refill_duration_secs
.observe(std::mem::take(
&mut vars.stats.agg_state_cache_refill_duration_secs,
));
}

async fn commit_state_tables(
Expand Down
25 changes: 22 additions & 3 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ use prometheus::{
use risingwave_common::catalog::TableId;
use risingwave_common::config::MetricLevel;
use risingwave_common::metrics::{
LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogram, LabelGuardedHistogramVec,
LabelGuardedIntCounter, LabelGuardedIntCounterVec, LabelGuardedIntGauge,
LabelGuardedIntGaugeVec, MetricVecRelabelExt, RelabeledGuardedHistogramVec,
RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
Expand Down Expand Up @@ -111,6 +112,7 @@ pub struct StreamingMetrics {
agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec<3>,
agg_state_cache_lookup_count: LabelGuardedIntCounterVec<3>,
agg_state_cache_miss_count: LabelGuardedIntCounterVec<3>,
agg_state_cache_refill_duration_secs: LabelGuardedHistogramVec<3>,

// Streaming TopN
group_top_n_cache_miss_count: LabelGuardedIntCounterVec<3>,
Expand Down Expand Up @@ -568,6 +570,18 @@ impl StreamingMetrics {
)
.unwrap();

let opts = histogram_opts!(
"stream_agg_state_cache_refill_duration_secs",
"Aggregation executor state cache refill duration",
exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
);
let agg_state_cache_refill_duration_secs = register_guarded_histogram_vec_with_registry!(
opts,
&["table_id", "actor_id", "fragment_id"], // FIXME(kwannoel): what's the granularity?
registry
)
.unwrap();

let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
"stream_group_top_n_cache_miss_count",
"Group top n executor cache miss count",
Expand Down Expand Up @@ -1081,6 +1095,7 @@ impl StreamingMetrics {
agg_distinct_cached_entry_count,
agg_state_cache_lookup_count,
agg_state_cache_miss_count,
agg_state_cache_refill_duration_secs,
group_top_n_cache_miss_count,
group_top_n_total_query_cache_count,
group_top_n_cached_entry_count,
Expand Down Expand Up @@ -1349,6 +1364,9 @@ impl StreamingMetrics {
agg_state_cache_miss_count: self
.agg_state_cache_miss_count
.with_guarded_label_values(label_list),
agg_state_cache_refill_duration_secs: self
.agg_state_cache_refill_duration_secs
.with_guarded_label_values(label_list),
}
}

Expand Down Expand Up @@ -1551,6 +1569,7 @@ pub struct HashAggMetrics {
pub agg_dirty_groups_heap_size: LabelGuardedIntGauge<3>,
pub agg_state_cache_lookup_count: LabelGuardedIntCounter<3>,
pub agg_state_cache_miss_count: LabelGuardedIntCounter<3>,
pub agg_state_cache_refill_duration_secs: LabelGuardedHistogram<3>,
}

pub struct AggDistinctDedupMetrics {
Expand Down

0 comments on commit e0de688

Please sign in to comment.