diff --git a/Cargo.lock b/Cargo.lock index ea6dd1975125d..9b6912afb43f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5409,9 +5409,9 @@ dependencies = [ [[package]] name = "foyer" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbfceb28a004d50b6110fd012db1d52d318d15f721dbb5b65eda261b821c1baa" +checksum = "1b5f358e9b9492a9e9af905934ef4736a97a001fa19232f4a29f42974cf7a24c" dependencies = [ "ahash 0.8.11", "anyhow", @@ -5427,9 +5427,9 @@ dependencies = [ [[package]] name = "foyer-common" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce1b903e1e142eafe8cd3183087fcd7ed6452e5dc8dfb5356a607ec2aa1c869" +checksum = "e109f1fd012cc2f0785db5711c73c388fca2d2ac6e2aabb4eaa3a8af98b4dcdb" dependencies = [ "bytes", "cfg-if", @@ -5449,9 +5449,9 @@ dependencies = [ [[package]] name = "foyer-intrusive" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f7883ac7a8a69115f5bd84072a0b98d4ba72feacddf9040c217a5012bb352ff" +checksum = "2e7ad91ad11f2c94bb4b3b88acd447f3145224415d776f163d2b06a9ed24e63a" dependencies = [ "foyer-common", "itertools 0.13.0", @@ -5459,9 +5459,9 @@ dependencies = [ [[package]] name = "foyer-memory" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fd7b0ce80867803c6b197db20cc1a49fcecc9d5070d2fb829660ec19acf9e72" +checksum = "29273589433f89d61347b196754bfa22357420d1adbf815d354c23512106a194" dependencies = [ "ahash 0.8.11", "bitflags 2.5.0", @@ -5482,9 +5482,9 @@ dependencies = [ [[package]] name = "foyer-storage" -version = "0.9.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4205359a7ea9d05ff7525f0e55a5257a593c89b4e22ebfa07e06a84c2e163202" +checksum = "79c20580e456ea337e4bdefa9fba7ff19b42cd87432bda3ef3579ef9e1057c7e" dependencies = [ "ahash 0.8.11", "allocator-api2", diff --git a/Cargo.toml b/Cargo.toml index 80f3f5af2e8c7..39a600e374d78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,7 @@ license = "Apache-2.0" repository = "https://github.com/risingwavelabs/risingwave" [workspace.dependencies] -foyer = { version = "0.10.0", features = ["nightly", "mtrace"] } +foyer = { version = "0.10.1", features = ["nightly", "mtrace"] } apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [ "snappy", "zstandard", diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 47d53a75b76a1..4bdbf72cf273a 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -851,6 +851,9 @@ pub struct FileCacheConfig { #[serde(default = "default::file_cache::compression")] pub compression: String, + #[serde(default = "default::file_cache::flush_buffer_threshold_mb")] + pub flush_buffer_threshold_mb: Option, + #[serde(default, flatten)] #[config_doc(omitted)] pub unrecognized: Unrecognized, @@ -1591,6 +1594,14 @@ pub mod default { pub fn table_info_statistic_history_times() -> usize { 240 } + + pub fn block_file_cache_flush_buffer_threshold_mb() -> usize { + 256 + } + + pub fn meta_file_cache_flush_buffer_threshold_mb() -> usize { + 64 + } } pub mod streaming { @@ -1651,6 +1662,10 @@ pub mod default { pub fn compression() -> String { "none".to_string() } + + pub fn flush_buffer_threshold_mb() -> Option { + None + } } pub mod cache_refill { @@ -2123,6 +2138,8 @@ pub struct StorageMemoryConfig { pub prefetch_buffer_capacity_mb: usize, pub block_cache_eviction_config: EvictionConfig, pub meta_cache_eviction_config: EvictionConfig, + pub block_file_cache_flush_buffer_threshold_mb: usize, + pub meta_file_cache_flush_buffer_threshold_mb: usize, } pub const MAX_META_CACHE_SHARD_BITS: usize = 4; @@ -2234,6 +2251,17 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig { } }); + let block_file_cache_flush_buffer_threshold_mb = s + .storage + .data_file_cache + .flush_buffer_threshold_mb + .unwrap_or(default::storage::block_file_cache_flush_buffer_threshold_mb()); + let meta_file_cache_flush_buffer_threshold_mb = s + .storage + .meta_file_cache + .flush_buffer_threshold_mb + .unwrap_or(default::storage::block_file_cache_flush_buffer_threshold_mb()); + StorageMemoryConfig { block_cache_capacity_mb, block_cache_shard_num, @@ -2244,6 +2272,8 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig { prefetch_buffer_capacity_mb, block_cache_eviction_config, meta_cache_eviction_config, + block_file_cache_flush_buffer_threshold_mb, + meta_file_cache_flush_buffer_threshold_mb, } } diff --git a/src/compute/src/memory/config.rs b/src/compute/src/memory/config.rs index 2a6607c268a67..234490773f40f 100644 --- a/src/compute/src/memory/config.rs +++ b/src/compute/src/memory/config.rs @@ -168,6 +168,24 @@ pub fn storage_memory_config( ((non_reserved_memory_bytes as f64 * compactor_memory_proportion).ceil() as usize) >> 20, ); + // The file cache flush buffer threshold is used as a emergency limitation. + // On most cases the flush buffer is not supposed to be as large as the threshold. + // So, the file cache flush buffer threshold size is not calculated in the memory usage. + let block_file_cache_flush_buffer_threshold_mb = storage_config + .data_file_cache + .flush_buffer_threshold_mb + .unwrap_or( + risingwave_common::config::default::storage::block_file_cache_flush_buffer_threshold_mb( + ), + ); + let meta_file_cache_flush_buffer_threshold_mb = storage_config + .meta_file_cache + .flush_buffer_threshold_mb + .unwrap_or( + risingwave_common::config::default::storage::meta_file_cache_flush_buffer_threshold_mb( + ), + ); + let total_calculated_mb = block_cache_capacity_mb + meta_cache_capacity_mb + shared_buffer_capacity_mb @@ -276,6 +294,8 @@ pub fn storage_memory_config( prefetch_buffer_capacity_mb, block_cache_eviction_config, meta_cache_eviction_config, + block_file_cache_flush_buffer_threshold_mb, + meta_file_cache_flush_buffer_threshold_mb, } } diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 143b6bba37981..4c8d8ae8e2bb9 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -90,6 +90,7 @@ pub struct StorageOpts { pub data_file_cache_insert_rate_limit_mb: usize, pub data_file_cache_indexer_shards: usize, pub data_file_cache_compression: String, + pub data_file_cache_flush_buffer_threshold_mb: usize, pub cache_refill_data_refill_levels: Vec, pub cache_refill_timeout_ms: u64, @@ -108,6 +109,7 @@ pub struct StorageOpts { pub meta_file_cache_insert_rate_limit_mb: usize, pub meta_file_cache_indexer_shards: usize, pub meta_file_cache_compression: String, + pub meta_file_cache_flush_buffer_threshold_mb: usize, /// The storage url for storing backups. pub backup_storage_url: String, @@ -183,6 +185,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt data_file_cache_insert_rate_limit_mb: c.storage.data_file_cache.insert_rate_limit_mb, data_file_cache_indexer_shards: c.storage.data_file_cache.indexer_shards, data_file_cache_compression: c.storage.data_file_cache.compression.clone(), + data_file_cache_flush_buffer_threshold_mb: s.block_file_cache_flush_buffer_threshold_mb, meta_file_cache_dir: c.storage.meta_file_cache.dir.clone(), meta_file_cache_capacity_mb: c.storage.meta_file_cache.capacity_mb, meta_file_cache_file_capacity_mb: c.storage.meta_file_cache.file_capacity_mb, @@ -192,6 +195,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt meta_file_cache_insert_rate_limit_mb: c.storage.meta_file_cache.insert_rate_limit_mb, meta_file_cache_indexer_shards: c.storage.meta_file_cache.indexer_shards, meta_file_cache_compression: c.storage.meta_file_cache.compression.clone(), + meta_file_cache_flush_buffer_threshold_mb: s.meta_file_cache_flush_buffer_threshold_mb, cache_refill_data_refill_levels: c.storage.cache_refill.data_refill_levels.clone(), cache_refill_timeout_ms: c.storage.cache_refill.timeout_ms, cache_refill_concurrency: c.storage.cache_refill.concurrency, diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index dc1eb34be62b7..8dd718324d9a5 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -281,7 +281,7 @@ pub mod verify { // TODO: may avoid manual async fn when the bug of rust compiler is fixed. Currently it will // fail to compile. - #[allow(clippy::manual_async_fn)] + #[expect(clippy::manual_async_fn)] fn iter( &self, key_range: TableKeyRange, @@ -303,7 +303,7 @@ pub mod verify { } } - #[allow(clippy::manual_async_fn)] + #[expect(clippy::manual_async_fn)] fn rev_iter( &self, key_range: TableKeyRange, @@ -421,7 +421,7 @@ pub mod verify { actual } - #[allow(clippy::manual_async_fn)] + #[expect(clippy::manual_async_fn)] fn iter( &self, key_range: TableKeyRange, @@ -442,7 +442,7 @@ pub mod verify { } } - #[allow(clippy::manual_async_fn)] + #[expect(clippy::manual_async_fn)] fn rev_iter( &self, key_range: TableKeyRange, @@ -644,6 +644,7 @@ impl StateStoreImpl { .with_indexer_shards(opts.meta_file_cache_indexer_shards) .with_flushers(opts.meta_file_cache_flushers) .with_reclaimers(opts.meta_file_cache_reclaimers) + .with_buffer_threshold(opts.meta_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB .with_clean_region_threshold( opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2, ) @@ -696,6 +697,7 @@ impl StateStoreImpl { .with_indexer_shards(opts.data_file_cache_indexer_shards) .with_flushers(opts.data_file_cache_flushers) .with_reclaimers(opts.data_file_cache_reclaimers) + .with_buffer_threshold(opts.data_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB .with_clean_region_threshold( opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2, )