diff --git a/src/common/src/config.rs b/src/common/src/config.rs index cb01e3dcb575..7a45a197f144 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -684,6 +684,10 @@ pub struct CacheConfig { #[serde(default)] #[config_doc(omitted)] pub meta_cache_eviction: CacheEvictionConfig, + + /// Configure the capacity of the meta cache in MB explicitly for compactor. + #[serde(default)] + pub compactor_meta_cache_capacity_mb: Option, } /// the section `[storage.cache.eviction]` in `risingwave.toml`. @@ -2454,7 +2458,7 @@ pub const MAX_META_CACHE_SHARD_BITS: usize = 4; pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256; pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict. -pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig { +pub fn extract_storage_memory_config_default(s: &RwConfig) -> StorageMemoryConfig { let block_cache_capacity_mb = s.storage.cache.block_cache_capacity_mb.unwrap_or( // adapt to old version s.storage @@ -2486,10 +2490,6 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig { } shard_bits }); - let compactor_memory_limit_mb = s - .storage - .compactor_memory_limit_mb - .unwrap_or(default::storage::compactor_memory_limit_mb()); let get_eviction_config = |c: &CacheEvictionConfig| { match c { @@ -2576,15 +2576,42 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig { meta_cache_capacity_mb, meta_cache_shard_num, shared_buffer_capacity_mb, - compactor_memory_limit_mb, prefetch_buffer_capacity_mb, block_cache_eviction_config, meta_cache_eviction_config, block_file_cache_flush_buffer_threshold_mb, meta_file_cache_flush_buffer_threshold_mb, + + compactor_memory_limit_mb: 0, } } +pub fn extract_compactor_memory_config( + s: &RwConfig, + total_memory_bytes: u64, +) -> StorageMemoryConfig { + let mut default = extract_storage_memory_config_default(s); + + pub const MIN_COMPACTOR_META_CACHE_CAPACITY_MB: usize = 128; + pub const MAX_COMPACTOR_META_CACHE_CAPACITY_PROPORTION: f64 = 0.02; + + default.compactor_memory_limit_mb = + (total_memory_bytes as f64 * s.storage.compactor_memory_available_proportion) as usize; + default.meta_cache_capacity_mb = s + .storage + .cache + .compactor_meta_cache_capacity_mb + .unwrap_or_else(|| { + std::cmp::min( + MIN_COMPACTOR_META_CACHE_CAPACITY_MB, + (total_memory_bytes as f64 * MAX_COMPACTOR_META_CACHE_CAPACITY_PROPORTION) as usize + >> 20, + ) + }); + + default +} + #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)] pub struct CompactionConfig { #[serde(default = "default::compaction_config::max_bytes_for_level_base")] diff --git a/src/compute/src/memory/config.rs b/src/compute/src/memory/config.rs index b74da7d9c481..b3d11a4dce06 100644 --- a/src/compute/src/memory/config.rs +++ b/src/compute/src/memory/config.rs @@ -202,9 +202,14 @@ pub fn storage_memory_config( .unwrap_or(default_block_cache_capacity_mb), ); - let compactor_memory_limit_mb = storage_config.compactor_memory_limit_mb.unwrap_or( - ((non_reserved_memory_bytes as f64 * compactor_memory_proportion).ceil() as usize) >> 20, - ); + let compactor_memory_limit_mb = if embedded_compactor_enabled { + storage_config.compactor_memory_limit_mb.unwrap_or( + ((non_reserved_memory_bytes as f64 * compactor_memory_proportion).ceil() as usize) + >> 20, + ) + } else { + 0 + }; // The file cache flush buffer threshold is used as a emergency limitation. // On most cases the flush buffer is not supposed to be as large as the threshold. @@ -328,7 +333,11 @@ pub fn storage_memory_config( meta_cache_capacity_mb, meta_cache_shard_num, shared_buffer_capacity_mb, - compactor_memory_limit_mb, + compactor_memory_limit_mb: if embedded_compactor_enabled { + compactor_memory_limit_mb + } else { + 0 // unused + }, prefetch_buffer_capacity_mb, block_cache_eviction_config, meta_cache_eviction_config, diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index aae537271de4..441969b8333e 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -235,7 +235,7 @@ pub async fn compute_node_serve( if embedded_compactor_enabled { tracing::info!("start embedded compactor"); let memory_limiter = Arc::new(MemoryLimiter::new( - storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2, + storage_memory_config.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2, )); let compaction_executor = Arc::new(CompactionExecutor::new(Some(1))); diff --git a/src/config/docs.md b/src/config/docs.md index 1d373d3b9fda..1982c450df01 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -164,6 +164,7 @@ This page is automatically generated by `./risedev generate-example-config` |--------|-------------|---------| | block_cache_capacity_mb | Configure the capacity of the block cache in MB explicitly. The overridden value will only be effective if: 1. `meta_cache_capacity_mb` and `shared_buffer_capacity_mb` are also configured explicitly. 2. `block_cache_capacity_mb` + `meta_cache_capacity_mb` + `meta_cache_capacity_mb` doesn't exceed 0.3 * non-reserved memory. | | | block_cache_shard_num | Configure the number of shards in the block cache explicitly. If not set, the shard number will be determined automatically based on cache capacity. | | +| compactor_meta_cache_capacity_mb | Configure the capacity of the meta cache in MB explicitly for compactor. | | | meta_cache_capacity_mb | Configure the capacity of the block cache in MB explicitly. The overridden value will only be effective if: 1. `block_cache_capacity_mb` and `shared_buffer_capacity_mb` are also configured explicitly. 2. `block_cache_capacity_mb` + `meta_cache_capacity_mb` + `meta_cache_capacity_mb` doesn't exceed 0.3 * non-reserved memory. | | | meta_cache_shard_num | Configure the number of shards in the meta cache explicitly. If not set, the shard number will be determined automatically based on cache capacity. | | diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index ded76b2bdd94..c293c8a335bb 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use risingwave_common::config::{ - extract_storage_memory_config, load_config, AsyncStackTraceOption, MetricLevel, RwConfig, + extract_compactor_memory_config, load_config, AsyncStackTraceOption, MetricLevel, RwConfig, }; use risingwave_common::monitor::{RouterExt, TcpConfig}; use risingwave_common::system_param::local_manager::LocalSystemParamsManager; @@ -25,7 +25,6 @@ use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsRead use risingwave_common::telemetry::manager::TelemetryManager; use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common::util::addr::HostAddr; -use risingwave_common::util::resource_util::memory::system_memory_available_bytes; use risingwave_common::util::tokio_util::sync::CancellationToken; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_common_heap_profiling::HeapProfiler; @@ -58,6 +57,7 @@ use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl}; use crate::telemetry::CompactorTelemetryCreator; use crate::CompactorOpts; +// Only used for non-embedded compactor pub async fn prepare_start_parameters( compactor_opts: &CompactorOpts, config: RwConfig, @@ -76,24 +76,24 @@ pub async fn prepare_start_parameters( let state_store_url = system_params_reader.state_store(); - let storage_memory_config = extract_storage_memory_config(&config); + let storage_memory_config = extract_compactor_memory_config( + &config, + compactor_opts.compactor_total_memory_bytes as u64, + ); let storage_opts: Arc = Arc::new(StorageOpts::from(( &config, &system_params_reader, &storage_memory_config, ))); - let non_reserved_memory_bytes = (compactor_opts.compactor_total_memory_bytes as f64 - * config.storage.compactor_memory_available_proportion) - as usize; - let meta_cache_capacity_bytes = storage_opts.meta_cache_capacity_mb * (1 << 20); - let compactor_memory_limit_bytes = match config.storage.compactor_memory_limit_mb { - Some(compactor_memory_limit_mb) => compactor_memory_limit_mb as u64 * (1 << 20), - None => (non_reserved_memory_bytes - meta_cache_capacity_bytes) as u64, - }; + let compactor_memory_limit_bytes = + storage_memory_config.compactor_memory_limit_mb as u64 * (1 << 20); + let meta_cache_capacity_bytes = storage_memory_config.meta_cache_capacity_mb * (1 << 20); + let compactor_memory_limit_bytes = + compactor_memory_limit_bytes.saturating_sub(meta_cache_capacity_bytes as u64); tracing::info!( - "Compactor non_reserved_memory_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}", - non_reserved_memory_bytes, meta_cache_capacity_bytes, compactor_memory_limit_bytes, + "Compactor compactor_memory_limit_bytes {} meta_cache_capacity_bytes {} compactor_memory_limit_bytes {} sstable_size_bytes {} block_size_bytes {}", + compactor_memory_limit_bytes, meta_cache_capacity_bytes, compactor_memory_limit_bytes, storage_opts.sstable_size_mb * (1 << 20), storage_opts.block_size_kb * (1 << 10), ); @@ -134,7 +134,6 @@ pub async fn prepare_start_parameters( ); let memory_limiter = Arc::new(MemoryLimiter::new(compactor_memory_limit_bytes)); - let storage_memory_config = extract_storage_memory_config(&config); let memory_collector = Arc::new(HummockMemoryCollector::new( sstable_store.clone(), memory_limiter.clone(), @@ -142,7 +141,7 @@ pub async fn prepare_start_parameters( )); let heap_profiler = HeapProfiler::new( - system_memory_available_bytes(), + compactor_opts.compactor_total_memory_bytes, config.server.heap_profiling.clone(), ); diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index 98ee3ba448ee..75af89ac54cf 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -31,7 +31,7 @@ use clap::Parser; use foyer::{Engine, HybridCacheBuilder}; use replay_impl::{get_replay_notification_client, GlobalReplayImpl}; use risingwave_common::config::{ - extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig, + extract_storage_memory_config_default, load_config, NoOverride, ObjectStoreConfig, }; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_hummock_trace::{ @@ -89,7 +89,7 @@ async fn run_replay(args: Args) -> Result<()> { async fn create_replay_hummock(r: Record, args: &Args) -> Result { let config = load_config(&args.config, NoOverride); - let storage_memory_config = extract_storage_memory_config(&config); + let storage_memory_config = extract_storage_memory_config_default(&config); let system_params_reader = SystemParamsReader::from(config.system.clone().into_init_system_params()); diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 98e081e72af1..a976d7c59d8f 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -65,7 +65,6 @@ pub fn default_opts_for_test() -> StorageOpts { block_cache_eviction_config: EvictionConfig::for_test(), disable_remote_compactor: false, share_buffer_upload_concurrency: 1, - compactor_memory_limit_mb: 64, sstable_id_remote_fetch_number: 1, ..Default::default() } diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index e41e07dafc44..892c83c38aa7 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -13,7 +13,8 @@ // limitations under the License. use risingwave_common::config::{ - extract_storage_memory_config, EvictionConfig, ObjectStoreConfig, RwConfig, StorageMemoryConfig, + extract_storage_memory_config_default, EvictionConfig, ObjectStoreConfig, RwConfig, + StorageMemoryConfig, }; use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader}; use risingwave_common::system_param::system_params_for_test; @@ -70,8 +71,6 @@ pub struct StorageOpts { pub disable_remote_compactor: bool, /// Number of tasks shared buffer can upload in parallel. pub share_buffer_upload_concurrency: usize, - /// Capacity of sstable meta cache. - pub compactor_memory_limit_mb: usize, /// compactor streaming iterator recreate timeout. /// deprecated pub compact_iter_recreate_timeout_ms: u64, @@ -151,7 +150,7 @@ impl Default for StorageOpts { fn default() -> Self { let c = RwConfig::default(); let p = system_params_for_test(); - let s = extract_storage_memory_config(&c); + let s = extract_storage_memory_config_default(&c); Self::from((&c, &p.into(), &s)) } } @@ -184,7 +183,6 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt max_prefetch_block_number: c.storage.max_prefetch_block_number, disable_remote_compactor: c.storage.disable_remote_compactor, share_buffer_upload_concurrency: c.storage.share_buffer_upload_concurrency, - compactor_memory_limit_mb: s.compactor_memory_limit_mb, sstable_id_remote_fetch_number: c.storage.sstable_id_remote_fetch_number, min_sst_size_for_streaming_upload: c.storage.min_sst_size_for_streaming_upload, max_concurrent_compaction_task_number: c.storage.max_concurrent_compaction_task_number, diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index cba5565a0906..26886401f95c 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -26,7 +26,7 @@ use clap::Parser; use foyer::CacheHint; use risingwave_common::catalog::TableId; use risingwave_common::config::{ - extract_storage_memory_config, load_config, MetaConfig, NoOverride, + extract_storage_memory_config_default, load_config, MetaConfig, NoOverride, }; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; @@ -357,7 +357,7 @@ async fn start_replay( } // Creates a hummock state store *after* we reset the hummock version - let storage_memory_config = extract_storage_memory_config(&config); + let storage_memory_config = extract_storage_memory_config_default(&config); let storage_opts = Arc::new(StorageOpts::from(( &config, &system_params,