diff --git a/src/common/src/cache.rs b/src/common/src/cache.rs index e1e97891c1304..99a373d6a94a8 100644 --- a/src/common/src/cache.rs +++ b/src/common/src/cache.rs @@ -665,31 +665,25 @@ pub struct LruCache { const DEFAULT_OBJECT_POOL_SIZE: usize = 1024; impl LruCache { - pub fn new(num_shard_bits: usize, capacity: usize, high_priority_ratio: usize) -> Self { - Self::new_inner(num_shard_bits, capacity, high_priority_ratio, None) + pub fn new(num_shards: usize, capacity: usize, high_priority_ratio: usize) -> Self { + Self::new_inner(num_shards, capacity, high_priority_ratio, None) } pub fn with_event_listener( - num_shard_bits: usize, + num_shards: usize, capacity: usize, high_priority_ratio: usize, listener: Arc>, ) -> Self { - Self::new_inner( - num_shard_bits, - capacity, - high_priority_ratio, - Some(listener), - ) + Self::new_inner(num_shards, capacity, high_priority_ratio, Some(listener)) } fn new_inner( - num_shard_bits: usize, + num_shards: usize, capacity: usize, high_priority_ratio: usize, listener: Option>>, ) -> Self { - let num_shards = 1 << num_shard_bits; let mut shards = Vec::with_capacity(num_shards); let per_shard = capacity / num_shards; let mut shard_usages = Vec::with_capacity(num_shards); @@ -1055,7 +1049,7 @@ mod tests { #[test] fn test_cache_shard() { - let cache = Arc::new(LruCache::<(u64, u64), Block>::new(2, 256, 0)); + let cache = Arc::new(LruCache::<(u64, u64), Block>::new(4, 256, 0)); assert_eq!(cache.shard(0), 0); assert_eq!(cache.shard(1), 1); assert_eq!(cache.shard(10), 2); @@ -1355,7 +1349,7 @@ mod tests { #[test] fn test_write_request_pending() { - let cache = Arc::new(LruCache::new(0, 5, 0)); + let cache = Arc::new(LruCache::new(1, 5, 0)); { let mut shard = cache.shards[0].lock(); insert(&mut shard, "a", "v1"); @@ -1400,7 +1394,7 @@ mod tests { #[test] fn test_event_listener() { let listener = Arc::new(TestLruCacheEventListener::default()); - let cache = Arc::new(LruCache::with_event_listener(0, 2, 0, listener.clone())); + let 
cache = Arc::new(LruCache::with_event_listener(1, 2, 0, listener.clone())); // full-fill cache let h = cache.insert( @@ -1495,7 +1489,7 @@ mod tests { #[tokio::test] async fn test_future_cancel() { - let cache: Arc> = Arc::new(LruCache::new(0, 5, 0)); + let cache: Arc> = Arc::new(LruCache::new(1, 5, 0)); // do not need sender because this receiver will be cancelled. let (_, recv) = channel::<()>(); let polled = Arc::new(AtomicBool::new(false)); diff --git a/src/common/src/config.rs b/src/common/src/config.rs index eb9c421c84aa0..3471875409dd9 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -601,6 +601,9 @@ pub struct StorageConfig { #[serde(default)] pub block_cache_capacity_mb: Option, + #[serde(default)] + pub block_shard_num: Option, + #[serde(default)] pub high_priority_ratio_in_percent: Option, @@ -608,6 +611,9 @@ pub struct StorageConfig { #[serde(default)] pub meta_cache_capacity_mb: Option, + #[serde(default)] + pub meta_shard_num: Option, + /// max memory usage for large query #[serde(default)] pub prefetch_buffer_capacity_mb: Option, @@ -1656,7 +1662,9 @@ pub mod default { pub struct StorageMemoryConfig { pub block_cache_capacity_mb: usize, + pub block_shard_num: usize, pub meta_cache_capacity_mb: usize, + pub meta_shard_num: usize, pub shared_buffer_capacity_mb: usize, pub data_file_cache_ring_buffer_capacity_mb: usize, pub meta_file_cache_ring_buffer_capacity_mb: usize, @@ -1665,6 +1673,10 @@ pub struct StorageMemoryConfig { pub high_priority_ratio_in_percent: usize, } +pub const MAX_META_CACHE_SHARD_BITS: usize = 4; +pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256; +pub const MAX_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict. 
+ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig { let block_cache_capacity_mb = s .storage @@ -1678,6 +1690,21 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig { .storage .shared_buffer_capacity_mb .unwrap_or(default::storage::shared_buffer_capacity_mb()); + let meta_shard_num = s.storage.meta_shard_num.unwrap_or_else(|| { + let mut shard_bits = MAX_META_CACHE_SHARD_BITS; + while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 { + shard_bits -= 1; + } + 1 << shard_bits + }); + let block_shard_num = s.storage.block_shard_num.unwrap_or_else(|| { + let mut shard_bits = MAX_CACHE_SHARD_BITS; + while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 + { + shard_bits -= 1; + } + 1 << shard_bits + }); let data_file_cache_ring_buffer_capacity_mb = s.storage.data_file_cache.ring_buffer_capacity_mb; let meta_file_cache_ring_buffer_capacity_mb = s.storage.meta_file_cache.ring_buffer_capacity_mb; let compactor_memory_limit_mb = s @@ -1695,7 +1722,9 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig { StorageMemoryConfig { block_cache_capacity_mb, + block_shard_num, meta_cache_capacity_mb, + meta_shard_num, shared_buffer_capacity_mb, data_file_cache_ring_buffer_capacity_mb, meta_file_cache_ring_buffer_capacity_mb, diff --git a/src/compute/src/memory/config.rs b/src/compute/src/memory/config.rs index 5a21e1af79725..a6eee7c2416d3 100644 --- a/src/compute/src/memory/config.rs +++ b/src/compute/src/memory/config.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use risingwave_common::config::{StorageConfig, StorageMemoryConfig}; +use risingwave_common::config::{ + StorageConfig, StorageMemoryConfig, MAX_CACHE_SHARD_BITS, MAX_META_CACHE_SHARD_BITS, + MIN_BUFFER_SIZE_PER_SHARD, +}; use risingwave_common::util::pretty_bytes::convert; /// The minimal memory requirement of computing tasks in megabytes. @@ -30,6 +33,7 @@ const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1; const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3; const STORAGE_META_CACHE_MAX_MEMORY_MB: usize = 4096; +const STORAGE_SHARED_BUFFER_MAX_MEMORY_MB: usize = 4096; const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35; const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3; const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 50; @@ -65,41 +69,56 @@ pub fn storage_memory_config( } else { (STORAGE_MEMORY_PROPORTION + COMPACTOR_MEMORY_PROPORTION, 0.0) }; - let mut block_cache_capacity_mb = storage_config.block_cache_capacity_mb.unwrap_or( - ((non_reserved_memory_bytes as f64 - * storage_memory_proportion - * STORAGE_BLOCK_CACHE_MEMORY_PROPORTION) - .ceil() as usize) - >> 20, - ); + let mut default_block_cache_capacity_mb = ((non_reserved_memory_bytes as f64 + * storage_memory_proportion + * STORAGE_BLOCK_CACHE_MEMORY_PROPORTION) + .ceil() as usize) + >> 20; let high_priority_ratio_in_percent = storage_config .high_priority_ratio_in_percent .unwrap_or(STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO); - let default_meta_cache_capacity = (non_reserved_memory_bytes as f64 + let default_meta_cache_capacity_mb = ((non_reserved_memory_bytes as f64 * storage_memory_proportion * STORAGE_META_CACHE_MEMORY_PROPORTION) - .ceil() as usize; + .ceil() as usize) + >> 20; let meta_cache_capacity_mb = storage_config .meta_cache_capacity_mb .unwrap_or(std::cmp::min( - default_meta_cache_capacity >> 20, + default_meta_cache_capacity_mb, STORAGE_META_CACHE_MAX_MEMORY_MB, )); let prefetch_buffer_capacity_mb = storage_config .prefetch_buffer_capacity_mb - 
.unwrap_or(block_cache_capacity_mb); + .unwrap_or(default_block_cache_capacity_mb); - if meta_cache_capacity_mb == STORAGE_META_CACHE_MAX_MEMORY_MB { - block_cache_capacity_mb += (default_meta_cache_capacity >> 20) - meta_cache_capacity_mb; + if meta_cache_capacity_mb != default_meta_cache_capacity_mb { + default_block_cache_capacity_mb += default_meta_cache_capacity_mb; + default_block_cache_capacity_mb = + default_block_cache_capacity_mb.saturating_sub(meta_cache_capacity_mb); } - let shared_buffer_capacity_mb = storage_config.shared_buffer_capacity_mb.unwrap_or( - ((non_reserved_memory_bytes as f64 - * storage_memory_proportion - * STORAGE_SHARED_BUFFER_MEMORY_PROPORTION) - .ceil() as usize) - >> 20, - ); + + let default_shared_buffer_capacity_mb = ((non_reserved_memory_bytes as f64 + * storage_memory_proportion + * STORAGE_SHARED_BUFFER_MEMORY_PROPORTION) + .ceil() as usize) + >> 20; + let shared_buffer_capacity_mb = + storage_config + .shared_buffer_capacity_mb + .unwrap_or(std::cmp::min( + default_shared_buffer_capacity_mb, + STORAGE_SHARED_BUFFER_MAX_MEMORY_MB, + )); + if shared_buffer_capacity_mb != default_shared_buffer_capacity_mb { + default_block_cache_capacity_mb += default_shared_buffer_capacity_mb; + default_block_cache_capacity_mb = + default_block_cache_capacity_mb.saturating_sub(shared_buffer_capacity_mb); + } + let block_cache_capacity_mb = storage_config + .block_cache_capacity_mb + .unwrap_or(default_block_cache_capacity_mb); let data_file_cache_ring_buffer_capacity_mb = if storage_config.data_file_cache.dir.is_empty() { 0 @@ -135,9 +154,27 @@ pub fn storage_memory_config( ); } + let meta_shard_num = storage_config.meta_shard_num.unwrap_or_else(|| { + let mut shard_bits = MAX_META_CACHE_SHARD_BITS; + while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 { + shard_bits -= 1; + } + 1 << shard_bits + }); + let block_shard_num = storage_config.block_shard_num.unwrap_or_else(|| { + let mut shard_bits = 
MAX_CACHE_SHARD_BITS; + while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 + { + shard_bits -= 1; + } + 1 << shard_bits + }); + StorageMemoryConfig { block_cache_capacity_mb, + block_shard_num, meta_cache_capacity_mb, + meta_shard_num, shared_buffer_capacity_mb, data_file_cache_ring_buffer_capacity_mb, meta_file_cache_ring_buffer_capacity_mb, diff --git a/src/config/docs.md b/src/config/docs.md index 452caa81e83fc..3730b4e828f6d 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -94,6 +94,7 @@ This page is automatically generated by `./risedev generate-example-config` | Config | Description | Default | |--------|-------------|---------| | block_cache_capacity_mb | Capacity of sstable block cache. | | +| block_shard_num | | | | cache_refill | | | | check_compaction_result | | false | | compact_iter_recreate_timeout_ms | | 600000 | @@ -118,6 +119,7 @@ This page is automatically generated by `./risedev generate-example-config` | mem_table_spill_threshold | The spill threshold for mem table. | 4194304 | | meta_cache_capacity_mb | Capacity of sstable meta cache. | | | meta_file_cache | | | +| meta_shard_num | | | | min_sst_size_for_streaming_upload | Whether to enable streaming upload for sstable. 
| 33554432 | | object_store | | | | prefetch_buffer_capacity_mb | max memory usage for large query | | diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index b88dd265d382b..f30c36eb4a2f0 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -176,6 +176,8 @@ impl HummockServiceOpts { state_store_metrics: Arc::new(global_hummock_state_store_metrics( MetricLevel::Disabled, )), + block_shard_num: opts.block_shard_num, + meta_shard_num: opts.meta_shard_num, }))) } } diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index 69845ff0f459e..538711ecea7e6 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -87,6 +87,8 @@ impl HummockJavaBindingIterator { max_prefetch_block_number: 16, data_file_cache: FileCache::none(), meta_file_cache: FileCache::none(), + block_shard_num: 2, + meta_shard_num: 2, recent_filter: None, state_store_metrics: Arc::new(global_hummock_state_store_metrics( MetricLevel::Disabled, diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index edab0d067778d..9d960572b8d0f 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -65,6 +65,8 @@ pub fn mock_sstable_store() -> SstableStoreRef { block_cache_capacity: 64 << 20, meta_cache_capacity: 128 << 20, high_priority_ratio: 0, + meta_shard_num: 2, + block_shard_num: 2, prefetch_buffer_capacity: 64 << 20, max_prefetch_block_number: 16, data_file_cache: FileCache::none(), diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs index bdc288b4d6925..590fb1a6f497e 100644 --- a/src/storage/benches/bench_multi_builder.rs +++ b/src/storage/benches/bench_multi_builder.rs @@ -147,6 +147,8 @@ fn bench_builder( block_cache_capacity: 64 << 20, meta_cache_capacity: 128 << 20, high_priority_ratio: 0, + meta_shard_num: 2, + 
block_shard_num: 2, prefetch_buffer_capacity: 64 << 20, max_prefetch_block_number: 16, data_file_cache: FileCache::none(), diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index 276a72bb26592..99608e1b655e9 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -122,6 +122,8 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result, BlockCacheEventListener>; -const MIN_BUFFER_SIZE_PER_SHARD: usize = 256 * 1024 * 1024; - enum BlockEntry { Cache(#[allow(dead_code)] CachedBlockEntry), Owned(#[allow(dead_code)] Box), @@ -113,25 +111,21 @@ impl BlockCache { // TODO(MrCroxx): support other cache algorithm pub fn new( capacity: usize, - mut max_shard_bits: usize, + block_shard_num: usize, high_priority_ratio: usize, event_listener: BlockCacheEventListener, ) -> Self { if capacity == 0 { panic!("block cache capacity == 0"); } - while (capacity >> max_shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && max_shard_bits > 0 { - max_shard_bits -= 1; - } - let shards = 1 << max_shard_bits; let cache = Cache::lru(LruCacheConfig { capacity, - shards, + shards: block_shard_num, eviction_config: LruConfig { high_priority_pool_ratio: high_priority_ratio as f64 / 100.0, }, - object_pool_capacity: shards * 1024, + object_pool_capacity: block_shard_num * 1024, hash_builder: RandomState::default(), event_listener, }); diff --git a/src/storage/src/hummock/iterator/test_utils.rs b/src/storage/src/hummock/iterator/test_utils.rs index 66ce92265dea5..8a2fa2cf8ab70 100644 --- a/src/storage/src/hummock/iterator/test_utils.rs +++ b/src/storage/src/hummock/iterator/test_utils.rs @@ -73,6 +73,8 @@ pub fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableSto high_priority_ratio: 0, prefetch_buffer_capacity: 64 << 20, max_prefetch_block_number: 16, + meta_shard_num: 2, + block_shard_num: 2, data_file_cache: FileCache::none(), meta_file_cache: 
FileCache::none(), recent_filter: None, diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 2640321f552b3..c272dda0d886e 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -53,10 +53,6 @@ use crate::hummock::multi_builder::UploadJoinHandle; use crate::hummock::{BlockHolder, HummockError, HummockResult, MemoryLimiter}; use crate::monitor::{HummockStateStoreMetrics, MemoryCollector, StoreLocalStatistic}; -const MAX_META_CACHE_SHARD_BITS: usize = 2; -const MAX_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict. -const MIN_BUFFER_SIZE_PER_SHARD: usize = 256 * 1024 * 1024; // 256MB - pub type TableHolder = CacheEntry, MetaCacheEventListener>; // TODO: Define policy based on use cases (read / compaction / ...). @@ -196,6 +192,8 @@ pub struct SstableStoreConfig { pub block_cache_capacity: usize, pub meta_cache_capacity: usize, pub high_priority_ratio: usize, + pub meta_shard_num: usize, + pub block_shard_num: usize, pub prefetch_buffer_capacity: usize, pub max_prefetch_block_number: usize, pub data_file_cache: FileCache, @@ -226,16 +224,10 @@ impl SstableStore { pub fn new(config: SstableStoreConfig) -> Self { // TODO: We should validate path early. Otherwise object store won't report invalid path // error until first write attempt. 
- let mut meta_cache_shard_bits = MAX_META_CACHE_SHARD_BITS; - while (config.meta_cache_capacity >> meta_cache_shard_bits) < MIN_BUFFER_SIZE_PER_SHARD - && meta_cache_shard_bits > 0 - { - meta_cache_shard_bits -= 1; - } let block_cache = BlockCache::new( config.block_cache_capacity, - MAX_CACHE_SHARD_BITS, + config.block_shard_num, config.high_priority_ratio, BlockCacheEventListener::new( config.data_file_cache.clone(), @@ -245,11 +237,11 @@ impl SstableStore { // TODO(MrCroxx): support other cache algorithm let meta_cache = Arc::new(Cache::lru(LruCacheConfig { capacity: config.meta_cache_capacity, - shards: 1 << meta_cache_shard_bits, + shards: config.meta_shard_num, eviction_config: LruConfig { high_priority_pool_ratio: 0.0, }, - object_pool_capacity: (1 << meta_cache_shard_bits) * 1024, + object_pool_capacity: config.meta_shard_num * 1024, hash_builder: RandomState::default(), event_listener: MetaCacheEventListener::from(config.meta_file_cache.clone()), })); @@ -293,7 +285,7 @@ impl SstableStore { store, block_cache: BlockCache::new( block_cache_capacity, - 0, + 1, 0, BlockCacheEventListener::new( FileCache::none(), diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 60a787f46c481..9febded2d52db 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -47,8 +47,12 @@ pub struct StorageOpts { pub write_conflict_detection_enabled: bool, /// Capacity of sstable block cache. pub block_cache_capacity_mb: usize, + /// The number of block-cache shards. Fewer shards mean more lock contention. + pub block_shard_num: usize, /// Capacity of sstable meta cache. pub meta_cache_capacity_mb: usize, + /// The number of meta-cache shards. Fewer shards mean more lock contention. + pub meta_shard_num: usize, /// Percent of the ratio of high priority data in block-cache pub high_priority_ratio: usize, /// max memory usage for large query. 
@@ -170,6 +174,8 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt prefetch_buffer_capacity_mb: s.prefetch_buffer_capacity_mb, max_prefetch_block_number: c.storage.max_prefetch_block_number, meta_cache_capacity_mb: s.meta_cache_capacity_mb, + block_shard_num: s.block_shard_num, + meta_shard_num: s.meta_shard_num, disable_remote_compactor: c.storage.disable_remote_compactor, share_buffer_upload_concurrency: c.storage.share_buffer_upload_concurrency, compactor_memory_limit_mb: s.compactor_memory_limit_mb, diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 5d57a8c4ba955..c0bbe32b050f6 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -616,6 +616,8 @@ impl StateStoreImpl { block_cache_capacity: opts.block_cache_capacity_mb * (1 << 20), meta_cache_capacity: opts.meta_cache_capacity_mb * (1 << 20), high_priority_ratio: opts.high_priority_ratio, + meta_shard_num: opts.meta_shard_num, + block_shard_num: opts.block_shard_num, prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20), max_prefetch_block_number: opts.max_prefetch_block_number, data_file_cache, diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 46052015ac688..e3cc27ceb6f16 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -220,6 +220,8 @@ async fn compaction_test( meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: state_store_metrics.clone(), + meta_shard_num: storage_memory_config.meta_shard_num, + block_shard_num: storage_memory_config.block_shard_num, })); let store = HummockStorage::new(