Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): support w-tinylfu as block cache eviction algorithm #15661

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4ae8cea
feat: use an unreleased foyer version
MrCroxx Mar 1, 2024
321626d
feat: upgrade foyer
MrCroxx Mar 1, 2024
83784e2
feat: replace risingwave lru cache with foyer lru cache for sst cache
MrCroxx Mar 1, 2024
846a2f3
fix: add should_panic to a unit test that needs to be removed
MrCroxx Mar 1, 2024
3733a44
Merge remote-tracking branch 'origin/main' into xx/foyer-block-cache
MrCroxx Mar 1, 2024
9b1d67b
Merge remote-tracking branch 'origin/main' into xx/foyer-block-cache
MrCroxx Mar 4, 2024
c5ac498
fix: update foyer, restore cache shard calculation
MrCroxx Mar 4, 2024
6e463b0
feat: upgrade foyer version
MrCroxx Mar 8, 2024
2233708
feat: upgrade foyer, setup block cache event listener
MrCroxx Mar 11, 2024
9dddaf4
Merge remote-tracking branch 'origin/main' into xx/foyer-block-cache
MrCroxx Mar 11, 2024
4f84192
chore: upgrade foyer
MrCroxx Mar 11, 2024
13f9853
refactor: upgrade foyer to simplify block cache with foyer memory
MrCroxx Mar 12, 2024
280f808
chore: upgrade foyer
MrCroxx Mar 12, 2024
e9cac6a
Merge remote-tracking branch 'origin/main' into xx/foyer-block-cache
MrCroxx Mar 12, 2024
9294764
chore: use released foyer 0.6
MrCroxx Mar 12, 2024
e0f4eae
Merge remote-tracking branch 'origin/main' into xx/foyer-block-cache
MrCroxx Mar 13, 2024
192053d
Merge remote-tracking branch 'origin/main' into xx/foyer-block-cache
MrCroxx Mar 13, 2024
63beca5
refactor: extract block cache config
MrCroxx Mar 13, 2024
e57e5c1
feat(storage): support w-tinylfu as block cache eviction algorithm
MrCroxx Mar 13, 2024
b1b7c50
chore: upgrade foyer-memory
MrCroxx Mar 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 80 additions & 70 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ license = "Apache-2.0"
repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = "0.6"
await-tree = "0.1.1"
aws-config = { version = "1", default-features = false, features = [
"behavior-version-latest",
Expand Down Expand Up @@ -113,11 +114,7 @@ hytra = "0.1"
rdkafka = { package = "madsim-rdkafka", version = "0.3.1", features = [
"cmake-build",
] }
hashbrown = { version = "0.14.0", features = [
"ahash",
"inline-more",
"nightly",
] }
hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] }
criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
Expand Down
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
either = "1"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
futures-util = "0.3"
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ mod tests {

/// A test to verify that `HashMap` may leak memory counter when using `into_iter`.
#[test]
#[should_panic] // TODO(MrCroxx): This bug is fixed and the test should panic. Remove the test and fix the related code later.
fn test_hashmap_into_iter_bug() {
let dropped: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));

Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,10 @@ mod tests {
use std::sync::Arc;

use assert_matches::assert_matches;
use foyer::memory::CacheContext;
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::array::{Array, ArrayImpl, I32Array, StructArray};
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::{
schema_test_utils, ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID,
};
Expand Down Expand Up @@ -400,7 +400,7 @@ mod tests {
epoch,
None,
ReadOptions {
cache_policy: CachePolicy::Fill(CachePriority::High),
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ enum-as-inner = "0.6"
enumflags2 = { version = "0.7.8" }
ethnum = { version = "1", features = ["serde"] }
fixedbitset = { version = "0.5", features = ["std"] }
foyer = { workspace = true }
fs-err = "2"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
governor = { version = "0.6", default-features = false, features = ["std"] }
Expand Down
131 changes: 112 additions & 19 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::num::NonZeroUsize;
use anyhow::Context;
use clap::ValueEnum;
use educe::Educe;
use foyer::memory::{LfuConfig, LruConfig};
use risingwave_common_proc_macro::ConfigDoc;
pub use risingwave_common_proc_macro::OverrideConfig;
use risingwave_pb::meta::SystemParams;
Expand Down Expand Up @@ -567,6 +568,44 @@ impl PartialOrd for MetricLevel {
}
}

/// The section `[storage.cache]` in `risingwave.toml`.
///
/// Groups the sstable cache capacities together with the cache eviction settings.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct CacheConfig {
/// Capacity of sstable block cache.
#[serde(default)]
pub block_cache_capacity_mb: Option<usize>,

/// Capacity of sstable meta cache.
#[serde(default)]
pub meta_cache_capacity_mb: Option<usize>,

/// Cache eviction algorithm and its tuning parameters. When the section is absent,
/// `CacheEvictionConfig::default()` is used, which selects LRU.
#[serde(default)]
pub eviction: CacheEvictionConfig,
}

/// The section `[storage.cache.eviction]` in `risingwave.toml`.
///
/// The enum is internally tagged: the `algorithm` key selects the variant (`"Lru"` or `"Lfu"`)
/// and the remaining keys of the section fill that variant's fields. Every field is optional;
/// unset fields are replaced by the function of the same name under `default::storage` when the
/// config is resolved into an `EvictionConfig`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "algorithm")]
pub enum CacheEvictionConfig {
/// LRU eviction; resolved into a foyer `LruConfig`.
Lru {
/// Percentage (divided by 100 on resolution) that becomes
/// `LruConfig::high_priority_pool_ratio`.
high_priority_ratio_in_percent: Option<usize>,
},
/// LFU eviction; resolved into a foyer `LfuConfig`.
Lfu {
/// Percentage (divided by 100 on resolution) that becomes
/// `LfuConfig::window_capacity_ratio`.
window_capacity_ratio_in_percent: Option<usize>,
/// Percentage (divided by 100 on resolution) that becomes
/// `LfuConfig::protected_capacity_ratio`.
protected_capacity_ratio_in_percent: Option<usize>,
/// Passed through unchanged as `LfuConfig::cmsketch_eps`.
// NOTE(review): presumably the error bound of a count-min sketch; confirm against foyer docs.
cmsketch_eps: Option<f64>,
/// Passed through unchanged as `LfuConfig::cmsketch_confidence`.
// NOTE(review): presumably the confidence of a count-min sketch; confirm against foyer docs.
cmsketch_confidence: Option<f64>,
},
}

impl Default for CacheEvictionConfig {
fn default() -> Self {
Self::Lru {
high_priority_ratio_in_percent: None,
}
}
}

/// The section `[storage]` in `risingwave.toml`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde, ConfigDoc)]
pub struct StorageConfig {
Expand Down Expand Up @@ -597,16 +636,8 @@ pub struct StorageConfig {
#[serde(default = "default::storage::write_conflict_detection_enabled")]
pub write_conflict_detection_enabled: bool,

/// Capacity of sstable block cache.
#[serde(default)]
pub block_cache_capacity_mb: Option<usize>,

#[serde(default)]
pub high_priority_ratio_in_percent: Option<usize>,

/// Capacity of sstable meta cache.
#[serde(default)]
pub meta_cache_capacity_mb: Option<usize>,
pub cache: CacheConfig,

/// max memory usage for large query
#[serde(default)]
Expand Down Expand Up @@ -1215,6 +1246,19 @@ pub mod default {
70
}

/// Default for `window_capacity_ratio_in_percent` of the `Lfu` eviction config, in percent.
pub fn window_capacity_ratio_in_percent() -> usize {
    const DEFAULT_PERCENT: usize = 10;
    DEFAULT_PERCENT
}
/// Default for `protected_capacity_ratio_in_percent` of the `Lfu` eviction config, in percent.
pub fn protected_capacity_ratio_in_percent() -> usize {
    const DEFAULT_PERCENT: usize = 80;
    DEFAULT_PERCENT
}
/// Default for `cmsketch_eps` of the `Lfu` eviction config.
pub fn cmsketch_eps() -> f64 {
    const DEFAULT_EPS: f64 = 0.002;
    DEFAULT_EPS
}
/// Default for `cmsketch_confidence` of the `Lfu` eviction config.
pub fn cmsketch_confidence() -> f64 {
    const DEFAULT_CONFIDENCE: f64 = 0.95;
    DEFAULT_CONFIDENCE
}

/// Default capacity of the sstable meta cache in megabytes, used when
/// `meta_cache_capacity_mb` is not set in the config.
pub fn meta_cache_capacity_mb() -> usize {
    const DEFAULT_CAPACITY_MB: usize = 128;
    DEFAULT_CAPACITY_MB
}
Expand Down Expand Up @@ -1642,6 +1686,20 @@ pub mod default {
}
}

/// Resolved cache eviction configuration: the foyer config of the selected algorithm with every
/// optional setting from `CacheEvictionConfig` already replaced by its default.
#[derive(Debug, Clone)]
pub enum EvictionConfig {
/// LRU eviction with its foyer configuration.
Lru(LruConfig),
/// LFU eviction with its foyer configuration.
Lfu(LfuConfig),
}

impl EvictionConfig {
pub fn for_test() -> Self {
Self::Lru(LruConfig {
high_priority_pool_ratio: 0.0,
})
}
}

pub struct StorageMemoryConfig {
pub block_cache_capacity_mb: usize,
pub meta_cache_capacity_mb: usize,
Expand All @@ -1650,16 +1708,18 @@ pub struct StorageMemoryConfig {
pub meta_file_cache_ring_buffer_capacity_mb: usize,
pub compactor_memory_limit_mb: usize,
pub prefetch_buffer_capacity_mb: usize,
pub high_priority_ratio_in_percent: usize,
pub cache_eviction_config: EvictionConfig,
}

pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
let block_cache_capacity_mb = s
.storage
.cache
.block_cache_capacity_mb
.unwrap_or(default::storage::block_cache_capacity_mb());
let meta_cache_capacity_mb = s
.storage
.cache
.meta_cache_capacity_mb
.unwrap_or(default::storage::meta_cache_capacity_mb());
let shared_buffer_capacity_mb = s
Expand All @@ -1672,14 +1732,47 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
.storage
.compactor_memory_limit_mb
.unwrap_or(default::storage::compactor_memory_limit_mb());
let high_priority_ratio_in_percent = s
.storage
.high_priority_ratio_in_percent
.unwrap_or(default::storage::high_priority_ratio_in_percent());
let prefetch_buffer_capacity_mb = s
.storage
.shared_buffer_capacity_mb
.unwrap_or((100 - high_priority_ratio_in_percent) * block_cache_capacity_mb / 100);

let cache_eviction_config = match s.storage.cache.eviction {
CacheEvictionConfig::Lru {
high_priority_ratio_in_percent,
} => EvictionConfig::Lru(LruConfig {
high_priority_pool_ratio: high_priority_ratio_in_percent
.unwrap_or(default::storage::high_priority_ratio_in_percent())
as f64
/ 100.0,
}),
CacheEvictionConfig::Lfu {
window_capacity_ratio_in_percent,
protected_capacity_ratio_in_percent,
cmsketch_eps,
cmsketch_confidence,
} => EvictionConfig::Lfu(LfuConfig {
window_capacity_ratio: window_capacity_ratio_in_percent
.unwrap_or(default::storage::window_capacity_ratio_in_percent())
as f64
/ 100.0,
protected_capacity_ratio: protected_capacity_ratio_in_percent
.unwrap_or(default::storage::protected_capacity_ratio_in_percent())
as f64
/ 100.0,
cmsketch_eps: cmsketch_eps.unwrap_or(default::storage::cmsketch_eps()),
cmsketch_confidence: cmsketch_confidence
.unwrap_or(default::storage::cmsketch_confidence()),
}),
};

let prefetch_buffer_capacity_mb =
s.storage
.shared_buffer_capacity_mb
.unwrap_or(match &cache_eviction_config {
EvictionConfig::Lru(lru) => {
((1.0 - lru.high_priority_pool_ratio) * block_cache_capacity_mb as f64) as usize
}
EvictionConfig::Lfu(lfu) => {
((1.0 - lfu.protected_capacity_ratio) * block_cache_capacity_mb as f64) as usize
}
});

StorageMemoryConfig {
block_cache_capacity_mb,
Expand All @@ -1689,7 +1782,7 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
meta_file_cache_ring_buffer_capacity_mb,
compactor_memory_limit_mb,
prefetch_buffer_capacity_mb,
high_priority_ratio_in_percent,
cache_eviction_config,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ await-tree = { workspace = true }
chrono = { version = "0.4" }
clap = { version = "4", features = ["derive"] }
either = "1"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
hyper = "0.14"
Expand Down
60 changes: 45 additions & 15 deletions src/compute/src/memory/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::config::{StorageConfig, StorageMemoryConfig};
use foyer::memory::{LfuConfig, LruConfig};
use risingwave_common::config::{
CacheEvictionConfig, EvictionConfig, StorageConfig, StorageMemoryConfig,
};
use risingwave_common::util::pretty_bytes::convert;

/// The minimal memory requirement of computing tasks in megabytes.
Expand All @@ -32,7 +35,6 @@ const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3;
const STORAGE_META_CACHE_MAX_MEMORY_MB: usize = 4096;
const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35;
const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;
const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 50;

/// Each compute node reserves some memory for stack and code segment of processes, allocation
/// overhead, network buffer, etc. based on `SYSTEM_RESERVED_MEMORY_PROPORTION`. The reserve memory
Expand Down Expand Up @@ -65,30 +67,58 @@ pub fn storage_memory_config(
} else {
(STORAGE_MEMORY_PROPORTION + COMPACTOR_MEMORY_PROPORTION, 0.0)
};
let mut block_cache_capacity_mb = storage_config.block_cache_capacity_mb.unwrap_or(
let mut block_cache_capacity_mb = storage_config.cache.block_cache_capacity_mb.unwrap_or(
((non_reserved_memory_bytes as f64
* storage_memory_proportion
* STORAGE_BLOCK_CACHE_MEMORY_PROPORTION)
.ceil() as usize)
>> 20,
);
let high_priority_ratio_in_percent = storage_config
.high_priority_ratio_in_percent
.unwrap_or(STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO);
let default_meta_cache_capacity = (non_reserved_memory_bytes as f64
* storage_memory_proportion
* STORAGE_META_CACHE_MEMORY_PROPORTION)
.ceil() as usize;
let meta_cache_capacity_mb = storage_config
.meta_cache_capacity_mb
.unwrap_or(std::cmp::min(
default_meta_cache_capacity >> 20,
STORAGE_META_CACHE_MAX_MEMORY_MB,
));
let meta_cache_capacity_mb =
storage_config
.cache
.meta_cache_capacity_mb
.unwrap_or(std::cmp::min(
default_meta_cache_capacity >> 20,
STORAGE_META_CACHE_MAX_MEMORY_MB,
));

let prefetch_buffer_capacity_mb = storage_config
.prefetch_buffer_capacity_mb
.unwrap_or(block_cache_capacity_mb);
let cache_eviction_config = match &storage_config.cache.eviction {
CacheEvictionConfig::Lru {
high_priority_ratio_in_percent,
} => EvictionConfig::Lru(LruConfig {
high_priority_pool_ratio: high_priority_ratio_in_percent.unwrap_or(
risingwave_common::config::default::storage::high_priority_ratio_in_percent(),
) as f64
/ 100.0,
}),
CacheEvictionConfig::Lfu {
window_capacity_ratio_in_percent,
protected_capacity_ratio_in_percent,
cmsketch_eps,
cmsketch_confidence,
} => EvictionConfig::Lfu(LfuConfig {
window_capacity_ratio: window_capacity_ratio_in_percent.unwrap_or(
risingwave_common::config::default::storage::window_capacity_ratio_in_percent(),
) as f64
/ 100.0,
protected_capacity_ratio: protected_capacity_ratio_in_percent.unwrap_or(
risingwave_common::config::default::storage::protected_capacity_ratio_in_percent(),
) as f64
/ 100.0,
cmsketch_eps: cmsketch_eps
.unwrap_or(risingwave_common::config::default::storage::cmsketch_eps()),
cmsketch_confidence: cmsketch_confidence
.unwrap_or(risingwave_common::config::default::storage::cmsketch_confidence()),
}),
};

if meta_cache_capacity_mb == STORAGE_META_CACHE_MAX_MEMORY_MB {
block_cache_capacity_mb += (default_meta_cache_capacity >> 20) - meta_cache_capacity_mb;
Expand Down Expand Up @@ -143,7 +173,7 @@ pub fn storage_memory_config(
meta_file_cache_ring_buffer_capacity_mb,
compactor_memory_limit_mb,
prefetch_buffer_capacity_mb,
high_priority_ratio_in_percent,
cache_eviction_config,
}
}

Expand Down Expand Up @@ -192,8 +222,8 @@ mod tests {
assert_eq!(memory_config.meta_file_cache_ring_buffer_capacity_mb, 256);
assert_eq!(memory_config.compactor_memory_limit_mb, 819);

storage_config.block_cache_capacity_mb = Some(512);
storage_config.meta_cache_capacity_mb = Some(128);
storage_config.cache.block_cache_capacity_mb = Some(512);
storage_config.cache.meta_cache_capacity_mb = Some(128);
storage_config.shared_buffer_capacity_mb = Some(1024);
storage_config.compactor_memory_limit_mb = Some(512);
let memory_config = storage_memory_config(0, true, &storage_config);
Expand Down
Loading