diff --git a/src/compute/src/memory/controller.rs b/src/compute/src/memory/controller.rs index 1129bb338be4b..cf025f9138dd3 100644 --- a/src/compute/src/memory/controller.rs +++ b/src/compute/src/memory/controller.rs @@ -153,16 +153,17 @@ impl LruWatermarkController { let last_step = self.state.lru_watermark_step; let last_used_memory_bytes = self.state.used_memory_bytes; - let target_memory_bytes = self.threshold_graceful; - if cur_used_memory_bytes > target_memory_bytes { - let ratio = - (cur_used_memory_bytes - target_memory_bytes) as f64 / cur_used_memory_bytes as f64; - let latest_sequence = self.latest_sequence.load(Ordering::Relaxed); - let sequence_diff = - ((latest_sequence - self.evict_sequence) as f64 * ratio) as Sequence; - self.evict_sequence = latest_sequence.min(self.evict_sequence + sequence_diff); - } - let sequence = self.evict_sequence; + // * aggressive ~ inf : x2 + // * graceful ~ aggressive : x1.5 + // * stable ~ graceful : x1 + // * 0 ~ stable : no evict + let to_evict_bytes = cur_used_memory_bytes.saturating_sub(self.threshold_aggressive) / 2 + + cur_used_memory_bytes.saturating_sub(self.threshold_graceful) / 2 + + cur_used_memory_bytes.saturating_sub(self.threshold_stable); + let ratio = to_evict_bytes as f64 / cur_used_memory_bytes as f64; + let latest_sequence = self.latest_sequence.load(Ordering::Relaxed); + let sequence_diff = ((latest_sequence - self.evict_sequence) as f64 * ratio) as Sequence; + self.evict_sequence = latest_sequence.min(self.evict_sequence + sequence_diff); // The watermark calculation works in the following way: // @@ -242,6 +243,9 @@ impl LruWatermarkController { .set(jvm_allocated_bytes as i64); self.metrics.jvm_active_bytes.set(jvm_active_bytes as i64); - (Epoch::from_physical_time(watermark_time_ms), sequence) + ( + Epoch::from_physical_time(watermark_time_ms), + self.evict_sequence, + ) } }