feat(config): support configure storage LRU-cache shard by default #15419

Merged · 5 commits · Mar 15, 2024

Changes from 3 commits
16 changes: 5 additions & 11 deletions src/common/src/cache.rs
@@ -665,31 +665,25 @@ pub struct LruCache<K: LruKey, T: LruValue> {
const DEFAULT_OBJECT_POOL_SIZE: usize = 1024;

impl<K: LruKey, T: LruValue> LruCache<K, T> {
pub fn new(num_shard_bits: usize, capacity: usize, high_priority_ratio: usize) -> Self {
Self::new_inner(num_shard_bits, capacity, high_priority_ratio, None)
pub fn new(num_shards: usize, capacity: usize, high_priority_ratio: usize) -> Self {
Self::new_inner(num_shards, capacity, high_priority_ratio, None)
}

pub fn with_event_listener(
num_shard_bits: usize,
num_shards: usize,
capacity: usize,
high_priority_ratio: usize,
listener: Arc<dyn LruCacheEventListener<K = K, T = T>>,
) -> Self {
Self::new_inner(
num_shard_bits,
capacity,
high_priority_ratio,
Some(listener),
)
Self::new_inner(num_shards, capacity, high_priority_ratio, Some(listener))
}

fn new_inner(
num_shard_bits: usize,
num_shards: usize,
capacity: usize,
high_priority_ratio: usize,
listener: Option<Arc<dyn LruCacheEventListener<K = K, T = T>>>,
) -> Self {
let num_shards = 1 << num_shard_bits;
let mut shards = Vec::with_capacity(num_shards);
let per_shard = capacity / num_shards;
let mut shard_usages = Vec::with_capacity(num_shards);
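
The constructor change above swaps shard bits for an explicit shard count: LruCache::new no longer computes 1 << num_shard_bits internally, so callers decide how many shards they want. A minimal standalone sketch of the two conventions (not the actual LruCache internals):

// Old convention: callers passed shard *bits* and the cache derived the count.
fn shard_count_from_bits(num_shard_bits: usize) -> usize {
    1 << num_shard_bits
}

// New convention: callers pass the shard *count*; the cache only splits capacity.
fn per_shard_capacity(capacity: usize, num_shards: usize) -> usize {
    capacity / num_shards
}

fn main() {
    // new(6, cap, ...) under the old API is equivalent to new(64, cap, ...) now.
    assert_eq!(shard_count_from_bits(6), 64);
    assert_eq!(per_shard_capacity(1024, 64), 16);
}
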
29 changes: 29 additions & 0 deletions src/common/src/config.rs
@@ -601,13 +601,19 @@ pub struct StorageConfig {
#[serde(default)]
pub block_cache_capacity_mb: Option<usize>,

#[serde(default)]
pub block_shard_num: Option<usize>,

#[serde(default)]
pub high_priority_ratio_in_percent: Option<usize>,

/// Capacity of sstable meta cache.
#[serde(default)]
pub meta_cache_capacity_mb: Option<usize>,

#[serde(default)]
pub meta_shard_num: Option<usize>,

/// max memory usage for large query
#[serde(default)]
pub prefetch_buffer_capacity_mb: Option<usize>,
@@ -1656,7 +1662,9 @@ pub mod default {

pub struct StorageMemoryConfig {
pub block_cache_capacity_mb: usize,
pub block_shard_num: usize,
pub meta_cache_capacity_mb: usize,
pub meta_shard_num: usize,
pub shared_buffer_capacity_mb: usize,
pub data_file_cache_ring_buffer_capacity_mb: usize,
pub meta_file_cache_ring_buffer_capacity_mb: usize,
@@ -1665,6 +1673,10 @@ pub struct StorageMemoryConfig {
pub high_priority_ratio_in_percent: usize,
}

pub const MAX_META_CACHE_SHARD_BITS: usize = 4;
pub const MIN_BUFFER_SIZE_PER_SHARD: usize = 256;
pub const MAX_CACHE_SHARD_BITS: usize = 6; // i.e. up to 64 LRU-cache shards, to reduce lock contention.

pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
let block_cache_capacity_mb = s
.storage
@@ -1678,6 +1690,21 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {
.storage
.shared_buffer_capacity_mb
.unwrap_or(default::storage::shared_buffer_capacity_mb());
let meta_shard_num = s.storage.meta_shard_num.unwrap_or_else(|| {
let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 {
shard_bits -= 1;
}
shard_bits
});
let block_shard_num = s.storage.block_shard_num.unwrap_or_else(|| {
let mut shard_bits = MAX_CACHE_SHARD_BITS;
while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0
{
shard_bits -= 1;
}
shard_bits
});
let data_file_cache_ring_buffer_capacity_mb = s.storage.data_file_cache.ring_buffer_capacity_mb;
let meta_file_cache_ring_buffer_capacity_mb = s.storage.meta_file_cache.ring_buffer_capacity_mb;
let compactor_memory_limit_mb = s
@@ -1695,7 +1722,9 @@ pub fn extract_storage_memory_config(s: &RwConfig) -> StorageMemoryConfig {

StorageMemoryConfig {
block_cache_capacity_mb,
block_shard_num,
meta_cache_capacity_mb,
meta_shard_num,
shared_buffer_capacity_mb,
data_file_cache_ring_buffer_capacity_mb,
meta_file_cache_ring_buffer_capacity_mb,
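
When block_shard_num / meta_shard_num are left unset, the fallback above starts from the maximum shard bits and backs off until each shard would hold at least MIN_BUFFER_SIZE_PER_SHARD (256 MB) or a single shard remains. A minimal sketch of that loop with a worked example (constants copied from this hunk; note that the compute-node variant later in the diff converts the resulting bits into a shard count with 1 << shard_bits):

const MAX_META_CACHE_SHARD_BITS: usize = 4;
const MIN_BUFFER_SIZE_PER_SHARD: usize = 256; // MB

// Shrink the shard bits until every shard gets at least 256 MB (or one shard is left).
fn default_shard_bits(capacity_mb: usize, max_shard_bits: usize) -> usize {
    let mut shard_bits = max_shard_bits;
    while (capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 {
        shard_bits -= 1;
    }
    shard_bits
}

fn main() {
    // A 1024 MB meta cache: 1024 >> 4 = 64 MB per shard is too small, so the
    // loop backs off to 2 bits (1024 >> 2 = 256 MB per shard), i.e. 4 shards.
    let bits = default_shard_bits(1024, MAX_META_CACHE_SHARD_BITS);
    assert_eq!(bits, 2);
    assert_eq!(1 << bits, 4);
}
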
77 changes: 56 additions & 21 deletions src/compute/src/memory/config.rs
@@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::config::{StorageConfig, StorageMemoryConfig};
use risingwave_common::config::{
StorageConfig, StorageMemoryConfig, MAX_CACHE_SHARD_BITS, MAX_META_CACHE_SHARD_BITS,
MIN_BUFFER_SIZE_PER_SHARD,
};
use risingwave_common::util::pretty_bytes::convert;

/// The minimal memory requirement of computing tasks in megabytes.
@@ -30,6 +33,7 @@ const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1;
const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3;

const STORAGE_META_CACHE_MAX_MEMORY_MB: usize = 4096;
const STORAGE_SHARED_BUFFER_MAX_MEMORY_MB: usize = 1024;
const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35;
const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;
const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 50;
@@ -65,41 +69,54 @@ pub fn storage_memory_config(
} else {
(STORAGE_MEMORY_PROPORTION + COMPACTOR_MEMORY_PROPORTION, 0.0)
};
let mut block_cache_capacity_mb = storage_config.block_cache_capacity_mb.unwrap_or(
((non_reserved_memory_bytes as f64
* storage_memory_proportion
* STORAGE_BLOCK_CACHE_MEMORY_PROPORTION)
.ceil() as usize)
>> 20,
);
let mut default_block_cache_capacity_mb = ((non_reserved_memory_bytes as f64
* storage_memory_proportion
* STORAGE_BLOCK_CACHE_MEMORY_PROPORTION)
.ceil() as usize)
>> 20;
let high_priority_ratio_in_percent = storage_config
.high_priority_ratio_in_percent
.unwrap_or(STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO);
let default_meta_cache_capacity = (non_reserved_memory_bytes as f64
let default_meta_cache_capacity_mb = ((non_reserved_memory_bytes as f64
* storage_memory_proportion
* STORAGE_META_CACHE_MEMORY_PROPORTION)
.ceil() as usize;
.ceil() as usize)
>> 20;
let meta_cache_capacity_mb = storage_config
.meta_cache_capacity_mb
.unwrap_or(std::cmp::min(
default_meta_cache_capacity >> 20,
default_meta_cache_capacity_mb,
STORAGE_META_CACHE_MAX_MEMORY_MB,
));

let prefetch_buffer_capacity_mb = storage_config
.prefetch_buffer_capacity_mb
.unwrap_or(block_cache_capacity_mb);
.unwrap_or(default_block_cache_capacity_mb);

if meta_cache_capacity_mb == STORAGE_META_CACHE_MAX_MEMORY_MB {
block_cache_capacity_mb += (default_meta_cache_capacity >> 20) - meta_cache_capacity_mb;
if meta_cache_capacity_mb != default_meta_cache_capacity_mb {
default_block_cache_capacity_mb += default_meta_cache_capacity_mb;
default_block_cache_capacity_mb -= meta_cache_capacity_mb;
}
let shared_buffer_capacity_mb = storage_config.shared_buffer_capacity_mb.unwrap_or(
((non_reserved_memory_bytes as f64
* storage_memory_proportion
* STORAGE_SHARED_BUFFER_MEMORY_PROPORTION)
.ceil() as usize)
>> 20,
);

let default_shared_buffer_capacity_mb = ((non_reserved_memory_bytes as f64
* storage_memory_proportion
* STORAGE_SHARED_BUFFER_MEMORY_PROPORTION)
.ceil() as usize)
>> 20;
let shared_buffer_capacity_mb =
storage_config
.shared_buffer_capacity_mb
.unwrap_or(std::cmp::min(
default_shared_buffer_capacity_mb,
STORAGE_SHARED_BUFFER_MAX_MEMORY_MB,
));
if shared_buffer_capacity_mb != default_shared_buffer_capacity_mb {
default_block_cache_capacity_mb += default_shared_buffer_capacity_mb;
default_block_cache_capacity_mb -= shared_buffer_capacity_mb;
}
let block_cache_capacity_mb = storage_config
.block_cache_capacity_mb
.unwrap_or(default_block_cache_capacity_mb);

let data_file_cache_ring_buffer_capacity_mb = if storage_config.data_file_cache.dir.is_empty() {
0
@@ -135,6 +152,22 @@
);
}

let meta_shard_num = storage_config.meta_shard_num.unwrap_or_else(|| {
let mut shard_bits = MAX_META_CACHE_SHARD_BITS;
while (meta_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0 {
shard_bits -= 1;
}
1 << shard_bits
});
let block_shard_num = storage_config.block_shard_num.unwrap_or_else(|| {
let mut shard_bits = MAX_CACHE_SHARD_BITS;
while (block_cache_capacity_mb >> shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && shard_bits > 0
{
shard_bits -= 1;
}
1 << shard_bits
});

StorageMemoryConfig {
block_cache_capacity_mb,
meta_cache_capacity_mb,
Expand All @@ -144,6 +177,8 @@ pub fn storage_memory_config(
compactor_memory_limit_mb,
prefetch_buffer_capacity_mb,
high_priority_ratio_in_percent,
block_shard_num,
meta_shard_num,
}
}

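
The budgeting above derives proportional defaults, caps the meta cache and shared buffer (or takes user overrides), and folds the difference back into the default block-cache budget before the block-cache override itself is applied. A minimal sketch of the capped case only (constants copied from this hunk; the example numbers are assumed):

const STORAGE_META_CACHE_MAX_MEMORY_MB: usize = 4096;
const STORAGE_SHARED_BUFFER_MAX_MEMORY_MB: usize = 1024;

fn redistribute(
    mut default_block_mb: usize,
    default_meta_mb: usize,
    default_shared_mb: usize,
) -> (usize, usize, usize) {
    // Cap the meta cache; its unused share flows back into the block cache.
    let meta_mb = default_meta_mb.min(STORAGE_META_CACHE_MAX_MEMORY_MB);
    default_block_mb += default_meta_mb - meta_mb;
    // Same for the shared buffer.
    let shared_mb = default_shared_mb.min(STORAGE_SHARED_BUFFER_MAX_MEMORY_MB);
    default_block_mb += default_shared_mb - shared_mb;
    (default_block_mb, meta_mb, shared_mb)
}

fn main() {
    // Assumed defaults: 8192 MB block, 6000 MB meta, 2000 MB shared buffer.
    // Both caps apply, so the block-cache default grows by 1904 + 976 MB.
    let (block, meta, shared) = redistribute(8192, 6000, 2000);
    assert_eq!((block, meta, shared), (8192 + 1904 + 976, 4096, 1024));
}
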
2 changes: 2 additions & 0 deletions src/ctl/src/common/hummock_service.rs
@@ -176,6 +176,8 @@ impl HummockServiceOpts {
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
block_shard_num: opts.block_shard_num,
meta_shard_num: opts.meta_shard_num,
})))
}
}
2 changes: 2 additions & 0 deletions src/jni_core/src/hummock_iterator.rs
@@ -87,6 +87,8 @@ impl HummockJavaBindingIterator {
max_prefetch_block_number: 16,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
block_shard_num: 2,
meta_shard_num: 2,
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
2 changes: 2 additions & 0 deletions src/storage/hummock_test/src/bin/replay/main.rs
@@ -122,6 +122,8 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalRepl
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: state_store_metrics.clone(),
meta_shard_num: storage_opts.meta_shard_num,
block_shard_num: storage_opts.block_shard_num,
}));

let (hummock_meta_client, notification_client, notifier) = {
11 changes: 4 additions & 7 deletions src/storage/src/hummock/block_cache.rs
@@ -26,10 +26,7 @@ use risingwave_hummock_sdk::HummockSstableObjectId;
use super::{Block, BlockCacheEventListener, HummockResult};
use crate::hummock::HummockError;

type CachedBlockEntry =
CacheEntry<(HummockSstableObjectId, u64), Box<Block>, BlockCacheEventListener>;

const MIN_BUFFER_SIZE_PER_SHARD: usize = 256 * 1024 * 1024;
type CachedBlockEntry = CacheableEntry<(HummockSstableObjectId, u64), Box<Block>>;

enum BlockEntry {
Cache(#[allow(dead_code)] CachedBlockEntry),
@@ -113,21 +110,21 @@ impl BlockCache {
// TODO(MrCroxx): support other cache algorithm
pub fn new(
capacity: usize,
mut max_shard_bits: usize,
block_shard_num: usize,
high_priority_ratio: usize,
event_listener: BlockCacheEventListener,
) -> Self {
if capacity == 0 {
panic!("block cache capacity == 0");
}

while (capacity >> max_shard_bits) < MIN_BUFFER_SIZE_PER_SHARD && max_shard_bits > 0 {
max_shard_bits -= 1;
}
let shards = 1 << max_shard_bits;

let cache = Cache::lru(LruCacheConfig {
capacity,
shards,
block_shard_num,
eviction_config: LruConfig {
high_priority_pool_ratio: high_priority_ratio as f64 / 100.0,
},
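
With this change the shard-count clamp that used to live inside BlockCache::new moves to the config layer, so the cache simply forwards the caller-provided block_shard_num into the LRU config. A minimal sketch with hypothetical stand-in types (LruCacheConfig here is only illustrative, not the real type):

struct LruCacheConfig {
    capacity: usize,
    shards: usize,
}

// The cache no longer derives or clamps the shard count; it only rejects a zero capacity.
fn build_lru_config(capacity: usize, block_shard_num: usize) -> LruCacheConfig {
    assert!(capacity > 0, "block cache capacity == 0");
    LruCacheConfig { capacity, shards: block_shard_num }
}

fn main() {
    let cfg = build_lru_config(1 << 30, 64);
    assert_eq!(cfg.shards, 64);
}
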
2 changes: 2 additions & 0 deletions src/storage/src/hummock/iterator/test_utils.rs
@@ -73,6 +73,8 @@ pub fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableSto
high_priority_ratio: 0,
prefetch_buffer_capacity: 64 << 20,
max_prefetch_block_number: 16,
meta_shard_num: 2,
block_shard_num: 2,
data_file_cache: FileCache::none(),
meta_file_cache: FileCache::none(),
recent_filter: None,
14 changes: 5 additions & 9 deletions src/storage/src/hummock/sstable_store.rs
@@ -196,6 +196,8 @@ pub struct SstableStoreConfig {
pub block_cache_capacity: usize,
pub meta_cache_capacity: usize,
pub high_priority_ratio: usize,
pub meta_shard_num: usize,
pub block_shard_num: usize,
pub prefetch_buffer_capacity: usize,
pub max_prefetch_block_number: usize,
pub data_file_cache: FileCache<SstableBlockIndex, CachedBlock>,
@@ -226,16 +228,10 @@
pub fn new(config: SstableStoreConfig) -> Self {
// TODO: We should validate path early. Otherwise object store won't report invalid path
// error until first write attempt.
let mut meta_cache_shard_bits = MAX_META_CACHE_SHARD_BITS;
while (config.meta_cache_capacity >> meta_cache_shard_bits) < MIN_BUFFER_SIZE_PER_SHARD
&& meta_cache_shard_bits > 0
{
meta_cache_shard_bits -= 1;
}

let block_cache = BlockCache::new(
config.block_cache_capacity,
MAX_CACHE_SHARD_BITS,
config.block_shard_num,
config.high_priority_ratio,
BlockCacheEventListener::new(
config.data_file_cache.clone(),
Expand All @@ -245,7 +241,7 @@ impl SstableStore {
// TODO(MrCroxx): support other cache algorithm
let meta_cache = Arc::new(Cache::lru(LruCacheConfig {
capacity: config.meta_cache_capacity,
shards: 1 << meta_cache_shard_bits,
shards: config.meta_shard_num,
eviction_config: LruConfig {
high_priority_pool_ratio: 0.0,
},
@@ -293,7 +289,7 @@ impl SstableStore {
store,
block_cache: BlockCache::new(
block_cache_capacity,
0,
1,
0,
BlockCacheEventListener::new(
FileCache::none(),
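
The literal in the fallback constructor changes from 0 to 1 because the argument used to be shard bits (0 bits still meant 1 << 0 = 1 shard) and is now the shard count itself, which must be at least one. A minimal sketch of the equivalence:

// Old argument: shard bits, so 0 was a valid "single shard" value.
fn shards_from_bits(bits: usize) -> usize {
    1 << bits
}

// New argument: the shard count directly; it must be at least 1.
fn shards_from_count(count: usize) -> usize {
    assert!(count >= 1, "a cache needs at least one shard");
    count
}

fn main() {
    assert_eq!(shards_from_bits(0), shards_from_count(1));
}
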
6 changes: 6 additions & 0 deletions src/storage/src/opts.rs
@@ -47,8 +47,12 @@ pub struct StorageOpts {
pub write_conflict_detection_enabled: bool,
/// Capacity of sstable block cache.
pub block_cache_capacity_mb: usize,
/// Number of shards in the block cache. Fewer shards mean more lock contention.
pub block_shard_num: usize,
/// Capacity of sstable meta cache.
pub meta_cache_capacity_mb: usize,
/// Number of shards in the meta cache. Fewer shards mean more lock contention.
pub meta_shard_num: usize,
/// Percent of the ratio of high priority data in block-cache
pub high_priority_ratio: usize,
/// max memory usage for large query.
@@ -170,6 +174,8 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
prefetch_buffer_capacity_mb: s.prefetch_buffer_capacity_mb,
max_prefetch_block_number: c.storage.max_prefetch_block_number,
meta_cache_capacity_mb: s.meta_cache_capacity_mb,
block_shard_num: s.block_shard_num,
meta_shard_num: s.meta_shard_num,
disable_remote_compactor: c.storage.disable_remote_compactor,
share_buffer_upload_concurrency: c.storage.share_buffer_upload_concurrency,
compactor_memory_limit_mb: s.compactor_memory_limit_mb,
2 changes: 2 additions & 0 deletions src/storage/src/store_impl.rs
@@ -616,6 +616,8 @@ impl StateStoreImpl {
block_cache_capacity: opts.block_cache_capacity_mb * (1 << 20),
meta_cache_capacity: opts.meta_cache_capacity_mb * (1 << 20),
high_priority_ratio: opts.high_priority_ratio,
meta_shard_num: opts.meta_shard_num,
block_shard_num: opts.block_shard_num,
prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
max_prefetch_block_number: opts.max_prefetch_block_number,
data_file_cache,
2 changes: 2 additions & 0 deletions src/tests/compaction_test/src/delete_range_runner.rs
@@ -220,6 +220,8 @@ async fn compaction_test(
meta_file_cache: FileCache::none(),
recent_filter: None,
state_store_metrics: state_store_metrics.clone(),
meta_shard_num: storage_memory_config.meta_shard_num,
block_shard_num: storage_memory_config.block_shard_num,
}));

let store = HummockStorage::new(