Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): support w-tinylfu as block cache eviction algorithm (#15682) #15750

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ enum-as-inner = "0.6"
enumflags2 = { version = "0.7.8" }
ethnum = { version = "1", features = ["serde"] }
fixedbitset = { version = "0.5", features = ["std"] }
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
governor = { version = "0.6", default-features = false, features = ["std"] }
hex = "0.4.3"
Expand Down
184 changes: 150 additions & 34 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::num::NonZeroUsize;
use anyhow::Context;
use clap::ValueEnum;
use educe::Educe;
use foyer::memory::{LfuConfig, LruConfig};
use risingwave_common_proc_macro::ConfigDoc;
pub use risingwave_common_proc_macro::OverrideConfig;
use risingwave_pb::meta::SystemParams;
Expand Down Expand Up @@ -532,6 +533,51 @@ pub struct StreamingConfig {

pub use risingwave_common_metrics::MetricLevel;

/// The section `[storage.cache]` in `risingwave.toml`.
///
/// Groups the capacity, shard-count and eviction settings of the block cache and the meta cache.
/// Every option may be left unset; `extract_storage_memory_config` fills in the missing values.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct CacheConfig {
/// Capacity of the sstable block cache. When unset, `extract_storage_memory_config` falls back
/// to the deprecated `storage.block_cache_capacity_mb` and then to the built-in default.
#[serde(default)]
pub block_cache_capacity_mb: Option<usize>,

/// Shard setting of the block cache. When unset, `extract_storage_memory_config` derives a
/// value from the block cache capacity.
#[serde(default)]
pub block_cache_shard_num: Option<usize>,

/// Eviction algorithm of the block cache and its parameters.
/// Defaults to `CacheEvictionConfig::default()` when the section is absent.
#[serde(default)]
pub block_cache_eviction: CacheEvictionConfig,

/// Capacity of the sstable meta cache. When unset, `extract_storage_memory_config` picks a
/// fallback value.
#[serde(default)]
pub meta_cache_capacity_mb: Option<usize>,

/// Shard setting of the meta cache. When unset, `extract_storage_memory_config` derives a
/// value from the meta cache capacity.
#[serde(default)]
pub meta_cache_shard_num: Option<usize>,

/// Eviction algorithm of the meta cache and its parameters.
/// Defaults to `CacheEvictionConfig::default()` when the section is absent.
#[serde(default)]
pub meta_cache_eviction: CacheEvictionConfig,
}

/// Eviction settings of a single cache, i.e. the sections `[storage.cache.block_cache_eviction]`
/// and `[storage.cache.meta_cache_eviction]` in `risingwave.toml`.
///
/// The enum is internally tagged: the `algorithm` key selects the variant (`"Lru"` or `"Lfu"`)
/// and the remaining keys of the section are that variant's fields. Options left unset are
/// replaced with defaults by `extract_storage_memory_config`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "algorithm")]
pub enum CacheEvictionConfig {
/// LRU eviction.
Lru {
/// Converted to `LruConfig::high_priority_pool_ratio` (value / 100). When unset, the
/// deprecated `storage.high_priority_ratio_in_percent` is used, then the built-in default.
high_priority_ratio_in_percent: Option<usize>,
},
/// LFU eviction.
Lfu {
/// Converted to `LfuConfig::window_capacity_ratio` (value / 100).
window_capacity_ratio_in_percent: Option<usize>,
/// Converted to `LfuConfig::protected_capacity_ratio` (value / 100).
protected_capacity_ratio_in_percent: Option<usize>,
/// Passed through as `LfuConfig::cmsketch_eps`.
cmsketch_eps: Option<f64>,
/// Passed through as `LfuConfig::cmsketch_confidence`.
cmsketch_confidence: Option<f64>,
},
}

impl Default for CacheEvictionConfig {
    /// Returns the `Lru` variant with `high_priority_ratio_in_percent` left unset, so the
    /// effective ratio is decided later when the config is resolved.
    fn default() -> Self {
        let high_priority_ratio_in_percent = None;
        Self::Lru {
            high_priority_ratio_in_percent,
        }
    }
}

/// The section `[storage]` in `risingwave.toml`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct StorageConfig {
Expand Down Expand Up @@ -562,22 +608,20 @@ pub struct StorageConfig {
#[serde(default = "default::storage::write_conflict_detection_enabled")]
pub write_conflict_detection_enabled: bool,

/// Capacity of sstable block cache.
#[serde(default)]
pub block_cache_capacity_mb: Option<usize>,

#[serde(default)]
pub block_shard_num: Option<usize>,
pub cache: CacheConfig,

/// DEPRECATED: This config is kept only for compatibility with old versions; use `storage.cache.block_cache_capacity_mb` instead.
#[serde(default)]
pub high_priority_ratio_in_percent: Option<usize>,
pub block_cache_capacity_mb: Option<usize>,

/// Capacity of sstable meta cache.
/// DEPRECATED: This config is kept only for compatibility with old versions; use `storage.cache.meta_cache_capacity_mb` instead.
#[serde(default)]
pub meta_cache_capacity_mb: Option<usize>,

/// DEPRECATED: This config is kept only for compatibility with old versions; use `storage.cache.block_cache_eviction.high_priority_ratio_in_percent` with `storage.cache.block_cache_eviction.algorithm = "Lru"` instead.
#[serde(default)]
pub meta_shard_num: Option<usize>,
pub high_priority_ratio_in_percent: Option<usize>,

/// max memory usage for large query
#[serde(default)]
Expand Down Expand Up @@ -1203,6 +1247,19 @@ pub mod default {
70
}

/// Default for the LFU eviction option `window_capacity_ratio_in_percent`,
/// applied when the option is left unset.
pub fn window_capacity_ratio_in_percent() -> usize {
    const DEFAULT_PERCENT: usize = 10;
    DEFAULT_PERCENT
}
/// Default for the LFU eviction option `protected_capacity_ratio_in_percent`,
/// applied when the option is left unset.
pub fn protected_capacity_ratio_in_percent() -> usize {
    const DEFAULT_PERCENT: usize = 80;
    DEFAULT_PERCENT
}
/// Default for the LFU eviction option `cmsketch_eps`,
/// applied when the option is left unset.
pub fn cmsketch_eps() -> f64 {
    const DEFAULT_EPS: f64 = 0.002;
    DEFAULT_EPS
}
/// Default for the LFU eviction option `cmsketch_confidence`,
/// applied when the option is left unset.
pub fn cmsketch_confidence() -> f64 {
    const DEFAULT_CONFIDENCE: f64 = 0.95;
    DEFAULT_CONFIDENCE
}

pub fn meta_cache_capacity_mb() -> usize {
128
}
Expand Down Expand Up @@ -1644,45 +1701,64 @@ pub mod default {
}
}

/// Resolved eviction settings of a cache: one `foyer` config value per supported algorithm.
///
/// Produced by `extract_storage_memory_config` from the user-facing `CacheEvictionConfig`,
/// with percentages already converted to ratios and unset options replaced by defaults.
#[derive(Debug, Clone)]
pub enum EvictionConfig {
/// LRU eviction, parameterized by `foyer::memory::LruConfig`.
Lru(LruConfig),
/// LFU eviction, parameterized by `foyer::memory::LfuConfig`.
Lfu(LfuConfig),
}

impl EvictionConfig {
    /// Builds the eviction config used by tests: LRU with a `high_priority_pool_ratio` of `0.0`.
    pub fn for_test() -> Self {
        let lru = LruConfig {
            high_priority_pool_ratio: 0.0,
        };
        Self::Lru(lru)
    }
}

pub struct StorageMemoryConfig {
pub block_cache_capacity_mb: usize,
pub block_shard_num: usize,
pub block_cache_shard_num: usize,
pub meta_cache_capacity_mb: usize,
pub meta_shard_num: usize,
pub meta_cache_shard_num: usize,
pub shared_buffer_capacity_mb: usize,
pub data_file_cache_ring_buffer_capacity_mb: usize,
pub meta_file_cache_ring_buffer_capacity_mb: usize,
pub compactor_memory_limit_mb: usize,
pub prefetch_buffer_capacity_mb: usize,
pub high_priority_ratio_in_percent: usize,
pub block_cache_eviction_config: EvictionConfig,
pub meta_cache_eviction_config: EvictionConfig,
}

pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
pub const MAX_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict.
pub const MAX_BLOCK_CACHE_SHARD_BITS: usize = 6; // It means that there will be 64 shards lru-cache to avoid lock conflict.

pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
let block_cache_capacity_mb = s
.storage
.block_cache_capacity_mb
.unwrap_or(default::storage::block_cache_capacity_mb());
let meta_cache_capacity_mb = s
.storage
.meta_cache_capacity_mb
.unwrap_or(default::storage::meta_cache_capacity_mb());
let block_cache_capacity_mb = s.storage.cache.block_cache_capacity_mb.unwrap_or(
// adapt to old version
s.storage
.block_cache_capacity_mb
.unwrap_or(default::storage::block_cache_capacity_mb()),
);
// Resolve the meta cache capacity. Precedence:
//   1. `storage.cache.meta_cache_capacity_mb` (new location),
//   2. the deprecated `storage.meta_cache_capacity_mb` (adapt to old version),
//   3. the built-in default.
// The legacy fallback must read the *meta* cache option; reading
// `storage.block_cache_capacity_mb` here would size the meta cache from the block cache setting
// and silently ignore an explicitly configured legacy meta cache capacity.
let meta_cache_capacity_mb = s
    .storage
    .cache
    .meta_cache_capacity_mb
    .or(s.storage.meta_cache_capacity_mb)
    .unwrap_or_else(default::storage::meta_cache_capacity_mb);
let shared_buffer_capacity_mb = s
.storage
.shared_buffer_capacity_mb
.unwrap_or(default::storage::shared_buffer_capacity_mb());
let meta_shard_num = s.storage.meta_shard_num.unwrap_or_else(|| {
let meta_cache_shard_num = s.storage.cache.meta_cache_shard_num.unwrap_or_else(|| {
let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 {
shard_bits -= 1;
}
shard_bits
});
let block_shard_num = s.storage.block_shard_num.unwrap_or_else(|| {
let mut shard_bits = MAX_CACHE_SHARD_BITS;
let block_cache_shard_num = s.storage.cache.block_cache_shard_num.unwrap_or_else(|| {
let mut shard_bits = MAX_BLOCK_CACHE_SHARD_BITS;
while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0
{
shard_bits -= 1;
Expand All @@ -1695,26 +1771,66 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
.storage
.compactor_memory_limit_mb
.unwrap_or(default::storage::compactor_memory_limit_mb());
let high_priority_ratio_in_percent = s
.storage
.high_priority_ratio_in_percent
.unwrap_or(default::storage::high_priority_ratio_in_percent());
let prefetch_buffer_capacity_mb = s
.storage
.shared_buffer_capacity_mb
.unwrap_or((100 - high_priority_ratio_in_percent) * block_cache_capacity_mb / 100);

let get_eviction_config = |c: &CacheEvictionConfig| match c {
CacheEvictionConfig::Lru {
high_priority_ratio_in_percent,
} => EvictionConfig::Lru(LruConfig {
high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or(
// adapt to old version
s.storage
.high_priority_ratio_in_percent
.unwrap_or(default::storage::high_priority_ratio_in_percent()),
) as f64
/ 100.0,
}),
CacheEvictionConfig::Lfu {
window_capacity_ratio_in_percent,
protected_capacity_ratio_in_percent,
cmsketch_eps,
cmsketch_confidence,
} => EvictionConfig::Lfu(LfuConfig {
window_capacity_ratio: window_capacity_ratio_in_percent
.unwrap_or(default::storage::window_capacity_ratio_in_percent())
as f64
/ 100.0,
protected_capacity_ratio: protected_capacity_ratio_in_percent
.unwrap_or(default::storage::protected_capacity_ratio_in_percent())
as f64
/ 100.0,
cmsketch_eps: cmsketch_eps.unwrap_or(default::storage::cmsketch_eps()),
cmsketch_confidence: cmsketch_confidence
.unwrap_or(default::storage::cmsketch_confidence()),
}),
};

let block_cache_eviction_config = get_eviction_config(&s.storage.cache.block_cache_eviction);
let meta_cache_eviction_config = get_eviction_config(&s.storage.cache.meta_cache_eviction);

// Resolve the prefetch buffer capacity. When the storage option read below is unset, the value
// is the share of `block_cache_capacity_mb` left after subtracting the LRU high-priority ratio
// (or, for LFU, the protected ratio) of the block cache eviction config, truncated to `usize`.
// NOTE(review): the override is read from `shared_buffer_capacity_mb`, not from a
// prefetch-specific option — this looks like a copy/paste slip; confirm against `StorageConfig`.
let prefetch_buffer_capacity_mb =
s.storage
.shared_buffer_capacity_mb
.unwrap_or(match &block_cache_eviction_config {
EvictionConfig::Lru(lru) => {
((1.0 - lru.high_priority_pool_ratio) * block_cache_capacity_mb as f64) as usize
}
EvictionConfig::Lfu(lfu) => {
((1.0 - lfu.protected_capacity_ratio) * block_cache_capacity_mb as f64) as usize
}
});

StorageMemoryConfig {
block_cache_capacity_mb,
block_shard_num,
block_cache_shard_num,
meta_cache_capacity_mb,
meta_shard_num,
meta_cache_shard_num,
shared_buffer_capacity_mb,
data_file_cache_ring_buffer_capacity_mb,
meta_file_cache_ring_buffer_capacity_mb,
compactor_memory_limit_mb,
prefetch_buffer_capacity_mb,
high_priority_ratio_in_percent,
block_cache_eviction_config,
meta_cache_eviction_config,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ await-tree = { workspace = true }
chrono = { version = "0.4" }
clap = { version = "4", features = ["derive"] }
either = "1"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
hyper = "0.14"
Expand Down
Loading
Loading