feat(config): support configuring storage LRU-cache shards by default #15419

Merged · 5 commits · Mar 15, 2024
24 changes: 9 additions & 15 deletions src/common/src/cache.rs
@@ -665,31 +665,25 @@ pub struct LruCache<K: LruKey, T: LruValue> {
const DEFAULT_OBJECT_POOL_SIZE: usize = 1024;

impl<K: LruKey, T: LruValue> LruCache<K, T> {
pub fn new(num_shard_bits: usize, capacity: usize, high_priority_ratio: usize) -> Self {
Self::new_inner(num_shard_bits, capacity, high_priority_ratio, None)
pub fn new(num_shards: usize, capacity: usize, high_priority_ratio: usize) -> Self {
Self::new_inner(num_shards, capacity, high_priority_ratio, None)
}

pub fn with_event_listener(
num_shard_bits: usize,
num_shards: usize,
capacity: usize,
high_priority_ratio: usize,
listener: Arc<dyn LruCacheEventListener<K = K, T = T>>,
) -> Self {
Self::new_inner(
num_shard_bits,
capacity,
high_priority_ratio,
Some(listener),
)
Self::new_inner(num_shards, capacity, high_priority_ratio, Some(listener))
}

fn new_inner(
num_shard_bits: usize,
num_shards: usize,
capacity: usize,
high_priority_ratio: usize,
listener: Option<Arc<dyn LruCacheEventListener<K = K, T = T>>>,
) -> Self {
let num_shards = 1 << num_shard_bits;
let mut shards = Vec::with_capacity(num_shards);
let per_shard = capacity / num_shards;
let mut shard_usages = Vec::with_capacity(num_shards);
@@ -1055,7 +1049,7 @@ mod tests {

#[test]
fn test_cache_shard() {
let cache = Arc::new(LruCache::<(u64, u64), Block>::new(2, 256, 0));
let cache = Arc::new(LruCache::<(u64, u64), Block>::new(4, 256, 0));
assert_eq!(cache.shard(0), 0);
assert_eq!(cache.shard(1), 1);
assert_eq!(cache.shard(10), 2);
@@ -1355,7 +1349,7 @@ mod tests {

#[test]
fn test_write_request_pending() {
let cache = Arc::new(LruCache::new(0, 5, 0));
let cache = Arc::new(LruCache::new(1, 5, 0));
{
let mut shard = cache.shards[0].lock();
insert(&mut shard, "a", "v1");
@@ -1400,7 +1394,7 @@ mod tests {
#[test]
fn test_event_listener() {
let listener = Arc::new(TestLruCacheEventListener::default());
let cache = Arc::new(LruCache::with_event_listener(0, 2, 0, listener.clone()));
let cache = Arc::new(LruCache::with_event_listener(1, 2, 0, listener.clone()));

// full-fill cache
let h = cache.insert(
Expand Down Expand Up @@ -1495,7 +1489,7 @@ mod tests {

#[tokio::test]
async fn test_future_cancel() {
let cache: Arc<LruCache<u64, u64>> = Arc::new(LruCache::new(0, 5, 0));
let cache: Arc<LruCache<u64, u64>> = Arc::new(LruCache::new(1, 5, 0));
// do not need sender because this receiver will be cancelled.
let (_, recv) = channel::<()>();
let polled = Arc::new(AtomicBool::new(false));
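Note: with this change the first constructor argument is the shard count itself rather than a bit width. A minimal usage sketch, assuming the `risingwave_common::cache::LruCache` path touched in this diff and the `u64 -> u64` cache used by the tests above; the capacity value is a placeholder:

```rust
use std::sync::Arc;

use risingwave_common::cache::LruCache;

fn build_example_cache() -> Arc<LruCache<u64, u64>> {
    // Before: LruCache::new(2, 1024, 0) meant 1 << 2 = 4 shards.
    // After:  the shard count is passed directly — 4 shards here,
    //         a total capacity charge of 1024, and no high-priority pool.
    Arc::new(LruCache::new(4, 1024, 0))
}
```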
29 changes: 29 additions & 0 deletions src/common/src/config.rs
@@ -601,13 +601,19 @@ pub struct StorageConfig {
#[serde(default)]
pub block_cache_capacity_mb: Option<usize>,

#[serde(default)]
pub block_shard_num: Option<usize>,

#[serde(default)]
pub high_priority_ratio_in_percent: Option<usize>,

/// Capacity of sstable meta cache.
#[serde(default)]
pub meta_cache_capacity_mb: Option<usize>,

#[serde(default)]
pub meta_shard_num: Option<usize>,

/// max memory usage for large query
#[serde(default)]
pub prefetch_buffer_capacity_mb: Option<usize>,
@@ -1656,7 +1662,9 @@ pub mod default {

pub struct StorageMemoryConfig {
pub block_cache_capacity_mb: usize,
pub block_shard_num: usize,
pub meta_cache_capacity_mb: usize,
pub meta_shard_num: usize,
pub shared_buffer_capacity_mb: usize,
pub data_file_cache_ring_buffer_capacity_mb: usize,
pub meta_file_cache_ring_buffer_capacity_mb: usize,
@@ -1665,6 +1673,10 @@ pub struct StorageMemoryConfig {
pub high_priority_ratio_in_percent: usize,
}

pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
pub const MAX_CACHE_SHARD_BITS: usize = 6; // i.e. up to 1 << 6 = 64 LRU-cache shards, to reduce lock contention.

pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
let block_cache_capacity_mb = s
.storage
@@ -1678,6 +1690,21 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
.storage
.shared_buffer_capacity_mb
.unwrap_or(default::storage::shared_buffer_capacity_mb());
let meta_shard_num = s.storage.meta_shard_num.unwrap_or_else(|| {
let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 {
shard_bits -= 1;
}
shard_bits
});
let block_shard_num = s.storage.block_shard_num.unwrap_or_else(|| {
let mut shard_bits = MAX_CACHE_SHARD_BITS;
while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0
{
shard_bits -= 1;
}
shard_bits
});
let data_file_cache_ring_buffer_capacity_mb = s.storage.data_file_cache.ring_buffer_capacity_mb;
let meta_file_cache_ring_buffer_capacity_mb = s.storage.meta_file_cache.ring_buffer_capacity_mb;
let compactor_memory_limit_mb = s
@@ -1695,7 +1722,9 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {

StorageMemoryConfig {
block_cache_capacity_mb,
block_shard_num,
meta_cache_capacity_mb,
meta_shard_num,
shared_buffer_capacity_mb,
data_file_cache_ring_buffer_capacity_mb,
meta_file_cache_ring_buffer_capacity_mb,
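Note: the default shard count is derived by starting from a maximum shard-bit budget and halving the shard count until each shard holds at least `MIN_BUFFER_SIZE_PER_SHARD` MB. A self-contained sketch of that heuristic, using the constants from this diff (the helper name and the worked numbers are illustrative only; the extraction above keeps the resulting `shard_bits` value, while the compute-node path in the next file converts it with `1 << shard_bits`):

```rust
const MIN_BUFFER_SIZE_PER_SHARD: usize = 256; // MB per shard, as defined in this diff
const MAX_META_CACHE_SHARD_BITS: usize = 4;   // at most 1 << 4 = 16 meta-cache shards
const MAX_CACHE_SHARD_BITS: usize = 6;        // at most 1 << 6 = 64 block-cache shards

/// Illustrative helper: the largest shard-bit count whose per-shard capacity
/// is still at least `MIN_BUFFER_SIZE_PER_SHARD` MB.
fn default_shard_bits(capacity_mb: usize, max_shard_bits: usize) -> usize {
    let mut shard_bits = max_shard_bits;
    while (capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 {
        shard_bits -= 1;
    }
    shard_bits
}

fn main() {
    // A 4096 MB block cache settles at 4 bits, i.e. 16 shards of 256 MB each.
    assert_eq!(default_shard_bits(4096, MAX_CACHE_SHARD_BITS), 4);
    // A 1024 MB meta cache settles at 2 bits, i.e. 4 shards of 256 MB each.
    assert_eq!(default_shard_bits(1024, MAX_META_CACHE_SHARD_BITS), 2);
    // A tiny cache bottoms out at 0 bits, i.e. a single shard.
    assert_eq!(default_shard_bits(64, MAX_CACHE_SHARD_BITS), 0);
}
```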
79 changes: 58 additions & 21 deletions src/compute/src/memory/config.rs
@@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::config::{StorageConfig, StorageMemoryConfig};
use risingwave_common::config::{
StorageConfig, StorageMemoryConfig, MAX_CACHE_SHARD_BITS, MAX_META_CACHE_SHARD_BITS,
MIN_BUFFER_SIZE_PER_SHARD,
};
use risingwave_common::util::pretty_bytes::convert;

/// The minimal memory requirement of computing tasks in megabytes.
@@ -30,6 +33,7 @@ const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1;
const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3;

const STORAGE_META_CACHE_MAX_MEMORY_MB: usize = 4096;
const STORAGE_SHARED_BUFFER_MAX_MEMORY_MB: usize = 4096;
const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35;
const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;
const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 50;
@@ -65,41 +69,56 @@ pub fn storage_memory_config(
} else {
(STORAGE_MEMORY_PROPORTION + COMPACTOR_MEMORY_PROPORTION, 0.0)
};
let mut block_cache_capacity_mb = storage_config.block_cache_capacity_mb.unwrap_or(
((non_reserved_memory_bytes as f64
* storage_memory_proportion
* STORAGE_BLOCK_CACHE_MEMORY_PROPORTION)
.ceil() as usize)
>> 20,
);
let mut default_block_cache_capacity_mb = ((non_reserved_memory_bytes as f64
* storage_memory_proportion
* STORAGE_BLOCK_CACHE_MEMORY_PROPORTION)
.ceil() as usize)
>> 20;
let high_priority_ratio_in_percent = storage_config
.high_priority_ratio_in_percent
.unwrap_or(STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO);
let default_meta_cache_capacity = (non_reserved_memory_bytes as f64
let default_meta_cache_capacity_mb = ((non_reserved_memory_bytes as f64
* storage_memory_proportion
* STORAGE_META_CACHE_MEMORY_PROPORTION)
.ceil() as usize;
.ceil() as usize)
>> 20;
let meta_cache_capacity_mb = storage_config
.meta_cache_capacity_mb
.unwrap_or(std::cmp::min(
default_meta_cache_capacity >> 20,
default_meta_cache_capacity_mb,
STORAGE_META_CACHE_MAX_MEMORY_MB,
));

let prefetch_buffer_capacity_mb = storage_config
.prefetch_buffer_capacity_mb
.unwrap_or(block_cache_capacity_mb);
.unwrap_or(default_block_cache_capacity_mb);

if meta_cache_capacity_mb == STORAGE_META_CACHE_MAX_MEMORY_MB {
block_cache_capacity_mb += (default_meta_cache_capacity >> 20) - meta_cache_capacity_mb;
if meta_cache_capacity_mb != default_meta_cache_capacity_mb {
default_block_cache_capacity_mb += default_meta_cache_capacity_mb;
default_block_cache_capacity_mb =
default_block_cache_capacity_mb.saturating_sub(meta_cache_capacity_mb);
}
let shared_buffer_capacity_mb = storage_config.shared_buffer_capacity_mb.unwrap_or(
((non_reserved_memory_bytes as f64
* storage_memory_proportion
* STORAGE_SHARED_BUFFER_MEMORY_PROPORTION)
.ceil() as usize)
>> 20,
);

let default_shared_buffer_capacity_mb = ((non_reserved_memory_bytes as f64
* storage_memory_proportion
* STORAGE_SHARED_BUFFER_MEMORY_PROPORTION)
.ceil() as usize)
>> 20;
let shared_buffer_capacity_mb =
storage_config
.shared_buffer_capacity_mb
.unwrap_or(std::cmp::min(
default_shared_buffer_capacity_mb,
STORAGE_SHARED_BUFFER_MAX_MEMORY_MB,
));
if shared_buffer_capacity_mb != default_shared_buffer_capacity_mb {
default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
default_block_cache_capacity_mb =
default_block_cache_capacity_mb.saturating_sub(shared_buffer_capacity_mb);
}
let block_cache_capacity_mb = storage_config
.block_cache_capacity_mb
.unwrap_or(default_block_cache_capacity_mb);

let data_file_cache_ring_buffer_capacity_mb = if storage_config.data_file_cache.dir.is_empty() {
0
@@ -135,9 +154,27 @@
);
}

let meta_shard_num = storage_config.meta_shard_num.unwrap_or_else(|| {
let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 {
shard_bits -= 1;
}
1 << shard_bits
});
let block_shard_num = storage_config.block_shard_num.unwrap_or_else(|| {
let mut shard_bits = MAX_CACHE_SHARD_BITS;
while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0
{
shard_bits -= 1;
}
1 << shard_bits
});

StorageMemoryConfig {
block_cache_capacity_mb,
block_shard_num,
meta_cache_capacity_mb,
meta_shard_num,
shared_buffer_capacity_mb,
data_file_cache_ring_buffer_capacity_mb,
meta_file_cache_ring_buffer_capacity_mb,
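Note: a worked sketch of the budget handover introduced above — when the meta cache or shared buffer is clamped to its 4096 MB cap, the megabytes it gives up are folded back into the block-cache default (which only takes effect if `block_cache_capacity_mb` was not set explicitly). The numbers are made up for illustration:

```rust
/// Illustrative only: mirrors the redistribution logic above with made-up numbers.
fn main() {
    // Suppose the proportional defaults came out as:
    let mut default_block_cache_capacity_mb: usize = 6_000;
    let default_meta_cache_capacity_mb: usize = 7_000;
    let default_shared_buffer_capacity_mb: usize = 6_000;

    // Both the meta cache and the shared buffer are capped at 4096 MB.
    let meta_cache_capacity_mb = default_meta_cache_capacity_mb.min(4096);
    let shared_buffer_capacity_mb = default_shared_buffer_capacity_mb.min(4096);

    // Whatever the caps freed up is handed back to the block-cache default.
    if meta_cache_capacity_mb != default_meta_cache_capacity_mb {
        default_block_cache_capacity_mb += default_meta_cache_capacity_mb;
        default_block_cache_capacity_mb =
            default_block_cache_capacity_mb.saturating_sub(meta_cache_capacity_mb);
    }
    if shared_buffer_capacity_mb != default_shared_buffer_capacity_mb {
        default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
        default_block_cache_capacity_mb =
            default_block_cache_capacity_mb.saturating_sub(shared_buffer_capacity_mb);
    }

    // 6000 + (7000 - 4096) + (6000 - 4096) = 10_808 MB for the block-cache default.
    assert_eq!(default_block_cache_capacity_mb, 10_808);
    println!("block cache default: {default_block_cache_capacity_mb} MB");
}
```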
2 changes: 2 additions & 0 deletions src/config/docs.md
@@ -94,6 +94,7 @@ This page is automatically generated by `./risedev generate-example-config`
| Config | Description | Default |
|--------|-------------|---------|
| block_cache_capacity_mb | Capacity of sstable block cache. | |
| block_shard_num | | |
| cache_refill | | |
| check_compaction_result | | false |
| compact_iter_recreate_timeout_ms | | 600000 |
@@ -118,6 +119,7 @@ This page is automatically generated by `./risedev generate-example-config`
| mem_table_spill_threshold | The spill threshold for mem table. | 4194304 |
| meta_cache_capacity_mb | Capacity of sstable meta cache. | |
| meta_file_cache | | |
| meta_shard_num | | |
| min_sst_size_for_streaming_upload | Whether to enable streaming upload for sstable. | 33554432 |
| object_store | | |
| prefetch_buffer_capacity_mb | max memory usage for large query | |
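Note: the two new rows have empty descriptions because the corresponding struct fields carry no doc comment yet. For operators, a hedged example of overriding them — assuming the usual `[storage]` section of `risingwave.toml` and that these keys take the final shard count, as the compute-node default derivation above suggests:

```toml
[storage]
# Optional; when omitted, shard counts are derived from the cache capacities
# (at most 64 block-cache shards and 16 meta-cache shards, each at least 256 MB).
block_shard_num = 64
meta_shard_num = 16
```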
2 changes: 2 additions & 0 deletions src/ctl/src/common/hummock_service.rs
@@ -176,6 +176,8 @@ impl HummockServiceOpts {
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
block_shard_num: opts.block_shard_num,
meta_shard_num: opts.meta_shard_num,
})))
}
}
2 changes: 2 additions & 0 deletions src/jni_core/src/hummock_iterator.rs
@@ -87,6 +87,8 @@ impl HummockJavaBindingIterator {
max_prefetch_block_number: 16,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
block_shard_num: 2,
meta_shard_num: 2,
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
2 changes: 2 additions & 0 deletions src/storage/benches/bench_compactor.rs
@@ -65,6 +65,8 @@ pub fn mock_sstable_store() -> SstableStoreRef {
block_cache_capacity: 64 << 20,
meta_cache_capacity: 128 << 20,
high_priority_ratio: 0,
meta_shard_num: 2,
block_shard_num: 2,
prefetch_buffer_capacity: 64 << 20,
max_prefetch_block_number: 16,
data_file_cache: FileCache::none(),
2 changes: 2 additions & 0 deletions src/storage/benches/bench_multi_builder.rs
@@ -147,6 +147,8 @@ fn bench_builder(
block_cache_capacity: 64 << 20,
meta_cache_capacity: 128 << 20,
high_priority_ratio: 0,
meta_shard_num: 2,
block_shard_num: 2,
prefetch_buffer_capacity: 64 << 20,
max_prefetch_block_number: 16,
data_file_cache: FileCache::none(),
2 changes: 2 additions & 0 deletions src/storage/hummock_test/src/bin/replay/main.rs
@@ -122,6 +122,8 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalRepl
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: state_store_metrics.clone(),
meta_shard_num: storage_opts.meta_shard_num,
block_shard_num: storage_opts.block_shard_num,
}));

let (hummock_meta_client, notification_client, notifier) = {
12 changes: 3 additions & 9 deletions src/storage/src/hummock/block_cache.rs
@@ -29,8 +29,6 @@ use crate::hummock::HummockError;
type CachedBlockEntry =
CacheEntry<(HummockSstableObjectId, u64), Box<Block>, BlockCacheEventListener>;

const MIN_BUFFER_SIZE_PER_SHARD: usize = 256 * 1024 * 1024;

enum BlockEntry {
Cache(#[allow(dead_code)] CachedBlockEntry),
Owned(#[allow(dead_code)] Box<Block>),
@@ -113,25 +111,21 @@ impl BlockCache {
// TODO(MrCroxx): support other cache algorithm
pub fn new(
capacity: usize,
mut max_shard_bits: usize,
block_shard_num: usize,
high_priority_ratio: usize,
event_listener: BlockCacheEventListener,
) -> Self {
if capacity == 0 {
panic!("block cache capacity == 0");
}
while (capacity >> max_shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && max_shard_bits > 0 {
max_shard_bits -= 1;
}
let shards = 1 << max_shard_bits;

let cache = Cache::lru(LruCacheConfig {
capacity,
shards,
shards: block_shard_num,
eviction_config: LruConfig {
high_priority_pool_ratio: high_priority_ratio as f64 / 100.0,
},
object_pool_capacity: shards * 1024,
object_pool_capacity: block_shard_num * 1024,
hash_builder: RandomState::default(),
event_listener,
});
2 changes: 2 additions & 0 deletions src/storage/src/hummock/iterator/test_utils.rs
@@ -73,6 +73,8 @@ pub fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableSto
high_priority_ratio: 0,
prefetch_buffer_capacity: 64 << 20,
max_prefetch_block_number: 16,
meta_shard_num: 2,
block_shard_num: 2,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,