From a3ea7c0bc48dc26c200739f5a852638a68186487 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Sat, 25 Nov 2023 11:00:48 +0800 Subject: [PATCH] refactor: memory management (#13636) --- .../src/estimate_size/collections/lru.rs | 2 +- src/compute/src/lib.rs | 2 +- .../mod.rs => memory/config.rs} | 78 +------ src/compute/src/memory/controller.rs | 198 +++++++++++++++++ src/compute/src/memory/manager.rs | 94 ++++++++ src/compute/src/memory/mod.rs | 17 ++ .../src/memory_management/memory_manager.rs | 137 ------------ src/compute/src/memory_management/policy.rs | 209 ------------------ src/compute/src/server.rs | 14 +- src/stream/src/cache/managed_lru.rs | 4 +- 10 files changed, 326 insertions(+), 429 deletions(-) rename src/compute/src/{memory_management/mod.rs => memory/config.rs} (76%) create mode 100644 src/compute/src/memory/controller.rs create mode 100644 src/compute/src/memory/manager.rs create mode 100644 src/compute/src/memory/mod.rs delete mode 100644 src/compute/src/memory_management/memory_manager.rs delete mode 100644 src/compute/src/memory_management/policy.rs diff --git a/src/common/src/estimate_size/collections/lru.rs b/src/common/src/estimate_size/collections/lru.rs index cb220f50100e6..d6889050dc108 100644 --- a/src/common/src/estimate_size/collections/lru.rs +++ b/src/common/src/estimate_size/collections/lru.rs @@ -22,7 +22,7 @@ use super::{AtomicMutGuard, MutGuard}; use crate::estimate_size::{EstimateSize, KvSize}; /// The managed cache is a lru cache that bounds the memory usage by epoch. -/// Should be used with `GlobalMemoryManager`. +/// Should be used with `MemoryManager`. pub struct EstimatedLruCache { inner: LruCache, kv_heap_size: KvSize, diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 8bc9093274333..96110b487536e 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -25,7 +25,7 @@ #[macro_use] extern crate tracing; -pub mod memory_management; +pub mod memory; pub mod observer; pub mod rpc; pub mod server; diff --git a/src/compute/src/memory_management/mod.rs b/src/compute/src/memory/config.rs similarity index 76% rename from src/compute/src/memory_management/mod.rs rename to src/compute/src/memory/config.rs index a7e9baa15b369..2bf9e9ba9a95f 100644 --- a/src/compute/src/memory_management/mod.rs +++ b/src/compute/src/memory/config.rs @@ -12,88 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod memory_manager; - -// Only enable the non-trivial policies on Linux as it relies on statistics from `jemalloc-ctl` -// which might be inaccurate on other platforms. -pub mod policy; - -use std::sync::atomic::AtomicU64; -use std::sync::Arc; - -use risingwave_batch::task::BatchManager; use risingwave_common::config::{StorageConfig, StorageMemoryConfig}; use risingwave_common::util::pretty_bytes::convert; -use risingwave_stream::task::LocalStreamManager; /// The minimal memory requirement of computing tasks in megabytes. pub const MIN_COMPUTE_MEMORY_MB: usize = 512; /// The memory reserved for system usage (stack and code segment of processes, allocation /// overhead, network buffer, etc.) in megabytes. 
pub const MIN_SYSTEM_RESERVED_MEMORY_MB: usize = 512; -pub const SYSTEM_RESERVED_MEMORY_PROPORTION: f64 = 0.2; -pub const STORAGE_MEMORY_PROPORTION: f64 = 0.3; +const SYSTEM_RESERVED_MEMORY_PROPORTION: f64 = 0.2; -pub const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1; +const STORAGE_MEMORY_PROPORTION: f64 = 0.3; -pub const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3; +const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1; -pub const STORAGE_META_CACHE_MAX_MEMORY_MB: usize = 4096; -pub const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35; -pub const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3; -pub const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 50; -// Since the new feature prefetch does not cost much memory, we set a large value by default for performance. If we meet OOM during long time batch query, we shall reduce this configuration. -pub const STORAGE_DEFAULT_LARGE_QUERY_MEMORY_USAGE_MB: usize = 32 * 1024; +const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3; -/// `MemoryControlStats` contains the state from previous control loop -#[derive(Default)] -pub struct MemoryControlStats { - pub jemalloc_allocated_bytes: usize, - pub jemalloc_active_bytes: usize, - pub jvm_allocated_bytes: usize, - pub jvm_active_bytes: usize, - pub lru_watermark_step: u64, - pub lru_watermark_time_ms: u64, - pub lru_physical_now_ms: u64, -} - -pub type MemoryControlRef = Box; - -pub trait MemoryControl: Send + Sync + std::fmt::Debug { - fn apply( - &self, - interval_ms: u32, - prev_memory_stats: MemoryControlStats, - batch_manager: Arc, - stream_manager: Arc, - watermark_epoch: Arc, - ) -> MemoryControlStats; -} +const STORAGE_META_CACHE_MAX_MEMORY_MB: usize = 4096; +const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35; +const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3; +const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 50; -pub fn build_memory_control_policy(total_memory_bytes: usize) -> MemoryControlRef { - use self::policy::JemallocAndJvmMemoryControl; - - Box::new(JemallocAndJvmMemoryControl::new(total_memory_bytes)) -} - -/// `DummyPolicy` is used for operarting systems other than Linux. It does nothing as memory control -/// is disabled on non-Linux OS. -#[derive(Debug)] -pub struct DummyPolicy; - -impl MemoryControl for DummyPolicy { - fn apply( - &self, - _interval_ms: u32, - _prev_memory_stats: MemoryControlStats, - _batch_manager: Arc, - _stream_manager: Arc, - _watermark_epoch: Arc, - ) -> MemoryControlStats { - MemoryControlStats::default() - } -} +/// Since the new feature prefetch does not cost much memory, we set a large value by default for performance. If we meet OOM during long time batch query, we shall reduce this configuration. +const STORAGE_DEFAULT_LARGE_QUERY_MEMORY_USAGE_MB: usize = 32 * 1024; /// Each compute node reserves some memory for stack and code segment of processes, allocation /// overhead, network buffer, etc. based on `SYSTEM_RESERVED_MEMORY_PROPORTION`. The reserve memory diff --git a/src/compute/src/memory/controller.rs b/src/compute/src/memory/controller.rs new file mode 100644 index 0000000000000..5c9518293f38a --- /dev/null +++ b/src/compute/src/memory/controller.rs @@ -0,0 +1,198 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use risingwave_common::util::epoch::Epoch;
+use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats;
+use risingwave_stream::executor::monitor::StreamingMetrics;
+
+/// Internal state of [`LruWatermarkController`] that saves the state from the previous tick.
+struct State {
+    pub used_memory_bytes: usize,
+    pub lru_watermark_step: u64,
+    pub lru_watermark_time_ms: u64,
+}
+
+impl Default for State {
+    fn default() -> Self {
+        let physical_now = Epoch::physical_now();
+        Self {
+            used_memory_bytes: 0,
+            lru_watermark_step: 0,
+            lru_watermark_time_ms: physical_now,
+        }
+    }
+}
+
+/// `LruWatermarkController` controls the LRU watermark (epoch) according to actual memory usage statistics
+/// collected from Jemalloc and the JVM.
+///
+/// Basically, it works as a negative feedback loop: it collects memory statistics, and then sets the LRU
+/// watermark accordingly to keep the memory usage at a proper level.
+///
+/// ```text
+///     ┌───────────────────┐        ┌───────────────────┐
+///     │       INPUT       │        │       OUTPUT      │
+/// ┌───►     (observe)     ├───────►│     (control)     ├───┐
+/// │   │ Memory Statistics │        │ New LRU Watermark │   │
+/// │   └───────────────────┘        └───────────────────┘   │
+/// │                                                        │
+/// └────────────────────────────────────────────────────────┘
+/// ```
+///
+/// Check the function [`Self::tick()`] to see the control policy.
+pub struct LruWatermarkController {
+    metrics: Arc<StreamingMetrics>,
+
+    threshold_stable: usize,
+    threshold_graceful: usize,
+    threshold_aggressive: usize,
+
+    /// The state from the previous tick
+    state: State,
+}
+
+impl LruWatermarkController {
+    // TODO(eric): make them configurable
+    const THRESHOLD_AGGRESSIVE: f64 = 0.9;
+    const THRESHOLD_GRACEFUL: f64 = 0.8;
+    const THRESHOLD_STABLE: f64 = 0.7;
+
+    pub fn new(total_memory: usize, metrics: Arc<StreamingMetrics>) -> Self {
+        let threshold_stable = (total_memory as f64 * Self::THRESHOLD_STABLE) as usize;
+        let threshold_graceful = (total_memory as f64 * Self::THRESHOLD_GRACEFUL) as usize;
+        let threshold_aggressive = (total_memory as f64 * Self::THRESHOLD_AGGRESSIVE) as usize;
+
+        Self {
+            metrics,
+            threshold_stable,
+            threshold_graceful,
+            threshold_aggressive,
+            state: State::default(),
+        }
+    }
+}
+
+impl std::fmt::Debug for LruWatermarkController {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("LruWatermarkController")
+            .field("threshold_stable", &self.threshold_stable)
+            .field("threshold_graceful", &self.threshold_graceful)
+            .field("threshold_aggressive", &self.threshold_aggressive)
+            .finish()
+    }
+}
+
+/// Get memory statistics from Jemalloc
+///
+/// - `stats.allocated`: Total number of bytes allocated by the application.
+/// - `stats.active`: Total number of bytes in active pages allocated by the application. This is a multiple of the page size, and greater than or equal to `stats.allocated`. This does not include `stats.arenas.<i>.pdirty`, `stats.arenas.<i>.pmuzzy`, nor pages entirely devoted to allocator metadata.
+///
+/// Reference: <https://jemalloc.net/jemalloc.3.html>
+fn jemalloc_memory_stats() -> (usize, usize) {
+    if let Err(e) = tikv_jemalloc_ctl::epoch::advance() {
+        tracing::warn!("Jemalloc epoch advance failed! {:?}", e);
+    }
+    let allocated = tikv_jemalloc_ctl::stats::allocated::read().unwrap();
+    let active = tikv_jemalloc_ctl::stats::active::read().unwrap();
+    (allocated, active)
+}
+
+impl LruWatermarkController {
+    pub fn tick(&mut self, interval_ms: u32) -> Epoch {
+        // NOTE: Be careful! The meanings of `allocated` and `active` differ between Jemalloc and the JVM
+        let (jemalloc_allocated_bytes, jemalloc_active_bytes) = jemalloc_memory_stats();
+        let (jvm_allocated_bytes, jvm_active_bytes) = load_jvm_memory_stats();
+
+        let cur_used_memory_bytes = jemalloc_active_bytes + jvm_allocated_bytes;
+
+        let last_step = self.state.lru_watermark_step;
+        let last_used_memory_bytes = self.state.used_memory_bytes;
+
+        // The watermark calculation works in the following way:
+        //
+        // 1. When the streaming memory usage is below the graceful threshold, we do not evict
+        //    any caches, and simply reset the step to 0.
+        //
+        // 2. When the memory usage is between the graceful and aggressive threshold:
+        //    - If the memory usage decreases after the last eviction, we set the eviction step
+        //      to 1.
+        //    - Otherwise, we set the step to last_step + 1.
+        //
+        // 3. When the memory usage exceeds the aggressive threshold:
+        //    - If the memory usage decreases after the last eviction, we set the eviction step to
+        //      last_step.
+        //    - Otherwise, we set the step to last_step * 2.
+        let mut step = if cur_used_memory_bytes < self.threshold_stable {
+            // Do not evict if the memory usage is lower than `threshold_stable`
+            0
+        } else if cur_used_memory_bytes < self.threshold_graceful {
+            // Evict at a constant speed before reaching `threshold_graceful`
+            1
+        } else if cur_used_memory_bytes < self.threshold_aggressive {
+            // Gracefully evict
+            if last_used_memory_bytes > cur_used_memory_bytes {
+                1
+            } else {
+                last_step + 1
+            }
+        } else if last_used_memory_bytes < cur_used_memory_bytes {
+            // Aggressively evict
+            if last_step == 0 {
+                2
+            } else {
+                last_step * 2
+            }
+        } else {
+            last_step
+        };
+
+        let physical_now = Epoch::physical_now();
+        let watermark_time_ms =
+            if (physical_now - self.state.lru_watermark_time_ms) / (interval_ms as u64) < step {
+                // We do not increase the step and watermark here to prevent a too-advanced watermark. The
+                // original condition is `prev_watermark_time_ms + interval_ms * step > now`.
+                step = last_step;
+                physical_now
+            } else {
+                // Increase by (steps * interval)
+                self.state.lru_watermark_time_ms + (interval_ms as u64 * step)
+            };
+
+        self.state = State {
+            used_memory_bytes: cur_used_memory_bytes,
+            lru_watermark_step: step,
+            lru_watermark_time_ms: watermark_time_ms,
+        };
+
+        self.metrics
+            .lru_current_watermark_time_ms
+            .set(watermark_time_ms as i64);
+        self.metrics.lru_physical_now_ms.set(physical_now as i64);
+        self.metrics.lru_watermark_step.set(step as i64);
+        self.metrics
+            .jemalloc_allocated_bytes
+            .set(jemalloc_allocated_bytes as i64);
+        self.metrics
+            .jemalloc_active_bytes
+            .set(jemalloc_active_bytes as i64);
+        self.metrics
+            .jvm_allocated_bytes
+            .set(jvm_allocated_bytes as i64);
+        self.metrics.jvm_active_bytes.set(jvm_active_bytes as i64);
+
+        Epoch::from_physical_time(watermark_time_ms)
+    }
+}
diff --git a/src/compute/src/memory/manager.rs b/src/compute/src/memory/manager.rs
new file mode 100644
index 0000000000000..09c20d4e5d5b2
--- /dev/null
+++ b/src/compute/src/memory/manager.rs
@@ -0,0 +1,94 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
+use risingwave_stream::executor::monitor::StreamingMetrics;
+
+use super::controller::LruWatermarkController;
+
+/// Compute node uses [`MemoryManager`] to limit the memory usage.
+pub struct MemoryManager {
+    /// All cached data before the watermark should be evicted.
+    watermark_epoch: Arc<AtomicU64>,
+
+    metrics: Arc<StreamingMetrics>,
+
+    controller: Mutex<LruWatermarkController>,
+}
+
+impl MemoryManager {
+    // Arbitrarily set a minimal barrier interval in case it is too small,
+    // especially when it's 0.
+    const MIN_TICK_INTERVAL_MS: u32 = 10;
+
+    pub fn new(metrics: Arc<StreamingMetrics>, total_memory_bytes: usize) -> Arc<Self> {
+        let controller = Mutex::new(LruWatermarkController::new(
+            total_memory_bytes,
+            metrics.clone(),
+        ));
+        tracing::info!("LRU watermark controller: {:?}", &controller);
+
+        Arc::new(Self {
+            watermark_epoch: Arc::new(0.into()),
+            metrics,
+            controller,
+        })
+    }
+
+    pub fn get_watermark_epoch(&self) -> Arc<AtomicU64> {
+        self.watermark_epoch.clone()
+    }
+
+    pub async fn run(
+        self: Arc<Self>,
+        initial_interval_ms: u32,
+        mut system_params_change_rx: tokio::sync::watch::Receiver<SystemParamsReaderRef>,
+    ) {
+        // Loop interval of running the control policy
+        let mut interval_ms = std::cmp::max(initial_interval_ms, Self::MIN_TICK_INTERVAL_MS);
+        tracing::info!(
+            "start running MemoryManager with interval {}ms",
+            interval_ms
+        );
+
+        // Keep the same interval as the barrier interval
+        let mut tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64));
+
+        loop {
+            // Wait for a while, then check whether eviction is needed.
+            tokio::select!
{ + Ok(_) = system_params_change_rx.changed() => { + let params = system_params_change_rx.borrow().load(); + let new_interval_ms = std::cmp::max(params.barrier_interval_ms(), Self::MIN_TICK_INTERVAL_MS); + if new_interval_ms != interval_ms { + interval_ms = new_interval_ms; + tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64)); + tracing::info!("updated MemoryManager interval to {}ms", interval_ms); + } + } + + _ = tick_interval.tick() => { + let new_watermark_epoch = self.controller.lock().unwrap().tick(interval_ms); + self.watermark_epoch.store(new_watermark_epoch.0, Ordering::Relaxed); + + self.metrics.lru_runtime_loop_count.inc(); + } + } + } + } +} diff --git a/src/compute/src/memory/mod.rs b/src/compute/src/memory/mod.rs new file mode 100644 index 0000000000000..896389028fb12 --- /dev/null +++ b/src/compute/src/memory/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod config; +pub mod controller; +pub mod manager; diff --git a/src/compute/src/memory_management/memory_manager.rs b/src/compute/src/memory_management/memory_manager.rs deleted file mode 100644 index 721b39046572a..0000000000000 --- a/src/compute/src/memory_management/memory_manager.rs +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::atomic::AtomicU64; -use std::sync::Arc; -use std::time::Duration; - -use risingwave_batch::task::BatchManager; -use risingwave_common::system_param::local_manager::SystemParamsReaderRef; -use risingwave_common::util::epoch::Epoch; -use risingwave_stream::executor::monitor::StreamingMetrics; -use risingwave_stream::task::LocalStreamManager; - -use super::MemoryControlRef; -use crate::memory_management::{build_memory_control_policy, MemoryControlStats}; - -/// Compute node uses [`GlobalMemoryManager`] to limit the memory usage. -pub struct GlobalMemoryManager { - /// All cached data before the watermark should be evicted. - watermark_epoch: Arc, - - metrics: Arc, - /// The memory control policy for computing tasks. - memory_control_policy: MemoryControlRef, -} - -pub type GlobalMemoryManagerRef = Arc; - -impl GlobalMemoryManager { - // Arbitrarily set a minimal barrier interval in case it is too small, - // especially when it's 0. 
- const MIN_TICK_INTERVAL_MS: u32 = 10; - - pub fn new(metrics: Arc, total_memory_bytes: usize) -> Arc { - let memory_control_policy = build_memory_control_policy(total_memory_bytes); - tracing::info!("memory control policy: {:?}", &memory_control_policy); - - Arc::new(Self { - watermark_epoch: Arc::new(0.into()), - metrics, - memory_control_policy, - }) - } - - pub fn get_watermark_epoch(&self) -> Arc { - self.watermark_epoch.clone() - } - - /// Memory manager will get memory usage statistics from batch and streaming and perform memory - /// control accordingly. - pub async fn run( - self: Arc, - batch_manager: Arc, - stream_manager: Arc, - initial_interval_ms: u32, - mut system_params_change_rx: tokio::sync::watch::Receiver, - ) { - // Loop interval of running control policy - let mut interval_ms = std::cmp::max(initial_interval_ms, Self::MIN_TICK_INTERVAL_MS); - tracing::info!( - "start running GlobalMemoryManager with interval {}ms", - interval_ms - ); - - // Keep same interval with the barrier interval - let mut tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64)); - - let mut memory_control_stats = MemoryControlStats { - jemalloc_allocated_bytes: 0, - jemalloc_active_bytes: 0, - jvm_allocated_bytes: 0, - jvm_active_bytes: 0, - lru_watermark_step: 0, - lru_watermark_time_ms: Epoch::physical_now(), - lru_physical_now_ms: Epoch::physical_now(), - }; - - loop { - // Wait for a while to check if need eviction. - tokio::select! { - Ok(_) = system_params_change_rx.changed() => { - let params = system_params_change_rx.borrow().load(); - let new_interval_ms = std::cmp::max(params.barrier_interval_ms(), Self::MIN_TICK_INTERVAL_MS); - if new_interval_ms != interval_ms { - interval_ms = new_interval_ms; - tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64)); - tracing::info!("updated GlobalMemoryManager interval to {}ms", interval_ms); - } - } - - _ = tick_interval.tick() => { - memory_control_stats = self.memory_control_policy.apply( - interval_ms, - memory_control_stats, - batch_manager.clone(), - stream_manager.clone(), - self.watermark_epoch.clone(), - ); - - self.metrics - .lru_current_watermark_time_ms - .set(memory_control_stats.lru_watermark_time_ms as i64); - self.metrics - .lru_physical_now_ms - .set(memory_control_stats.lru_physical_now_ms as i64); - self.metrics - .lru_watermark_step - .set(memory_control_stats.lru_watermark_step as i64); - self.metrics.lru_runtime_loop_count.inc(); - self.metrics - .jemalloc_allocated_bytes - .set(memory_control_stats.jemalloc_allocated_bytes as i64); - self.metrics - .jemalloc_active_bytes - .set(memory_control_stats.jemalloc_active_bytes as i64); - self.metrics - .jvm_allocated_bytes - .set(memory_control_stats.jvm_allocated_bytes as i64); - self.metrics - .jvm_active_bytes - .set(memory_control_stats.jvm_active_bytes as i64); - } - } - } - } -} diff --git a/src/compute/src/memory_management/policy.rs b/src/compute/src/memory_management/policy.rs deleted file mode 100644 index 1eb5ff9832600..0000000000000 --- a/src/compute/src/memory_management/policy.rs +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::atomic::AtomicU64; -use std::sync::Arc; - -use risingwave_batch::task::BatchManager; -use risingwave_common::util::epoch::Epoch; -use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats; -use risingwave_stream::task::LocalStreamManager; -use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats}; - -use super::{MemoryControl, MemoryControlStats}; - -/// `JemallocAndJvmMemoryControl` is a memory control policy that uses jemalloc statistics and -/// jvm memory statistics and to control. It assumes that most memory is used by streaming engine -/// and does memory control over LRU watermark based on jemalloc statistics and jvm memory statistics. -pub struct JemallocAndJvmMemoryControl { - threshold_stable: usize, - threshold_graceful: usize, - threshold_aggressive: usize, - - jemalloc_epoch_mib: tikv_jemalloc_ctl::epoch_mib, - jemalloc_allocated_mib: jemalloc_stats::allocated_mib, - jemalloc_active_mib: jemalloc_stats::active_mib, -} - -impl JemallocAndJvmMemoryControl { - const THRESHOLD_AGGRESSIVE: f64 = 0.9; - const THRESHOLD_GRACEFUL: f64 = 0.8; - const THRESHOLD_STABLE: f64 = 0.7; - - pub fn new(total_memory: usize) -> Self { - let threshold_stable = (total_memory as f64 * Self::THRESHOLD_STABLE) as usize; - let threshold_graceful = (total_memory as f64 * Self::THRESHOLD_GRACEFUL) as usize; - let threshold_aggressive = (total_memory as f64 * Self::THRESHOLD_AGGRESSIVE) as usize; - - let jemalloc_epoch_mib = jemalloc_epoch::mib().unwrap(); - let jemalloc_allocated_mib = jemalloc_stats::allocated::mib().unwrap(); - let jemalloc_active_mib = jemalloc_stats::active::mib().unwrap(); - - Self { - threshold_stable, - threshold_graceful, - threshold_aggressive, - jemalloc_epoch_mib, - jemalloc_allocated_mib, - jemalloc_active_mib, - } - } - - fn advance_jemalloc_epoch( - &self, - prev_jemalloc_allocated_bytes: usize, - prev_jemalloc_active_bytes: usize, - ) -> (usize, usize) { - if let Err(e) = self.jemalloc_epoch_mib.advance() { - tracing::warn!("Jemalloc epoch advance failed! {:?}", e); - } - - ( - self.jemalloc_allocated_mib.read().unwrap_or_else(|e| { - tracing::warn!("Jemalloc read allocated failed! {:?}", e); - prev_jemalloc_allocated_bytes - }), - self.jemalloc_active_mib.read().unwrap_or_else(|e| { - tracing::warn!("Jemalloc read active failed! 
{:?}", e); - prev_jemalloc_active_bytes - }), - ) - } -} - -impl std::fmt::Debug for JemallocAndJvmMemoryControl { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("JemallocMemoryControl") - .field("threshold_stable", &self.threshold_stable) - .field("threshold_graceful", &self.threshold_graceful) - .field("threshold_aggressive", &self.threshold_aggressive) - .finish() - } -} - -impl MemoryControl for JemallocAndJvmMemoryControl { - fn apply( - &self, - interval_ms: u32, - prev_memory_stats: MemoryControlStats, - _batch_manager: Arc, - _stream_manager: Arc, - watermark_epoch: Arc, - ) -> MemoryControlStats { - let (jemalloc_allocated_bytes, jemalloc_active_bytes) = self.advance_jemalloc_epoch( - prev_memory_stats.jemalloc_allocated_bytes, - prev_memory_stats.jemalloc_active_bytes, - ); - - let (jvm_allocated_bytes, jvm_active_bytes) = load_jvm_memory_stats(); - - // Streaming memory control - // - // We calculate the watermark of the LRU cache, which provides hints for streaming executors - // on cache eviction. Here we do the calculation based on jemalloc statistics. - - let (lru_watermark_step, lru_watermark_time_ms, lru_physical_now) = calculate_lru_watermark( - jemalloc_allocated_bytes + jvm_allocated_bytes, - self.threshold_stable, - self.threshold_graceful, - self.threshold_aggressive, - interval_ms, - prev_memory_stats, - ); - - set_lru_watermark_time_ms(watermark_epoch, lru_watermark_time_ms); - - MemoryControlStats { - jemalloc_allocated_bytes, - jemalloc_active_bytes, - jvm_allocated_bytes, - jvm_active_bytes, - lru_watermark_step, - lru_watermark_time_ms, - lru_physical_now_ms: lru_physical_now, - } - } -} - -fn calculate_lru_watermark( - cur_used_memory_bytes: usize, - threshold_stable: usize, - threshold_graceful: usize, - threshold_aggressive: usize, - interval_ms: u32, - prev_memory_stats: MemoryControlStats, -) -> (u64, u64, u64) { - let mut watermark_time_ms = prev_memory_stats.lru_watermark_time_ms; - let last_step = prev_memory_stats.lru_watermark_step; - let last_used_memory_bytes = - prev_memory_stats.jemalloc_allocated_bytes + prev_memory_stats.jvm_allocated_bytes; - - // The watermark calculation works in the following way: - // - // 1. When the streaming memory usage is below the graceful threshold, we do not evict - // any caches, and simply reset the step to 0. - // - // 2. When the memory usage is between the graceful and aggressive threshold: - // - If the last eviction memory usage decreases after last eviction, we set the eviction step - // to 1. - // - Otherwise, we set the step to last_step + 1. - // - // 3. When the memory usage exceeds the aggressive threshold: - // - If the memory usage decreases after the last eviction, we set the eviction step to - // last_step. - // - Otherwise, we set the step to last_step * 2. 
- - let mut step = if cur_used_memory_bytes < threshold_stable { - // Do not evict if the memory usage is lower than `threshold_stable` - 0 - } else if cur_used_memory_bytes < threshold_graceful { - // Evict in equal speed of time before `threshold_graceful` - 1 - } else if cur_used_memory_bytes < threshold_aggressive { - // Gracefully evict - if last_used_memory_bytes > cur_used_memory_bytes { - 1 - } else { - last_step + 1 - } - } else if last_used_memory_bytes < cur_used_memory_bytes { - // Aggressively evict - if last_step == 0 { - 2 - } else { - last_step * 2 - } - } else { - last_step - }; - - let physical_now = Epoch::physical_now(); - if (physical_now - prev_memory_stats.lru_watermark_time_ms) / (interval_ms as u64) < step { - // We do not increase the step and watermark here to prevent a too-advanced watermark. The - // original condition is `prev_watermark_time_ms + interval_ms * step > now`. - step = last_step; - watermark_time_ms = physical_now; - } else { - watermark_time_ms += interval_ms as u64 * step; - } - - (step, watermark_time_ms, physical_now) -} - -fn set_lru_watermark_time_ms(watermark_epoch: Arc, time_ms: u64) { - use std::sync::atomic::Ordering; - - let epoch = Epoch::from_physical_time(time_ms).0; - watermark_epoch.as_ref().store(epoch, Ordering::Relaxed); -} diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 48a9be89c952d..63cfc0b2519ce 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -66,10 +66,8 @@ use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; use tower::Layer; -use crate::memory_management::memory_manager::GlobalMemoryManager; -use crate::memory_management::{ - reserve_memory_bytes, storage_memory_config, MIN_COMPUTE_MEMORY_MB, -}; +use crate::memory::config::{reserve_memory_bytes, storage_memory_config, MIN_COMPUTE_MEMORY_MB}; +use crate::memory::manager::MemoryManager; use crate::observer::observer_manager::ComputeObserverNode; use crate::rpc::service::config_service::ConfigServiceImpl; use crate::rpc::service::exchange_metrics::GLOBAL_EXCHANGE_SERVICE_METRICS; @@ -272,10 +270,6 @@ pub async fn compute_node_serve( await_tree_config.clone(), )); - // Spawn LRU Manager that have access to collect memory from batch mgr and stream mgr. - let batch_mgr_clone = batch_mgr.clone(); - let stream_mgr_clone = stream_mgr.clone(); - // NOTE: Due to some limits, we use `compute_memory_bytes + storage_memory_bytes` as // `total_compute_memory_bytes` for memory control. This is just a workaround for some // memory control issues and should be modified as soon as we figure out a better solution. 
@@ -283,15 +277,13 @@ pub async fn compute_node_serve( // Related issues: // - https://github.com/risingwavelabs/risingwave/issues/8696 // - https://github.com/risingwavelabs/risingwave/issues/8822 - let memory_mgr = GlobalMemoryManager::new( + let memory_mgr = MemoryManager::new( streaming_metrics.clone(), compute_memory_bytes + storage_memory_bytes, ); // Run a background memory manager tokio::spawn(memory_mgr.clone().run( - batch_mgr_clone, - stream_mgr_clone, system_params.barrier_interval_ms(), system_params_manager.watch_params(), )); diff --git a/src/stream/src/cache/managed_lru.rs b/src/stream/src/cache/managed_lru.rs index 0608e429f4bb7..85b4e0081fc0b 100644 --- a/src/stream/src/cache/managed_lru.rs +++ b/src/stream/src/cache/managed_lru.rs @@ -30,11 +30,11 @@ use crate::common::metrics::MetricsInfo; const REPORT_SIZE_EVERY_N_KB_CHANGE: usize = 4096; /// The managed cache is a lru cache that bounds the memory usage by epoch. -/// Should be used with `GlobalMemoryManager`. +/// Should be used with `MemoryManager`. pub struct ManagedLruCache { inner: LruCache, /// The entry with epoch less than water should be evicted. - /// Should only be updated by the `GlobalMemoryManager`. + /// Should only be updated by the `MemoryManager`. watermark_epoch: Arc, /// The heap size of keys/values kv_heap_size: usize,