Skip to content

Commit

Permalink
refactor(memory): make args of memory controller configurable (#15710)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Mar 15, 2024
1 parent 7dbd0a4 commit 1d6536c
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 29 deletions.
19 changes: 19 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,15 @@ pub struct StreamingDeveloperConfig {
/// The max heap size of dirty groups of `HashAggExecutor`.
#[serde(default = "default::developer::stream_hash_agg_max_dirty_groups_heap_size")]
pub hash_agg_max_dirty_groups_heap_size: usize,

#[serde(default = "default::developer::memory_controller_threshold_aggressive")]
pub memory_controller_threshold_aggressive: f64,

#[serde(default = "default::developer::memory_controller_threshold_graceful")]
pub memory_controller_threshold_graceful: f64,

#[serde(default = "default::developer::memory_controller_threshold_stable")]
pub memory_controller_threshold_stable: f64,
}

/// The subsections `[batch.developer]`.
Expand Down Expand Up @@ -1497,6 +1506,16 @@ pub mod default {
pub fn enable_check_task_level_overlap() -> bool {
false
}

pub fn memory_controller_threshold_aggressive() -> f64 {
0.9
}
pub fn memory_controller_threshold_graceful() -> f64 {
0.8
}
pub fn memory_controller_threshold_stable() -> f64 {
0.7
}
}

pub use crate::system_param::default as system;
Expand Down
52 changes: 34 additions & 18 deletions src/compute/src/memory/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats;
use risingwave_stream::executor::monitor::StreamingMetrics;

use super::manager::MemoryManagerConfig;

/// Internal state of [`LruWatermarkController`] that saves the state in previous tick.
struct State {
pub used_memory_bytes: usize,
Expand All @@ -36,6 +38,19 @@ impl Default for State {
}
}

/// - `allocated`: Total number of bytes allocated by the application.
/// - `active`: Total number of bytes in active pages allocated by the application. This is a multiple of the page size, and greater than or equal to `stats.allocated`. This does not include `stats.arenas.<i>.pdirty`, `stats.arenas.<i>.pmuzzy`, nor pages entirely devoted to allocator metadata.
/// - `resident`: Total number of bytes in physically resident data pages mapped by the allocator.
/// - `metadata`: Total number of bytes dedicated to jemalloc metadata.
///
/// Reference: <https://jemalloc.net/jemalloc.3.html>
pub struct MemoryStats {
pub allocated: usize,
pub active: usize,
pub resident: usize,
pub metadata: usize,
}

/// `LruWatermarkController` controls LRU Watermark (epoch) according to actual memory usage statistics
/// collected from Jemalloc and JVM.
///
Expand Down Expand Up @@ -65,18 +80,14 @@ pub struct LruWatermarkController {
}

impl LruWatermarkController {
// TODO(eric): make them configurable
const THRESHOLD_AGGRESSIVE: f64 = 0.9;
const THRESHOLD_GRACEFUL: f64 = 0.8;
const THRESHOLD_STABLE: f64 = 0.7;

pub fn new(total_memory: usize, metrics: Arc<StreamingMetrics>) -> Self {
let threshold_stable = (total_memory as f64 * Self::THRESHOLD_STABLE) as usize;
let threshold_graceful = (total_memory as f64 * Self::THRESHOLD_GRACEFUL) as usize;
let threshold_aggressive = (total_memory as f64 * Self::THRESHOLD_AGGRESSIVE) as usize;
pub fn new(config: &MemoryManagerConfig) -> Self {
let threshold_stable = (config.total_memory as f64 * config.threshold_stable) as usize;
let threshold_graceful = (config.total_memory as f64 * config.threshold_graceful) as usize;
let threshold_aggressive =
(config.total_memory as f64 * config.threshold_aggressive) as usize;

Self {
metrics,
metrics: config.metrics.clone(),
threshold_stable,
threshold_graceful,
threshold_aggressive,
Expand All @@ -103,26 +114,31 @@ impl std::fmt::Debug for LruWatermarkController {
/// - `stats.metadata`: Total number of bytes dedicated to jemalloc metadata.
///
/// Reference: <https://jemalloc.net/jemalloc.3.html>
fn jemalloc_memory_stats() -> (usize, usize, usize, usize) {
fn jemalloc_memory_stats() -> MemoryStats {
if let Err(e) = tikv_jemalloc_ctl::epoch::advance() {
tracing::warn!("Jemalloc epoch advance failed! {:?}", e);
}
let allocated = tikv_jemalloc_ctl::stats::allocated::read().unwrap();
let active = tikv_jemalloc_ctl::stats::active::read().unwrap();
let resident = tikv_jemalloc_ctl::stats::resident::read().unwrap();
let metadata = tikv_jemalloc_ctl::stats::metadata::read().unwrap();
(allocated, active, resident, metadata)
MemoryStats {
allocated,
active,
resident,
metadata,
}
}

impl LruWatermarkController {
pub fn tick(&mut self, interval_ms: u32) -> Epoch {
// NOTE: Be careful! The meaning of `allocated` and `active` differ in JeMalloc and JVM
let (
jemalloc_allocated_bytes,
jemalloc_active_bytes,
jemalloc_resident_bytes,
jemalloc_metadata_bytes,
) = jemalloc_memory_stats();
let MemoryStats {
allocated: jemalloc_allocated_bytes,
active: jemalloc_active_bytes,
resident: jemalloc_resident_bytes,
metadata: jemalloc_metadata_bytes,
} = jemalloc_memory_stats();
let (jvm_allocated_bytes, jvm_active_bytes) = load_jvm_memory_stats();

let cur_used_memory_bytes = jemalloc_active_bytes + jvm_allocated_bytes;
Expand Down
19 changes: 13 additions & 6 deletions src/compute/src/memory/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ use risingwave_stream::executor::monitor::StreamingMetrics;

use super::controller::LruWatermarkController;

pub struct MemoryManagerConfig {
pub total_memory: usize,

pub threshold_aggressive: f64,
pub threshold_graceful: f64,
pub threshold_stable: f64,

pub metrics: Arc<StreamingMetrics>,
}

/// Compute node uses [`MemoryManager`] to limit the memory usage.
pub struct MemoryManager {
/// All cached data before the watermark should be evicted.
Expand All @@ -37,16 +47,13 @@ impl MemoryManager {
// especially when it's 0.
const MIN_TICK_INTERVAL_MS: u32 = 10;

pub fn new(metrics: Arc<StreamingMetrics>, total_memory_bytes: usize) -> Arc<Self> {
let controller = Mutex::new(LruWatermarkController::new(
total_memory_bytes,
metrics.clone(),
));
pub fn new(config: MemoryManagerConfig) -> Arc<Self> {
let controller = Mutex::new(LruWatermarkController::new(&config));
tracing::info!("LRU watermark controller: {:?}", &controller);

Arc::new(Self {
watermark_epoch: Arc::new(0.into()),
metrics,
metrics: config.metrics,
controller,
})
}
Expand Down
22 changes: 17 additions & 5 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use tokio::task::JoinHandle;
use tower::Layer;

use crate::memory::config::{reserve_memory_bytes, storage_memory_config, MIN_COMPUTE_MEMORY_MB};
use crate::memory::manager::MemoryManager;
use crate::memory::manager::{MemoryManager, MemoryManagerConfig};
use crate::observer::observer_manager::ComputeObserverNode;
use crate::rpc::service::config_service::ConfigServiceImpl;
use crate::rpc::service::exchange_metrics::GLOBAL_EXCHANGE_SERVICE_METRICS;
Expand Down Expand Up @@ -280,10 +280,22 @@ pub async fn compute_node_serve(
// Related issues:
// - https://github.com/risingwavelabs/risingwave/issues/8696
// - https://github.com/risingwavelabs/risingwave/issues/8822
let memory_mgr = MemoryManager::new(
streaming_metrics.clone(),
compute_memory_bytes + storage_memory_bytes,
);
let memory_mgr = MemoryManager::new(MemoryManagerConfig {
total_memory: compute_memory_bytes + storage_memory_bytes,
threshold_aggressive: config
.streaming
.developer
.memory_controller_threshold_aggressive,
threshold_graceful: config
.streaming
.developer
.memory_controller_threshold_graceful,
threshold_stable: config
.streaming
.developer
.memory_controller_threshold_stable,
metrics: streaming_metrics.clone(),
});

// Run a background memory manager
tokio::spawn(memory_mgr.clone().run(
Expand Down
3 changes: 3 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ stream_exchange_concurrent_barriers = 1
stream_exchange_concurrent_dispatchers = 0
stream_dml_channel_initial_permits = 32768
stream_hash_agg_max_dirty_groups_heap_size = 67108864
stream_memory_controller_threshold_aggressive = 0.9
stream_memory_controller_threshold_graceful = 0.8
stream_memory_controller_threshold_stable = 0.7

[storage]
share_buffers_sync_parallelism = 1
Expand Down

0 comments on commit 1d6536c

Please sign in to comment.