diff --git a/Cargo.lock b/Cargo.lock index b8d705132c8f0..6901b299fd066 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4150,9 +4150,9 @@ dependencies = [ [[package]] name = "foyer-memory" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a694cde4dd2c9fdd8cc2d294fcb265fdd6be1ee62fa35c37e9fc328871f33b6d" +checksum = "65b0a4ca5d917e684c9267e8e2bcec4b3e0c27dd91c84945c20c09ddff906cac" dependencies = [ "ahash 0.8.6", "bitflags 2.4.2", @@ -8917,6 +8917,7 @@ dependencies = [ "ethnum", "expect-test", "fixedbitset 0.5.0", + "foyer", "futures", "governor", "hex", @@ -9130,6 +9131,7 @@ dependencies = [ "chrono", "clap", "either", + "foyer", "futures", "futures-async-stream", "hyper", @@ -14229,9 +14231,6 @@ dependencies = [ "uuid", "whoami", "zeroize", - "zstd 0.13.0", - "zstd-safe 7.0.0", - "zstd-sys", ] [[package]] diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 03561ca289237..b123b65306e5b 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -45,6 +45,7 @@ enum-as-inner = "0.6" enumflags2 = { version = "0.7.8" } ethnum = { version = "1", features = ["serde"] } fixedbitset = { version = "0.5", features = ["std"] } +foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } governor = { version = "0.6", default-features = false, features = ["std"] } hex = "0.4.3" diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 3145e511c6086..f20bc16daf0db 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -24,6 +24,7 @@ use std::num::NonZeroUsize; use anyhow::Context; use clap::ValueEnum; use educe::Educe; +use foyer::memory::{LfuConfig, LruConfig}; use risingwave_common_proc_macro::ConfigDoc; pub use risingwave_common_proc_macro::OverrideConfig; use risingwave_pb::meta::SystemParams; @@ -532,6 +533,51 @@ pub struct StreamingConfig { pub use risingwave_common_metrics::MetricLevel; +/// the section `[storage.cache]` in `risingwave.toml`. +#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)] +pub struct CacheConfig { + #[serde(default)] + pub block_cache_capacity_mb: Option, + + #[serde(default)] + pub block_cache_shard_num: Option, + + #[serde(default)] + pub block_cache_eviction: CacheEvictionConfig, + + #[serde(default)] + pub meta_cache_capacity_mb: Option, + + #[serde(default)] + pub meta_cache_shard_num: Option, + + #[serde(default)] + pub meta_cache_eviction: CacheEvictionConfig, +} + +/// the section `[storage.cache.eviction]` in `risingwave.toml`. +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(tag = "algorithm")] +pub enum CacheEvictionConfig { + Lru { + high_priority_ratio_in_percent: Option, + }, + Lfu { + window_capacity_ratio_in_percent: Option, + protected_capacity_ratio_in_percent: Option, + cmsketch_eps: Option, + cmsketch_confidence: Option, + }, +} + +impl Default for CacheEvictionConfig { + fn default() -> Self { + Self::Lru { + high_priority_ratio_in_percent: None, + } + } +} + /// The section `[storage]` in `risingwave.toml`. #[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)] pub struct StorageConfig { @@ -562,22 +608,20 @@ pub struct StorageConfig { #[serde(default = "default::storage::write_conflict_detection_enabled")] pub write_conflict_detection_enabled: bool, - /// Capacity of sstable block cache. - #[serde(default)] - pub block_cache_capacity_mb: Option, - #[serde(default)] - pub block_shard_num: Option, + pub cache: CacheConfig, + /// DEPRECATED: This config will be deprecated in the future version, use `storage.cache.block_cache_capacity_mb` instead. #[serde(default)] - pub high_priority_ratio_in_percent: Option, + pub block_cache_capacity_mb: Option, - /// Capacity of sstable meta cache. + /// DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. #[serde(default)] pub meta_cache_capacity_mb: Option, + /// DEPRECATED: This config will be deprecated in the future version, use `storage.cache.block_cache_eviction.high_priority_ratio_in_percent` with `storage.cache.block_cache_eviction.algorithm = "Lru"` instead. #[serde(default)] - pub meta_shard_num: Option, + pub high_priority_ratio_in_percent: Option, /// max memory usage for large query #[serde(default)] @@ -1203,6 +1247,19 @@ pub mod default { 70 } + pub fn window_capacity_ratio_in_percent() -> usize { + 10 + } + pub fn protected_capacity_ratio_in_percent() -> usize { + 80 + } + pub fn cmsketch_eps() -> f64 { + 0.002 + } + pub fn cmsketch_confidence() -> f64 { + 0.95 + } + pub fn meta_cache_capacity_mb() -> usize { 128 } @@ -1644,45 +1701,64 @@ pub mod default { } } +#[derive(Debug, Clone)] +pub enum EvictionConfig { + Lru(LruConfig), + Lfu(LfuConfig), +} + +impl EvictionConfig { + pub fn for_test() -> Self { + Self::Lru(LruConfig { + high_priority_pool_ratio: 0.0, + }) + } +} + pub struct StorageMemoryConfig { pub block_cache_capacity_mb: usize, - pub block_shard_num: usize, + pub block_cache_shard_num: usize, pub meta_cache_capacity_mb: usize, - pub meta_shard_num: usize, + pub meta_cache_shard_num: usize, pub shared_buffer_capacity_mb: usize, pub data_file_cache_ring_buffer_capacity_mb: usize, pub meta_file_cache_ring_buffer_capacity_mb: usize, pub compactor_memory_limit_mb: usize, pub prefetch_buffer_capacity_mb: usize, - pub high_priority_ratio_in_percent: usize, + pub block_cache_eviction_config: EvictionConfig, + pub meta_cache_eviction_config: EvictionConfig, } pub const MAX_META_CACHE_SHARD_BITS: usize = 4; pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256; -pub const MAX_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict. +pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict. pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig { - let block_cache_capacity_mb = s - .storage - .block_cache_capacity_mb - .unwrap_or(default::storage::block_cache_capacity_mb()); - let meta_cache_capacity_mb = s - .storage - .meta_cache_capacity_mb - .unwrap_or(default::storage::meta_cache_capacity_mb()); + let block_cache_capacity_mb = s.storage.cache.block_cache_capacity_mb.unwrap_or( + // adapt to old version + s.storage + .block_cache_capacity_mb + .unwrap_or(default::storage::block_cache_capacity_mb()), + ); + let meta_cache_capacity_mb = s.storage.cache.meta_cache_capacity_mb.unwrap_or( + // adapt to old version + s.storage + .block_cache_capacity_mb + .unwrap_or(default::storage::meta_cache_capacity_mb()), + ); let shared_buffer_capacity_mb = s .storage .shared_buffer_capacity_mb .unwrap_or(default::storage::shared_buffer_capacity_mb()); - let meta_shard_num = s.storage.meta_shard_num.unwrap_or_else(|| { + let meta_cache_shard_num = s.storage.cache.meta_cache_shard_num.unwrap_or_else(|| { let mut shard_bits = MAX_META_CACHE_SHARD_BITS; while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 { shard_bits -= 1; } shard_bits }); - let block_shard_num = s.storage.block_shard_num.unwrap_or_else(|| { - let mut shard_bits = MAX_CACHE_SHARD_BITS; + let block_cache_shard_num = s.storage.cache.block_cache_shard_num.unwrap_or_else(|| { + let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS; while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 { shard_bits -= 1; @@ -1695,26 +1771,66 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig { .storage .compactor_memory_limit_mb .unwrap_or(default::storage::compactor_memory_limit_mb()); - let high_priority_ratio_in_percent = s - .storage - .high_priority_ratio_in_percent - .unwrap_or(default::storage::high_priority_ratio_in_percent()); - let prefetch_buffer_capacity_mb = s - .storage - .shared_buffer_capacity_mb - .unwrap_or((100 - high_priority_ratio_in_percent) * block_cache_capacity_mb / 100); + + let get_eviction_config = |c: &CacheEvictionConfig| match c { + CacheEvictionConfig::Lru { + high_priority_ratio_in_percent, + } => EvictionConfig::Lru(LruConfig { + high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or( + // adapt to old version + s.storage + .high_priority_ratio_in_percent + .unwrap_or(default::storage::high_priority_ratio_in_percent()), + ) as f64 + / 100.0, + }), + CacheEvictionConfig::Lfu { + window_capacity_ratio_in_percent, + protected_capacity_ratio_in_percent, + cmsketch_eps, + cmsketch_confidence, + } => EvictionConfig::Lfu(LfuConfig { + window_capacity_ratio: window_capacity_ratio_in_percent + .unwrap_or(default::storage::window_capacity_ratio_in_percent()) + as f64 + / 100.0, + protected_capacity_ratio: protected_capacity_ratio_in_percent + .unwrap_or(default::storage::protected_capacity_ratio_in_percent()) + as f64 + / 100.0, + cmsketch_eps: cmsketch_eps.unwrap_or(default::storage::cmsketch_eps()), + cmsketch_confidence: cmsketch_confidence + .unwrap_or(default::storage::cmsketch_confidence()), + }), + }; + + let block_cache_eviction_config = get_eviction_config(&s.storage.cache.block_cache_eviction); + let meta_cache_eviction_config = get_eviction_config(&s.storage.cache.meta_cache_eviction); + + let prefetch_buffer_capacity_mb = + s.storage + .shared_buffer_capacity_mb + .unwrap_or(match &block_cache_eviction_config { + EvictionConfig::Lru(lru) => { + ((1.0 - lru.high_priority_pool_ratio) * block_cache_capacity_mb as f64) as usize + } + EvictionConfig::Lfu(lfu) => { + ((1.0 - lfu.protected_capacity_ratio) * block_cache_capacity_mb as f64) as usize + } + }); StorageMemoryConfig { block_cache_capacity_mb, - block_shard_num, + block_cache_shard_num, meta_cache_capacity_mb, - meta_shard_num, + meta_cache_shard_num, shared_buffer_capacity_mb, data_file_cache_ring_buffer_capacity_mb, meta_file_cache_ring_buffer_capacity_mb, compactor_memory_limit_mb, prefetch_buffer_capacity_mb, - high_priority_ratio_in_percent, + block_cache_eviction_config, + meta_cache_eviction_config, } } diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index 1c27e4f73e228..62bf74d21c371 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -20,6 +20,7 @@ await-tree = { workspace = true } chrono = { version = "0.4" } clap = { version = "4", features = ["derive"] } either = "1" +foyer = { workspace = true } futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } hyper = "0.14" diff --git a/src/compute/src/memory/config.rs b/src/compute/src/memory/config.rs index a6eee7c2416d3..f4a4b93c1d784 100644 --- a/src/compute/src/memory/config.rs +++ b/src/compute/src/memory/config.rs @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use foyer::memory::{LfuConfig, LruConfig}; use risingwave_common::config::{ - StorageConfig, StorageMemoryConfig, MAX_CACHE_SHARD_BITS, MAX_META_CACHE_SHARD_BITS, - MIN_BUFFER_SIZE_PER_SHARD, + CacheEvictionConfig, EvictionConfig, StorageConfig, StorageMemoryConfig, + MAX_BLOCK_CACHE_SHARD_BITS, MAX_META_CACHE_SHARD_BITS, MIN_BUFFER_SIZE_PER_SHARD, }; use risingwave_common::util::pretty_bytes::convert; @@ -36,7 +37,6 @@ const STORAGE_META_CACHE_MAX_MEMORY_MB: usize = 4096; const STORAGE_SHARED_BUFFER_MAX_MEMORY_MB: usize = 4096; const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35; const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3; -const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 50; /// Each compute node reserves some memory for stack and code segment of processes, allocation /// overhead, network buffer, etc. based on `SYSTEM_RESERVED_MEMORY_PROPORTION`. The reserve memory @@ -74,20 +74,20 @@ pub fn storage_memory_config( * STORAGE_BLOCK_CACHE_MEMORY_PROPORTION) .ceil() as usize) >> 20; - let high_priority_ratio_in_percent = storage_config - .high_priority_ratio_in_percent - .unwrap_or(STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO); let default_meta_cache_capacity_mb = ((non_reserved_memory_bytes as f64 * storage_memory_proportion * STORAGE_META_CACHE_MEMORY_PROPORTION) .ceil() as usize) >> 20; - let meta_cache_capacity_mb = storage_config - .meta_cache_capacity_mb - .unwrap_or(std::cmp::min( - default_meta_cache_capacity_mb, - STORAGE_META_CACHE_MAX_MEMORY_MB, - )); + let meta_cache_capacity_mb = storage_config.cache.meta_cache_capacity_mb.unwrap_or( + // adapt to old version + storage_config + .meta_cache_capacity_mb + .unwrap_or(std::cmp::min( + default_meta_cache_capacity_mb, + STORAGE_META_CACHE_MAX_MEMORY_MB, + )), + ); let prefetch_buffer_capacity_mb = storage_config .prefetch_buffer_capacity_mb @@ -116,9 +116,12 @@ pub fn storage_memory_config( default_block_cache_capacity_mb = default_block_cache_capacity_mb.saturating_sub(shared_buffer_capacity_mb); } - let block_cache_capacity_mb = storage_config - .block_cache_capacity_mb - .unwrap_or(default_block_cache_capacity_mb); + let block_cache_capacity_mb = storage_config.cache.block_cache_capacity_mb.unwrap_or( + // adapt to old version + storage_config + .block_cache_capacity_mb + .unwrap_or(default_block_cache_capacity_mb), + ); let data_file_cache_ring_buffer_capacity_mb = if storage_config.data_file_cache.dir.is_empty() { 0 @@ -154,33 +157,80 @@ pub fn storage_memory_config( ); } - let meta_shard_num = storage_config.meta_shard_num.unwrap_or_else(|| { - let mut shard_bits = MAX_META_CACHE_SHARD_BITS; - while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 { - shard_bits -= 1; - } - 1 << shard_bits - }); - let block_shard_num = storage_config.block_shard_num.unwrap_or_else(|| { - let mut shard_bits = MAX_CACHE_SHARD_BITS; - while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 - { - shard_bits -= 1; - } - 1 << shard_bits - }); + let meta_cache_shard_num = storage_config + .cache + .meta_cache_shard_num + .unwrap_or_else(|| { + let mut shard_bits = MAX_META_CACHE_SHARD_BITS; + while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD + && shard_bits > 0 + { + shard_bits -= 1; + } + 1 << shard_bits + }); + let block_cache_shard_num = storage_config + .cache + .block_cache_shard_num + .unwrap_or_else(|| { + let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS; + while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD + && shard_bits > 0 + { + shard_bits -= 1; + } + 1 << shard_bits + }); + + let get_eviction_config = |c: &CacheEvictionConfig| match c { + CacheEvictionConfig::Lru { + high_priority_ratio_in_percent, + } => EvictionConfig::Lru(LruConfig { + high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or( + // adapt to old version + storage_config.high_priority_ratio_in_percent.unwrap_or( + risingwave_common::config::default::storage::high_priority_ratio_in_percent(), + ), + ) as f64 + / 100.0, + }), + CacheEvictionConfig::Lfu { + window_capacity_ratio_in_percent, + protected_capacity_ratio_in_percent, + cmsketch_eps, + cmsketch_confidence, + } => EvictionConfig::Lfu(LfuConfig { + window_capacity_ratio: window_capacity_ratio_in_percent.unwrap_or( + risingwave_common::config::default::storage::window_capacity_ratio_in_percent(), + ) as f64 + / 100.0, + protected_capacity_ratio: protected_capacity_ratio_in_percent.unwrap_or( + risingwave_common::config::default::storage::protected_capacity_ratio_in_percent(), + ) as f64 + / 100.0, + cmsketch_eps: cmsketch_eps + .unwrap_or(risingwave_common::config::default::storage::cmsketch_eps()), + cmsketch_confidence: cmsketch_confidence + .unwrap_or(risingwave_common::config::default::storage::cmsketch_confidence()), + }), + }; + + let block_cache_eviction_config = + get_eviction_config(&storage_config.cache.block_cache_eviction); + let meta_cache_eviction_config = get_eviction_config(&storage_config.cache.meta_cache_eviction); StorageMemoryConfig { block_cache_capacity_mb, - block_shard_num, + block_cache_shard_num, meta_cache_capacity_mb, - meta_shard_num, + meta_cache_shard_num, shared_buffer_capacity_mb, data_file_cache_ring_buffer_capacity_mb, meta_file_cache_ring_buffer_capacity_mb, compactor_memory_limit_mb, prefetch_buffer_capacity_mb, - high_priority_ratio_in_percent, + block_cache_eviction_config, + meta_cache_eviction_config, } } @@ -229,8 +279,8 @@ mod tests { assert_eq!(memory_config.meta_file_cache_ring_buffer_capacity_mb, 256); assert_eq!(memory_config.compactor_memory_limit_mb, 819); - storage_config.block_cache_capacity_mb = Some(512); - storage_config.meta_cache_capacity_mb = Some(128); + storage_config.cache.block_cache_capacity_mb = Some(512); + storage_config.cache.meta_cache_capacity_mb = Some(128); storage_config.shared_buffer_capacity_mb = Some(1024); storage_config.compactor_memory_limit_mb = Some(512); let memory_config = storage_memory_config(0, true, &storage_config); diff --git a/src/config/docs.md b/src/config/docs.md index 3730b4e828f6d..f673ea0c186b4 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -93,8 +93,8 @@ This page is automatically generated by `./risedev generate-example-config` | Config | Description | Default | |--------|-------------|---------| -| block_cache_capacity_mb | Capacity of sstable block cache. | | -| block_shard_num | | | +| block_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.block_cache_capacity_mb` instead. | | +| cache | | | | cache_refill | | | | check_compaction_result | | false | | compact_iter_recreate_timeout_ms | | 600000 | @@ -108,7 +108,7 @@ This page is automatically generated by `./risedev generate-example-config` | data_file_cache | | | | disable_remote_compactor | | false | | enable_fast_compaction | | false | -| high_priority_ratio_in_percent | | | +| high_priority_ratio_in_percent | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.block_cache_eviction.high_priority_ratio_in_percent` with `storage.cache.block_cache_eviction.algorithm = "Lru"` instead. | | | imm_merge_threshold | The threshold for the number of immutable memtables to merge to a new imm. | 0 | | max_concurrent_compaction_task_number | | 16 | | max_prefetch_block_number | max prefetch block number | 16 | @@ -117,9 +117,8 @@ This page is automatically generated by `./risedev generate-example-config` | max_sub_compaction | Max sub compaction task numbers | 4 | | max_version_pinning_duration_sec | | 10800 | | mem_table_spill_threshold | The spill threshold for mem table. | 4194304 | -| meta_cache_capacity_mb | Capacity of sstable meta cache. | | +| meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. | | | meta_file_cache | | | -| meta_shard_num | | | | min_sst_size_for_streaming_upload | Whether to enable streaming upload for sstable. | 33554432 | | object_store | | | | prefetch_buffer_capacity_mb | max memory usage for large query | | diff --git a/src/config/example.toml b/src/config/example.toml index 0c223e69e687f..57718dbf75cc1 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -133,6 +133,12 @@ compactor_fast_max_compact_delete_ratio = 40 compactor_fast_max_compact_task_size = 2147483648 mem_table_spill_threshold = 4194304 +[storage.cache.block_cache_eviction] +algorithm = "Lru" + +[storage.cache.meta_cache_eviction] +algorithm = "Lru" + [storage.data_file_cache] dir = "" capacity_mb = 1024 diff --git a/src/ctl/src/common/hummock_service.rs b/src/ctl/src/common/hummock_service.rs index f5ad8c885a9e3..a3284ad4122d7 100644 --- a/src/ctl/src/common/hummock_service.rs +++ b/src/ctl/src/common/hummock_service.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, bail, Result}; -use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; +use risingwave_common::config::{EvictionConfig, MetricLevel, ObjectStoreConfig}; use risingwave_object_store::object::build_remote_object_store; use risingwave_rpc_client::MetaClient; use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient; @@ -168,7 +168,10 @@ impl HummockServiceOpts { path: opts.data_directory, block_cache_capacity: opts.block_cache_capacity_mb * (1 << 20), meta_cache_capacity: opts.meta_cache_capacity_mb * (1 << 20), - high_priority_ratio: 0, + block_cache_shard_num: opts.block_cache_shard_num, + meta_cache_shard_num: opts.meta_cache_shard_num, + block_cache_eviction: EvictionConfig::for_test(), + meta_cache_eviction: EvictionConfig::for_test(), prefetch_buffer_capacity: opts.block_cache_capacity_mb * (1 << 20), max_prefetch_block_number: opts.max_prefetch_block_number, data_file_cache: FileCache::none(), @@ -177,8 +180,6 @@ impl HummockServiceOpts { state_store_metrics: Arc::new(global_hummock_state_store_metrics( MetricLevel::Disabled, )), - block_shard_num: opts.block_shard_num, - meta_shard_num: opts.meta_shard_num, }))) } } diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index 538711ecea7e6..42e584fb3820c 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use bytes::Bytes; use futures::{Stream, TryStreamExt}; use risingwave_common::catalog::ColumnDesc; -use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; +use risingwave_common::config::{EvictionConfig, MetricLevel, ObjectStoreConfig}; use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; @@ -81,14 +81,15 @@ impl HummockJavaBindingIterator { store: object_store, path: read_plan.data_dir, block_cache_capacity: 1 << 10, + block_cache_shard_num: 2, + block_cache_eviction: EvictionConfig::for_test(), meta_cache_capacity: 1 << 10, - high_priority_ratio: 0, + meta_cache_shard_num: 2, + meta_cache_eviction: EvictionConfig::for_test(), prefetch_buffer_capacity: 1 << 10, max_prefetch_block_number: 16, data_file_cache: FileCache::none(), meta_file_cache: FileCache::none(), - block_shard_num: 2, - meta_shard_num: 2, recent_filter: None, state_store_metrics: Arc::new(global_hummock_state_store_metrics( MetricLevel::Disabled, diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index dce767f0b5bfb..d5d067b2bc27d 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -19,7 +19,7 @@ use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion}; use foyer::memory::CacheContext; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; -use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; +use risingwave_common::config::{EvictionConfig, MetricLevel, ObjectStoreConfig}; use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; use risingwave_common::types::DataType; @@ -63,9 +63,10 @@ pub fn mock_sstable_store() -> SstableStoreRef { path, block_cache_capacity: 64 << 20, meta_cache_capacity: 128 << 20, - high_priority_ratio: 0, - meta_shard_num: 2, - block_shard_num: 2, + meta_cache_shard_num: 2, + block_cache_shard_num: 2, + block_cache_eviction: EvictionConfig::for_test(), + meta_cache_eviction: EvictionConfig::for_test(), prefetch_buffer_capacity: 64 << 20, max_prefetch_block_number: 16, data_file_cache: FileCache::none(), diff --git a/src/storage/benches/bench_multi_builder.rs b/src/storage/benches/bench_multi_builder.rs index 590fb1a6f497e..ef07fd107986f 100644 --- a/src/storage/benches/bench_multi_builder.rs +++ b/src/storage/benches/bench_multi_builder.rs @@ -23,7 +23,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use futures::future::try_join_all; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; +use risingwave_common::config::{EvictionConfig, MetricLevel, ObjectStoreConfig}; use risingwave_hummock_sdk::key::{FullKey, UserKey}; use risingwave_object_store::object::{ObjectStore, ObjectStoreImpl, S3ObjectStore}; use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory}; @@ -146,9 +146,10 @@ fn bench_builder( path: "test".to_string(), block_cache_capacity: 64 << 20, meta_cache_capacity: 128 << 20, - high_priority_ratio: 0, - meta_shard_num: 2, - block_shard_num: 2, + meta_cache_shard_num: 2, + block_cache_shard_num: 2, + block_cache_eviction: EvictionConfig::for_test(), + meta_cache_eviction: EvictionConfig::for_test(), prefetch_buffer_capacity: 64 << 20, max_prefetch_block_number: 16, data_file_cache: FileCache::none(), diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index a768b563a8d40..693a6448a4ddc 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -31,7 +31,8 @@ use std::sync::Arc; use clap::Parser; use replay_impl::{get_replay_notification_client, GlobalReplayImpl}; use risingwave_common::config::{ - extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig, StorageConfig, + extract_storage_memory_config, load_config, EvictionConfig, NoOverride, ObjectStoreConfig, + StorageConfig, }; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_hummock_trace::{ @@ -115,15 +116,16 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result, BlockCacheEventListener>, @@ -108,29 +117,38 @@ impl BlockResponse { } impl BlockCache { - // TODO(MrCroxx): support other cache algorithm - pub fn new( - capacity: usize, - block_shard_num: usize, - high_priority_ratio: usize, - event_listener: BlockCacheEventListener, - ) -> Self { - if capacity == 0 { - panic!("block cache capacity == 0"); - } - - let cache = Cache::lru(LruCacheConfig { - capacity, - shards: block_shard_num, - eviction_config: LruConfig { - high_priority_pool_ratio: high_priority_ratio as f64 / 100.0, - }, - object_pool_capacity: block_shard_num * 1024, - hash_builder: RandomState::default(), - event_listener, - }); + pub fn new(config: BlockCacheConfig) -> Self { + assert!( + config.capacity > 0, + "Block cache capacity must be positive." + ); + + let capacity = config.capacity; + let shards = config.shard_num; + let object_pool_capacity = shards * 1024; + let hash_builder = RandomState::default(); + let event_listener = config.listener; + + let inner = match config.eviction { + EvictionConfig::Lru(eviction_config) => Cache::lru(LruCacheConfig { + capacity, + shards, + eviction_config, + object_pool_capacity, + hash_builder, + event_listener, + }), + EvictionConfig::Lfu(eviction_config) => Cache::lfu(LfuCacheConfig { + capacity, + shards, + eviction_config, + object_pool_capacity, + hash_builder, + event_listener, + }), + }; - Self { inner: cache } + Self { inner } } pub fn get(&self, object_id: HummockSstableObjectId, block_idx: u64) -> Option { diff --git a/src/storage/src/hummock/file_cache/store.rs b/src/storage/src/hummock/file_cache/store.rs index 8b3b931607719..380521c7779fe 100644 --- a/src/storage/src/hummock/file_cache/store.rs +++ b/src/storage/src/hummock/file_cache/store.rs @@ -43,7 +43,7 @@ pub mod preclude { pub type Result = core::result::Result; -pub type EvictionConfig = foyer::intrusive::eviction::lfu::LfuConfig; +pub type FileCacheEvictionConfig = foyer::intrusive::eviction::lfu::LfuConfig; pub type DeviceConfig = foyer::storage::device::fs::FsDeviceConfig; pub type FileCacheResult = foyer::storage::error::Result; diff --git a/src/storage/src/hummock/iterator/test_utils.rs b/src/storage/src/hummock/iterator/test_utils.rs index 8c60b318ff8bd..0103280570f5e 100644 --- a/src/storage/src/hummock/iterator/test_utils.rs +++ b/src/storage/src/hummock/iterator/test_utils.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use bytes::Bytes; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; +use risingwave_common::config::{EvictionConfig, MetricLevel, ObjectStoreConfig}; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::test_epoch; use risingwave_hummock_sdk::key::{prefix_slice_with_vnode, FullKey, TableKey, UserKey}; @@ -69,12 +69,14 @@ pub fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableSto store, path, block_cache_capacity: 64 << 20, + block_cache_shard_num: 2, + block_cache_eviction: EvictionConfig::for_test(), meta_cache_capacity: 64 << 20, - high_priority_ratio: 0, + meta_cache_shard_num: 2, + meta_cache_eviction: EvictionConfig::for_test(), prefetch_buffer_capacity: 64 << 20, max_prefetch_block_number: 16, - meta_shard_num: 2, - block_shard_num: 2, + data_file_cache: FileCache::none(), meta_file_cache: FileCache::none(), recent_filter: None, diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index c272dda0d886e..16fa9d6faccf0 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -23,12 +23,12 @@ use await_tree::InstrumentAwait; use bytes::Bytes; use fail::fail_point; use foyer::memory::{ - Cache, CacheContext, CacheEntry, CacheEventListener, EntryState, Key, LruCacheConfig, - LruConfig, Value, + Cache, CacheContext, CacheEntry, CacheEventListener, EntryState, Key, LfuCacheConfig, + LruCacheConfig, LruConfig, Value, }; use futures::{future, StreamExt}; use itertools::Itertools; -use risingwave_common::config::StorageMemoryConfig; +use risingwave_common::config::{EvictionConfig, StorageMemoryConfig}; use risingwave_hummock_sdk::{HummockSstableObjectId, OBJECT_SUFFIX}; use risingwave_hummock_trace::TracedCachePolicy; use risingwave_object_store::object::{ @@ -42,8 +42,8 @@ use zstd::zstd_safe::WriteBuf; use super::utils::MemoryTracker; use super::{ - Block, BlockCache, BlockMeta, BlockResponse, CachedBlock, CachedSstable, FileCache, - RecentFilter, Sstable, SstableBlockIndex, SstableMeta, SstableWriter, + Block, BlockCache, BlockCacheConfig, BlockMeta, BlockResponse, CachedBlock, CachedSstable, + FileCache, RecentFilter, Sstable, SstableBlockIndex, SstableMeta, SstableWriter, }; use crate::hummock::block_stream::{ BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream, @@ -96,6 +96,7 @@ impl From for TracedCachePolicy { } } +#[derive(Debug)] pub struct BlockCacheEventListener { data_file_cache: FileCache, metrics: Arc, @@ -190,10 +191,11 @@ pub struct SstableStoreConfig { pub store: ObjectStoreRef, pub path: String, pub block_cache_capacity: usize, + pub block_cache_shard_num: usize, + pub block_cache_eviction: EvictionConfig, pub meta_cache_capacity: usize, - pub high_priority_ratio: usize, - pub meta_shard_num: usize, - pub block_shard_num: usize, + pub meta_cache_shard_num: usize, + pub meta_cache_eviction: EvictionConfig, pub prefetch_buffer_capacity: usize, pub max_prefetch_block_number: usize, pub data_file_cache: FileCache, @@ -225,26 +227,44 @@ impl SstableStore { // TODO: We should validate path early. Otherwise object store won't report invalid path // error until first write attempt. - let block_cache = BlockCache::new( - config.block_cache_capacity, - config.block_shard_num, - config.high_priority_ratio, - BlockCacheEventListener::new( + let block_cache = BlockCache::new(BlockCacheConfig { + capacity: config.block_cache_capacity, + shard_num: config.block_cache_shard_num, + eviction: config.block_cache_eviction, + listener: BlockCacheEventListener::new( config.data_file_cache.clone(), config.state_store_metrics.clone(), ), - ); - // TODO(MrCroxx): support other cache algorithm - let meta_cache = Arc::new(Cache::lru(LruCacheConfig { - capacity: config.meta_cache_capacity, - shards: config.meta_shard_num, - eviction_config: LruConfig { - high_priority_pool_ratio: 0.0, - }, - object_pool_capacity: config.meta_shard_num * 1024, - hash_builder: RandomState::default(), - event_listener: MetaCacheEventListener::from(config.meta_file_cache.clone()), - })); + }); + + // TODO(MrCroxx): reuse BlockCacheConfig here? + let meta_cache = { + let capacity = config.meta_cache_capacity; + let shards = config.meta_cache_shard_num; + let object_pool_capacity = config.meta_cache_shard_num * 1024; + let hash_builder = RandomState::default(); + let event_listener = MetaCacheEventListener::from(config.meta_file_cache.clone()); + match config.meta_cache_eviction { + EvictionConfig::Lru(eviction_config) => Cache::lru(LruCacheConfig { + capacity, + shards, + eviction_config, + object_pool_capacity, + hash_builder, + event_listener, + }), + EvictionConfig::Lfu(eviction_config) => Cache::lfu(LfuCacheConfig { + capacity, + shards, + eviction_config, + object_pool_capacity, + hash_builder, + event_listener, + }), + } + }; + let meta_cache = Arc::new(meta_cache); + Self { path: config.path, store: config.store, @@ -269,7 +289,6 @@ impl SstableStore { block_cache_capacity: usize, meta_cache_capacity: usize, ) -> Self { - // TODO(MrCroxx): support other cache algorithm let meta_cache = Arc::new(Cache::lru(LruCacheConfig { capacity: meta_cache_capacity, shards: 1, @@ -283,15 +302,17 @@ impl SstableStore { Self { path, store, - block_cache: BlockCache::new( - block_cache_capacity, - 1, - 0, - BlockCacheEventListener::new( + block_cache: BlockCache::new(BlockCacheConfig { + capacity: block_cache_capacity, + shard_num: 1, + eviction: EvictionConfig::Lru(LruConfig { + high_priority_pool_ratio: 0.0, + }), + listener: BlockCacheEventListener::new( FileCache::none(), Arc::new(HummockStateStoreMetrics::unused()), ), - ), + }), meta_cache, data_file_cache: FileCache::none(), meta_file_cache: FileCache::none(), diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 5acecc09eaa05..81524670a83b9 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -20,6 +20,7 @@ use bytes::Bytes; use foyer::memory::CacheContext; use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_common::config::EvictionConfig; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::test_epoch; use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, UserKey}; @@ -57,7 +58,7 @@ pub fn default_opts_for_test() -> StorageOpts { write_conflict_detection_enabled: true, block_cache_capacity_mb: 64, meta_cache_capacity_mb: 64, - high_priority_ratio: 0, + block_cache_eviction_config: EvictionConfig::for_test(), disable_remote_compactor: false, share_buffer_upload_concurrency: 1, compactor_memory_limit_mb: 64, diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 9febded2d52db..6a39cad111268 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_common::config::{ - extract_storage_memory_config, ObjectStoreConfig, RwConfig, StorageMemoryConfig, + extract_storage_memory_config, EvictionConfig, ObjectStoreConfig, RwConfig, StorageMemoryConfig, }; use risingwave_common::system_param::reader::{SystemParamsRead, SystemParamsReader}; use risingwave_common::system_param::system_params_for_test; @@ -48,13 +48,15 @@ pub struct StorageOpts { /// Capacity of sstable block cache. pub block_cache_capacity_mb: usize, /// the number of block-cache shard. Less shard means that more concurrent-conflict. - pub block_shard_num: usize, + pub block_cache_shard_num: usize, + /// Eviction config for block cache. + pub block_cache_eviction_config: EvictionConfig, /// Capacity of sstable meta cache. pub meta_cache_capacity_mb: usize, /// the number of meta-cache shard. Less shard means that more concurrent-conflict. - pub meta_shard_num: usize, - /// Percent of the ratio of high priority data in block-cache - pub high_priority_ratio: usize, + pub meta_cache_shard_num: usize, + /// Eviction config for meta cache. + pub meta_cache_eviction_config: EvictionConfig, /// max memory usage for large query. pub prefetch_buffer_capacity_mb: usize, @@ -169,13 +171,14 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt imm_merge_threshold: c.storage.imm_merge_threshold, data_directory: p.data_directory().to_string(), write_conflict_detection_enabled: c.storage.write_conflict_detection_enabled, - high_priority_ratio: s.high_priority_ratio_in_percent, block_cache_capacity_mb: s.block_cache_capacity_mb, + block_cache_shard_num: s.block_cache_shard_num, + block_cache_eviction_config: s.block_cache_eviction_config.clone(), + meta_cache_capacity_mb: s.meta_cache_capacity_mb, + meta_cache_shard_num: s.meta_cache_shard_num, + meta_cache_eviction_config: s.meta_cache_eviction_config.clone(), prefetch_buffer_capacity_mb: s.prefetch_buffer_capacity_mb, max_prefetch_block_number: c.storage.max_prefetch_block_number, - meta_cache_capacity_mb: s.meta_cache_capacity_mb, - block_shard_num: s.block_shard_num, - meta_shard_num: s.meta_shard_num, disable_remote_compactor: c.storage.disable_remote_compactor, share_buffer_upload_concurrency: c.storage.share_buffer_upload_concurrency, compactor_memory_limit_mb: s.compactor_memory_limit_mb, diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index de6b5b9bdc541..f1c4305d74d01 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -615,10 +615,11 @@ impl StateStoreImpl { store: Arc::new(object_store), path: opts.data_directory.to_string(), block_cache_capacity: opts.block_cache_capacity_mb * (1 << 20), + block_cache_shard_num: opts.block_cache_shard_num, + block_cache_eviction: opts.block_cache_eviction_config.clone(), meta_cache_capacity: opts.meta_cache_capacity_mb * (1 << 20), - high_priority_ratio: opts.high_priority_ratio, - meta_shard_num: opts.meta_shard_num, - block_shard_num: opts.block_shard_num, + meta_cache_shard_num: opts.meta_cache_shard_num, + meta_cache_eviction: opts.meta_cache_eviction_config.clone(), prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20), max_prefetch_block_number: opts.max_prefetch_block_number, data_file_cache, diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index cde3c61568f93..b660117168847 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -25,7 +25,8 @@ use rand::rngs::StdRng; use rand::{RngCore, SeedableRng}; use risingwave_common::catalog::TableId; use risingwave_common::config::{ - extract_storage_memory_config, load_config, NoOverride, ObjectStoreConfig, RwConfig, + extract_storage_memory_config, load_config, EvictionConfig, NoOverride, ObjectStoreConfig, + RwConfig, }; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::{test_epoch, EpochExt}; @@ -213,15 +214,16 @@ async fn compaction_test( path: system_params.data_directory().to_string(), block_cache_capacity: storage_memory_config.block_cache_capacity_mb * (1 << 20), meta_cache_capacity: storage_memory_config.meta_cache_capacity_mb * (1 << 20), - high_priority_ratio: 0, + block_cache_shard_num: storage_memory_config.block_cache_shard_num, + meta_cache_shard_num: storage_memory_config.meta_cache_shard_num, + block_cache_eviction: EvictionConfig::for_test(), + meta_cache_eviction: EvictionConfig::for_test(), prefetch_buffer_capacity: storage_memory_config.prefetch_buffer_capacity_mb * (1 << 20), max_prefetch_block_number: storage_opts.max_prefetch_block_number, data_file_cache: FileCache::none(), meta_file_cache: FileCache::none(), recent_filter: None, state_store_metrics: state_store_metrics.clone(), - meta_shard_num: storage_memory_config.meta_shard_num, - block_shard_num: storage_memory_config.block_shard_num, })); let store = HummockStorage::new( diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 0e7e8c296dfdc..3f9edb99410f4 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -154,9 +154,6 @@ url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } whoami = { version = "1" } zeroize = { version = "1" } -zstd = { version = "0.13" } -zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] } -zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] } [build-dependencies] ahash = { version = "0.8" }