Skip to content

Commit

Permalink
feat(storage): adapt to foyer new write model (#13059)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Nov 3, 2023
1 parent fe70f9e commit e71037d
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 134 deletions.
14 changes: 8 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 20 additions & 35 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,9 +634,6 @@ pub struct FileCacheConfig {
#[serde(default = "default::file_cache::file_capacity_mb")]
pub file_capacity_mb: usize,

#[serde(default)]
pub buffer_pool_size_mb: Option<usize>,

#[serde(default = "default::file_cache::device_align")]
pub device_align: usize,

Expand All @@ -661,17 +658,17 @@ pub struct FileCacheConfig {
#[serde(default = "default::file_cache::insert_rate_limit_mb")]
pub insert_rate_limit_mb: usize,

#[serde(default = "default::file_cache::flush_rate_limit_mb")]
pub flush_rate_limit_mb: usize,

#[serde(default = "default::file_cache::reclaim_rate_limit_mb")]
pub reclaim_rate_limit_mb: usize,

#[serde(default = "default::file_cache::allocation_bits")]
pub allocation_bits: usize,
#[serde(default = "default::file_cache::ring_buffer_capacity_mb")]
pub ring_buffer_capacity_mb: usize,

#[serde(default = "default::file_cache::catalog_bits")]
pub catalog_bits: usize,

#[serde(default = "default::file_cache::allocation_timeout_ms")]
pub allocation_timeout_ms: usize,
#[serde(default = "default::file_cache::compression")]
pub compression: String,

#[serde(default, flatten)]
pub unrecognized: Unrecognized<Self>,
Expand Down Expand Up @@ -1127,10 +1124,6 @@ pub mod default {
64
}

pub fn buffer_pool_size_mb() -> usize {
1024
}

pub fn device_align() -> usize {
4096
}
Expand Down Expand Up @@ -1163,20 +1156,20 @@ pub mod default {
0
}

pub fn flush_rate_limit_mb() -> usize {
pub fn reclaim_rate_limit_mb() -> usize {
0
}

pub fn reclaim_rate_limit_mb() -> usize {
0
pub fn ring_buffer_capacity_mb() -> usize {
256
}

pub fn allocation_bits() -> usize {
0
pub fn catalog_bits() -> usize {
6
}

pub fn allocation_timeout_ms() -> usize {
10
pub fn compression() -> String {
"none".to_string()
}
}

Expand Down Expand Up @@ -1391,8 +1384,8 @@ pub struct StorageMemoryConfig {
pub block_cache_capacity_mb: usize,
pub meta_cache_capacity_mb: usize,
pub shared_buffer_capacity_mb: usize,
pub data_file_cache_buffer_pool_capacity_mb: usize,
pub meta_file_cache_buffer_pool_capacity_mb: usize,
pub data_file_cache_ring_buffer_capacity_mb: usize,
pub meta_file_cache_ring_buffer_capacity_mb: usize,
pub compactor_memory_limit_mb: usize,
pub high_priority_ratio_in_percent: usize,
}
Expand All @@ -1410,16 +1403,8 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
.storage
.shared_buffer_capacity_mb
.unwrap_or(default::storage::shared_buffer_capacity_mb());
let data_file_cache_buffer_pool_size_mb = s
.storage
.data_file_cache
.buffer_pool_size_mb
.unwrap_or(default::file_cache::buffer_pool_size_mb());
let meta_file_cache_buffer_pool_size_mb = s
.storage
.meta_file_cache
.buffer_pool_size_mb
.unwrap_or(default::file_cache::buffer_pool_size_mb());
let data_file_cache_ring_buffer_capacity_mb = s.storage.data_file_cache.ring_buffer_capacity_mb;
let meta_file_cache_ring_buffer_capacity_mb = s.storage.meta_file_cache.ring_buffer_capacity_mb;
let compactor_memory_limit_mb = s
.storage
.compactor_memory_limit_mb
Expand All @@ -1433,8 +1418,8 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
block_cache_capacity_mb,
meta_cache_capacity_mb,
shared_buffer_capacity_mb,
data_file_cache_buffer_pool_capacity_mb: data_file_cache_buffer_pool_size_mb,
meta_file_cache_buffer_pool_capacity_mb: meta_file_cache_buffer_pool_size_mb,
data_file_cache_ring_buffer_capacity_mb,
meta_file_cache_ring_buffer_capacity_mb,
compactor_memory_limit_mb,
high_priority_ratio_in_percent,
}
Expand Down
55 changes: 15 additions & 40 deletions src/compute/src/memory_management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,39 +155,15 @@ pub fn storage_memory_config(
>> 20,
);

// `foyer` uses a buffer pool to manage dirty buffers.
//
// buffer size = region size (single file size with fs device)
//
// writing buffer + flushing buffer + free buffer = buffer pool buffers
//
// To utilize flushers and allocators, buffers should >= allocators + flushers.
//
// Adding more buffers can prevent allocators from waiting for buffers to be freed by flushers.

let data_file_cache_buffer_pool_capacity_mb = if storage_config.data_file_cache.dir.is_empty() {
let data_file_cache_ring_buffer_capacity_mb = if storage_config.data_file_cache.dir.is_empty() {
0
} else {
storage_config
.data_file_cache
.buffer_pool_size_mb
.unwrap_or(
storage_config.data_file_cache.file_capacity_mb
* (storage_config.data_file_cache.flushers
+ 2 * (1 << storage_config.data_file_cache.allocation_bits)),
)
storage_config.data_file_cache.ring_buffer_capacity_mb
};
let meta_file_cache_buffer_pool_capacity_mb = if storage_config.meta_file_cache.dir.is_empty() {
let meta_file_cache_ring_buffer_capacity_mb = if storage_config.meta_file_cache.dir.is_empty() {
0
} else {
storage_config
.meta_file_cache
.buffer_pool_size_mb
.unwrap_or(
storage_config.meta_file_cache.file_capacity_mb
* (storage_config.meta_file_cache.flushers
+ 2 * (1 << storage_config.meta_file_cache.allocation_bits)),
)
storage_config.meta_file_cache.ring_buffer_capacity_mb
};

let compactor_memory_limit_mb = storage_config.compactor_memory_limit_mb.unwrap_or(
Expand All @@ -197,8 +173,8 @@ pub fn storage_memory_config(
let total_calculated_mb = block_cache_capacity_mb
+ meta_cache_capacity_mb
+ shared_buffer_capacity_mb
+ data_file_cache_buffer_pool_capacity_mb
+ meta_file_cache_buffer_pool_capacity_mb
+ data_file_cache_ring_buffer_capacity_mb
+ meta_file_cache_ring_buffer_capacity_mb
+ compactor_memory_limit_mb;
let soft_limit_mb = (non_reserved_memory_bytes as f64
* (storage_memory_proportion + compactor_memory_proportion).ceil())
Expand All @@ -217,8 +193,8 @@ pub fn storage_memory_config(
block_cache_capacity_mb,
meta_cache_capacity_mb,
shared_buffer_capacity_mb,
data_file_cache_buffer_pool_capacity_mb,
meta_file_cache_buffer_pool_capacity_mb,
data_file_cache_ring_buffer_capacity_mb,
meta_file_cache_ring_buffer_capacity_mb,
compactor_memory_limit_mb,
high_priority_ratio_in_percent,
}
Expand Down Expand Up @@ -246,15 +222,16 @@ mod tests {
#[test]
fn test_storage_memory_config() {
let mut storage_config = StorageConfig::default();

let total_non_reserved_memory_bytes = 8 << 30;

let memory_config =
storage_memory_config(total_non_reserved_memory_bytes, true, &storage_config);
assert_eq!(memory_config.block_cache_capacity_mb, 737);
assert_eq!(memory_config.meta_cache_capacity_mb, 860);
assert_eq!(memory_config.shared_buffer_capacity_mb, 737);
assert_eq!(memory_config.data_file_cache_buffer_pool_capacity_mb, 0);
assert_eq!(memory_config.meta_file_cache_buffer_pool_capacity_mb, 0);
assert_eq!(memory_config.data_file_cache_ring_buffer_capacity_mb, 0);
assert_eq!(memory_config.meta_file_cache_ring_buffer_capacity_mb, 0);
assert_eq!(memory_config.compactor_memory_limit_mb, 819);

storage_config.data_file_cache.dir = "data".to_string();
Expand All @@ -264,22 +241,20 @@ mod tests {
assert_eq!(memory_config.block_cache_capacity_mb, 737);
assert_eq!(memory_config.meta_cache_capacity_mb, 860);
assert_eq!(memory_config.shared_buffer_capacity_mb, 737);
assert_eq!(memory_config.data_file_cache_buffer_pool_capacity_mb, 384);
assert_eq!(memory_config.meta_file_cache_buffer_pool_capacity_mb, 384);
assert_eq!(memory_config.data_file_cache_ring_buffer_capacity_mb, 256);
assert_eq!(memory_config.meta_file_cache_ring_buffer_capacity_mb, 256);
assert_eq!(memory_config.compactor_memory_limit_mb, 819);

storage_config.block_cache_capacity_mb = Some(512);
storage_config.meta_cache_capacity_mb = Some(128);
storage_config.shared_buffer_capacity_mb = Some(1024);
storage_config.data_file_cache.buffer_pool_size_mb = Some(1024);
storage_config.meta_file_cache.buffer_pool_size_mb = Some(1024);
storage_config.compactor_memory_limit_mb = Some(512);
let memory_config = storage_memory_config(0, true, &storage_config);
assert_eq!(memory_config.block_cache_capacity_mb, 512);
assert_eq!(memory_config.meta_cache_capacity_mb, 128);
assert_eq!(memory_config.shared_buffer_capacity_mb, 1024);
assert_eq!(memory_config.data_file_cache_buffer_pool_capacity_mb, 1024);
assert_eq!(memory_config.meta_file_cache_buffer_pool_capacity_mb, 1024);
assert_eq!(memory_config.data_file_cache_ring_buffer_capacity_mb, 256);
assert_eq!(memory_config.meta_file_cache_ring_buffer_capacity_mb, 256);
assert_eq!(memory_config.compactor_memory_limit_mb, 512);
}
}
8 changes: 4 additions & 4 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,8 @@ fn total_storage_memory_limit_bytes(storage_memory_config: &StorageMemoryConfig)
let total_storage_memory_mb = storage_memory_config.block_cache_capacity_mb
+ storage_memory_config.meta_cache_capacity_mb
+ storage_memory_config.shared_buffer_capacity_mb
+ storage_memory_config.data_file_cache_buffer_pool_capacity_mb
+ storage_memory_config.meta_file_cache_buffer_pool_capacity_mb
+ storage_memory_config.data_file_cache_ring_buffer_capacity_mb
+ storage_memory_config.meta_file_cache_ring_buffer_capacity_mb
+ storage_memory_config.compactor_memory_limit_mb;
total_storage_memory_mb << 20
}
Expand Down Expand Up @@ -538,8 +538,8 @@ fn print_memory_config(
convert((storage_memory_config.block_cache_capacity_mb << 20) as _),
convert((storage_memory_config.meta_cache_capacity_mb << 20) as _),
convert((storage_memory_config.shared_buffer_capacity_mb << 20) as _),
convert((storage_memory_config.data_file_cache_buffer_pool_capacity_mb << 20) as _),
convert((storage_memory_config.meta_file_cache_buffer_pool_capacity_mb << 20) as _),
convert((storage_memory_config.data_file_cache_ring_buffer_capacity_mb << 20) as _),
convert((storage_memory_config.meta_file_cache_ring_buffer_capacity_mb << 20) as _),
if embedded_compactor_enabled {
convert((storage_memory_config.compactor_memory_limit_mb << 20) as _)
} else {
Expand Down
12 changes: 6 additions & 6 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ recover_concurrency = 8
lfu_window_to_cache_size_ratio = 1
lfu_tiny_lru_capacity_ratio = 0.01
insert_rate_limit_mb = 0
flush_rate_limit_mb = 0
reclaim_rate_limit_mb = 0
allocation_bits = 0
allocation_timeout_ms = 10
ring_buffer_capacity_mb = 256
catalog_bits = 6
compression = "none"

[storage.meta_file_cache]
dir = ""
Expand All @@ -141,10 +141,10 @@ recover_concurrency = 8
lfu_window_to_cache_size_ratio = 1
lfu_tiny_lru_capacity_ratio = 0.01
insert_rate_limit_mb = 0
flush_rate_limit_mb = 0
reclaim_rate_limit_mb = 0
allocation_bits = 0
allocation_timeout_ms = 10
ring_buffer_capacity_mb = 256
catalog_bits = 6
compression = "none"

[storage.cache_refill]
data_refill_levels = []
Expand Down
2 changes: 1 addition & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dyn-clone = "1.0.14"
either = "1"
enum-as-inner = "0.6"
fail = "0.5"
foyer = { git = "https://github.com/MrCroxx/foyer", rev = "2261151" }
foyer = { git = "https://github.com/MrCroxx/foyer", rev = "9232b3a" }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
hex = "0.4"
Expand Down
Loading

0 comments on commit e71037d

Please sign in to comment.