Skip to content

Commit

Permalink
refactor: add evict sequence for metrics
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx committed Mar 22, 2024
1 parent 8cbdf6b commit 90e23ee
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1072,12 +1072,12 @@ def section_streaming_actors(outer_panels):
"The operator-level evicted memory statistics collected by each LRU cache",
[
panels.target(
f"sum({metric('stream_memory_evicted')}) by (table_id, epoch)",
"table {{table_id}} epoch: {{epoch}}",
f"sum({metric('stream_memory_evicted')}) by (table_id, sequence)",
"table {{table_id}} sequence: {{sequence}}",
),
panels.target_hidden(
f"{metric('stream_memory_evicted')}",
"table {{table_id}} actor {{actor_id}} epoch: {{epoch}}",
"table {{table_id}} actor {{actor_id}} sequence: {{sequence}} epoch: {{epoch}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion src/stream/src/cache/managed_lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ pub struct ManagedLruCache<K, V, S = DefaultHasher, A: Clone + Allocator = Globa
/// The metrics of memory usage
memory_usage_metrics: LabelGuardedIntGauge<3>,
/// The metrics of evicted memory by epoch.
memory_evicted_metrics: LabelGuardedIntGaugeVec<3>,
memory_evicted_metrics: LabelGuardedIntGaugeVec<4>,
/// The sequence of evict by epoch operation.
evict_sequence: usize,
// The metrics of evicted watermark time
lru_evicted_watermark_time_ms: LabelGuardedIntGauge<3>,
// Metrics info
Expand Down Expand Up @@ -91,6 +93,7 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
kv_heap_size: 0,
memory_usage_metrics,
memory_evicted_metrics,
evict_sequence: 0,
lru_evicted_watermark_time_ms,
metrics_info,
last_reported_size_bytes: 0,
Expand Down Expand Up @@ -118,6 +121,7 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
.with_guarded_label_values(&[
&lru.metrics_info.table_id,
&lru.metrics_info.actor_id,
&lru.evict_sequence.to_string(),
&epoch.to_string(),
])
.set(evicted as _);
Expand All @@ -126,6 +130,8 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
let mut last_epoch = 0; // real epoch must be greater than 0
let mut evicted = 0;

self.evict_sequence += 1;

while let Some((key, value, e)) = self.inner.pop_lru_by_epoch(epoch) {
let charge = key.estimated_size() + value.estimated_size();
self.kv_heap_size_dec(charge);
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ pub struct StreamingMetrics {

// Memory
pub stream_memory_usage: LabelGuardedIntGaugeVec<3>,
pub stream_memory_evicted: LabelGuardedIntGaugeVec<3>,
pub stream_memory_evicted: LabelGuardedIntGaugeVec<4>,
}

pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
Expand Down Expand Up @@ -1002,7 +1002,7 @@ impl StreamingMetrics {
let stream_memory_evicted = register_guarded_int_gauge_vec_with_registry!(
"stream_memory_evicted",
"Memory evicted for stream executors",
&["table_id", "actor_id", "epoch"],
&["table_id", "actor_id", "sequence", "epoch"],
registry
)
.unwrap();
Expand Down

0 comments on commit 90e23ee

Please sign in to comment.