Commit fc6b2e0: address comment

Signed-off-by: Little-Wallace <[email protected]>

Little-Wallace committed May 16, 2024
1 parent 2d0fdcf, commit fc6b2e0
Showing 9 changed files with 66 additions and 39 deletions.
7 changes: 7 additions & 0 deletions src/common/src/config.rs
@@ -672,6 +672,9 @@ pub struct StorageConfig {
     #[serde(default = "default::storage::sstable_id_remote_fetch_number")]
     pub sstable_id_remote_fetch_number: u32,
 
+    #[serde(default = "default::storage::min_sstable_size_mb")]
+    pub min_sstable_size_mb: u32,
+
     #[serde(default)]
     pub data_file_cache: FileCacheConfig,
 
@@ -1320,6 +1323,10 @@ pub mod default {
             10
         }
 
+        pub fn min_sstable_size_mb() -> u32 {
+            32
+        }
+
         pub fn min_sst_size_for_streaming_upload() -> u64 {
             // 32MB
             32 * 1024 * 1024
1 change: 1 addition & 0 deletions src/config/docs.md
@@ -122,6 +122,7 @@ This page is automatically generated by `./risedev generate-example-config`
 | meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. | |
 | meta_file_cache | | |
 | min_sst_size_for_streaming_upload | Whether to enable streaming upload for sstable. | 33554432 |
+| min_sstable_size_mb | | 32 |
 | object_store | | |
 | prefetch_buffer_capacity_mb | max memory usage for large query | |
 | share_buffer_compaction_worker_threads_number | Worker threads number of dedicated tokio runtime for share buffer compaction. 0 means use tokio's default value (number of CPU core). | 4 |
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -123,6 +123,7 @@ share_buffer_upload_concurrency = 8
 compactor_max_task_multiplier = 3.0
 compactor_memory_available_proportion = 0.8
 sstable_id_remote_fetch_number = 10
+min_sstable_size_mb = 32
 min_sst_size_for_streaming_upload = 33554432
 max_sub_compaction = 4
 max_concurrent_compaction_task_number = 16
@@ -101,14 +101,19 @@ impl CompactionTaskValidationRule for TierCompactionTaskValidationRule {
             return true;
         }
 
+        // The design here is to merge multiple overlapping levels in one compaction.
+        let max_compaction_bytes = std::cmp::min(
+            self.config.max_compaction_bytes,
+            self.config.sub_level_max_compaction_bytes
+                * self.config.level0_overlapping_sub_level_compact_level_count as u64,
+        );
+
         // If waiting_enough_files is not satisfied, we will raise the priority of the number of
         // levels to ensure that we can merge as many sub_levels as possible
         let tier_sub_level_compact_level_count =
            self.config.level0_overlapping_sub_level_compact_level_count as usize;
-        let min_sub_level_compact_level_count = tier_sub_level_compact_level_count / 2;
-        if input.input_levels.len() < min_sub_level_compact_level_count
-            || (input.input_levels.len() < tier_sub_level_compact_level_count
-                && input.select_input_size < self.config.max_bytes_for_level_base)
+        if input.input_levels.len() < tier_sub_level_compact_level_count
+            && input.select_input_size < max_compaction_bytes
         {
             stats.skip_by_count_limit += 1;
             return false;
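Note: the effect of the new check is easiest to see with concrete numbers. A minimal runnable sketch, using assumed config values (illustrative, not RisingWave defaults):

    // Assumed values, for illustration only.
    let max_compaction_bytes: u64 = 2 << 30; // 2 GiB
    let sub_level_max_compaction_bytes: u64 = 256 << 20; // 256 MiB
    let level0_overlapping_sub_level_compact_level_count: u64 = 12;

    // The per-task size cap becomes min(2 GiB, 256 MiB * 12 = 3 GiB) = 2 GiB.
    let max_task_bytes = std::cmp::min(
        max_compaction_bytes,
        sub_level_max_compaction_bytes * level0_overlapping_sub_level_compact_level_count,
    );

    // A task is now skipped only when it is small on BOTH axes: fewer
    // sub-levels than the target count AND fewer bytes than the cap.
    // Before this change, too few sub-levels alone could reject it.
    let (input_levels, select_input_size): (usize, u64) = (8, 3 << 30);
    let skip = input_levels < 12 && select_input_size < max_task_bytes;
    assert!(!skip); // a byte-heavy task passes even with few sub-levels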
32 changes: 22 additions & 10 deletions src/meta/src/hummock/manager/compaction.rs
@@ -284,8 +284,8 @@ impl HummockManager {
         for input_ssts in &compact_task.input_ssts {
             for sst in &input_ssts.table_infos {
                 existing_table_ids.extend(sst.table_ids.iter());
-                if sst.table_ids.len() > 0 {
-                    *table_size_info.entry(sst.table_ids[0]).or_default() +=
+                for table_id in &sst.table_ids {
+                    *table_size_info.entry(*table_id).or_default() +=
                         sst.file_size / (sst.table_ids.len() as u64);
                 }
             }
@@ -296,30 +296,42 @@ impl HummockManager {
 
         let hybrid_vnode_count = self.env.opts.hybird_partition_vnode_count;
         let default_partition_count = self.env.opts.partition_vnode_count;
+        // We must ensure the partition thresholds are large enough to avoid too many small files.
+        let less_partition_threshold =
+            (hybrid_vnode_count as u64) * compaction_config.target_file_size_base;
+        let several_partition_threshold =
+            (default_partition_count as u64) * compaction_config.target_file_size_base;
         let params = self.env.system_params_reader().await;
         let barrier_interval_ms = params.barrier_interval_ms() as u64;
         let checkpoint_secs = std::cmp::max(
             1,
             params.checkpoint_frequency() * barrier_interval_ms / 1000,
         );
         // check latest write throughput
         let history_table_throughput = self.history_table_throughput.read();
         for (table_id, compact_table_size) in table_size_info {
             let write_throughput = history_table_throughput
                 .get(&table_id)
                 .map(|que| que.back().cloned().unwrap_or(0))
                 .unwrap_or(0)
                 / checkpoint_secs;
-            if compact_table_size > compaction_config.max_compaction_bytes / 2 {
+            if compact_table_size > several_partition_threshold {
                 compact_task
                     .table_vnode_partition
                     .insert(table_id, default_partition_count);
-            } else if compact_table_size > compaction_config.sub_level_max_compaction_bytes
-                || (compact_table_size > compaction_config.target_file_size_base
-                    && write_throughput > self.env.opts.table_write_throughput_threshold)
-            {
-                compact_task
-                    .table_vnode_partition
-                    .insert(table_id, hybrid_vnode_count);
+            } else {
+                if compact_table_size > less_partition_threshold
+                    || (write_throughput > self.env.opts.table_write_throughput_threshold
+                        && compact_table_size > compaction_config.target_file_size_base)
+                {
+                    // Partition the table with large write throughput, but also make sure each partition is not too small.
+                    compact_task
+                        .table_vnode_partition
+                        .insert(table_id, hybrid_vnode_count);
+                } else if compact_table_size > compaction_config.target_file_size_base {
+                    // partition for small table
+                    compact_task.table_vnode_partition.insert(table_id, 1);
+                }
             }
         }
         compact_task
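Note: the threshold ladder above replaces the old comparisons against `max_compaction_bytes / 2` and `sub_level_max_compaction_bytes`. A worked sketch under assumed values (target_file_size_base = 32 MiB, hybird_partition_vnode_count = 4, partition_vnode_count = 64; illustrative, not the shipped defaults):

    // less_partition_threshold    =  4 * 32 MiB = 128 MiB
    // several_partition_threshold = 64 * 32 MiB =   2 GiB
    fn partitions(compact_table_size: u64, high_throughput: bool) -> u32 {
        const TARGET: u64 = 32 << 20;
        let (less, several) = (4 * TARGET, 64 * TARGET);
        if compact_table_size > several {
            64 // full partitioning; each partition still holds >= TARGET bytes
        } else if compact_table_size > less || (high_throughput && compact_table_size > TARGET) {
            4 // hybrid partitioning
        } else if compact_table_size > TARGET {
            1 // own file, single partition
        } else {
            0 // too small for a dedicated entry
        }
    }
    assert_eq!(partitions(3 << 30, false), 64); // 3 GiB
    assert_eq!(partitions(200 << 20, false), 4); // 200 MiB
    assert_eq!(partitions(48 << 20, false), 1); // 48 MiB

Tying both thresholds to target_file_size_base guarantees that a table qualifying for N partitions brings at least N * target_file_size_base of data, so each partition can fill a file of the target size.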
35 changes: 19 additions & 16 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
@@ -179,22 +179,14 @@ async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
     });
 
     let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
-    let (splits, sub_compaction_sstable_size, split_weight_by_vnode) =
+    let (splits, sub_compaction_sstable_size, table_vnode_partition) =
         generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
     let parallelism = splits.len();
     let mut compact_success = true;
     let mut output_ssts = Vec::with_capacity(parallelism);
     let mut compaction_futures = vec![];
     let use_block_based_filter = BlockedXor16FilterBuilder::is_kv_count_too_large(total_key_count);
 
-    let table_vnode_partition = if existing_table_ids.len() == 1 {
-        let table_id = existing_table_ids.iter().next().unwrap();
-        vec![(*table_id, split_weight_by_vnode)]
-            .into_iter()
-            .collect()
-    } else {
-        BTreeMap::default()
-    };
     for (split_index, key_range) in splits.into_iter().enumerate() {
         let compactor = SharedBufferCompactRunner::new(
             split_index,
@@ -474,16 +466,20 @@ fn generate_splits(
     payload: &UploadTaskPayload,
     existing_table_ids: &HashSet<u32>,
     storage_opts: &StorageOpts,
-) -> (Vec<KeyRange>, u64, u32) {
+) -> (Vec<KeyRange>, u64, BTreeMap<u32, u32>) {
     let mut size_and_start_user_keys = vec![];
     let mut compact_data_size = 0;
+    let mut table_size_infos: HashMap<u32, u64> = HashMap::default();
+    let mut table_vnode_partition = BTreeMap::default();
     for imm in payload {
         let data_size = {
             // calculate encoded bytes of key var length
             (imm.value_count() * EPOCH_LEN + imm.size()) as u64
         };
         compact_data_size += data_size;
         size_and_start_user_keys.push((data_size, imm.start_user_key()));
+        let v = table_size_infos.entry(imm.table_id.table_id).or_insert(0);
+        *v += data_size;
     }
     size_and_start_user_keys.sort_by(|a, b| a.1.cmp(&b.1));
     let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
@@ -493,6 +489,7 @@ fn generate_splits(
         splits.push(KeyRange::new(key_before_last.clone(), Bytes::new()));
     };
     let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
+    let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
     let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
     let parallelism = std::cmp::min(
         storage_opts.share_buffers_sync_parallelism as u64,
@@ -504,7 +501,6 @@ fn generate_splits(
         compact_data_size
     };
 
-    let mut vnode_partition_count = 0;
     if existing_table_ids.len() > 1 {
         if parallelism > 1 && compact_data_size > sstable_size {
             let mut last_buffer_size = 0;
@@ -529,6 +525,13 @@ fn generate_splits(
                 }
             }
         }
+        for table_id in existing_table_ids {
+            if let Some(table_size) = table_size_infos.get(table_id)
+                && *table_size > min_sstable_size
+            {
+                table_vnode_partition.insert(*table_id, 1);
+            }
+        }
     } else {
         // Collect vnodes in imm
         let mut vnodes = vec![];
@@ -538,6 +541,7 @@ fn generate_splits(
         vnodes.sort();
         vnodes.dedup();
 
+        let mut vnode_partition_count = 0;
         // Based on the estimated `vnode_avg_size`, calculate the required `vnode_partition_count` to avoid small files and further align
         const MIN_SSTABLE_SIZE: u64 = 16 * 1024 * 1024;
         if compact_data_size >= MIN_SSTABLE_SIZE && !vnodes.is_empty() {
@@ -547,17 +551,16 @@ fn generate_splits(
                 vnode_partition_count /= 2;
                 avg_vnode_size *= 2;
             }
+            if let Some(table_id) = existing_table_ids.iter().next() {
+                table_vnode_partition.insert(*table_id, vnode_partition_count as u32);
+            }
         }
     }
 
     // mul 1.2 for other extra memory usage.
     // Ensure that the size of each sstable is still less than `sstable_size` after optimization to avoid generating a huge size sstable which will affect the object store
     let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
-    (
-        splits,
-        sub_compaction_sstable_size,
-        vnode_partition_count as u32,
-    )
+    (splits, sub_compaction_sstable_size, table_vnode_partition)
 }
 
 pub struct SharedBufferCompactRunner {
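Note: with the new signature, callers receive the per-table partition map directly instead of a single split_weight_by_vnode that only applied to one table. A self-contained sketch of the multi-table rule (the sizes and the 32 MiB threshold below are assumed values):

    use std::collections::BTreeMap;

    // Mimics the multi-table branch of generate_splits: tables whose imm data
    // exceeds min_sstable_size get a partition entry of 1, so their data can be
    // cut into independent SSTables instead of being mixed with other tables.
    let min_sstable_size: u64 = 32 << 20; // assumed 32 MiB
    let table_sizes: Vec<(u32, u64)> = vec![(1, 64 << 20), (2, 4 << 20), (3, 40 << 20)];
    let mut table_vnode_partition: BTreeMap<u32, u32> = BTreeMap::new();
    for (table_id, size) in table_sizes {
        if size > min_sstable_size {
            table_vnode_partition.insert(table_id, 1);
        }
    }
    // Tables 1 and 3 qualify; table 2 (4 MiB) stays mixed with others.
    assert_eq!(table_vnode_partition.len(), 2);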
4 changes: 4 additions & 0 deletions src/storage/src/hummock/sstable/builder.rs
@@ -45,6 +45,7 @@ pub const MIN_BLOCK_SIZE: usize = 8 * 1024;
 pub struct SstableBuilderOptions {
     /// Approximate sstable capacity.
     pub capacity: usize,
+    pub min_sstable_size: usize,
     /// Approximate block capacity.
     pub block_capacity: usize,
     /// Restart point interval.
@@ -59,8 +60,10 @@ pub struct SstableBuilderOptions {
 impl From<&StorageOpts> for SstableBuilderOptions {
     fn from(options: &StorageOpts) -> SstableBuilderOptions {
         let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
+        let min_sstable_size: usize = (options.min_sstable_size_mb as usize) * (1 << 20);
         SstableBuilderOptions {
             capacity,
+            min_sstable_size,
             block_capacity: (options.block_size_kb as usize) * (1 << 10),
             restart_interval: DEFAULT_RESTART_INTERVAL,
             bloom_false_positive: options.bloom_false_positive,
@@ -73,6 +76,7 @@ impl From<&StorageOpts> for SstableBuilderOptions {
 impl Default for SstableBuilderOptions {
     fn default() -> Self {
         Self {
+            min_sstable_size: DEFAULT_SSTABLE_SIZE,
            capacity: DEFAULT_SSTABLE_SIZE,
             block_capacity: DEFAULT_BLOCK_SIZE,
             restart_interval: DEFAULT_RESTART_INTERVAL,
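Note: the MB-to-bytes conversion mirrors the existing capacity handling. A trivial standalone check of the arithmetic, assuming the 32 MB default:

    let min_sstable_size_mb: usize = 32; // assumed config value
    let min_sstable_size = min_sstable_size_mb * (1 << 20);
    assert_eq!(min_sstable_size, 33_554_432); // 32 MiB in bytes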
9 changes: 0 additions & 9 deletions src/storage/src/hummock/sstable/multi_builder.rs
@@ -229,15 +229,6 @@ where
                 // default
                 self.largest_vnode_in_current_partition = VirtualNode::MAX.to_index();
             }
-        } else {
-            self.last_table_id = user_key.table_id.table_id;
-            self.split_weight_by_vnode = 0;
-            self.largest_vnode_in_current_partition = VirtualNode::MAX.to_index();
-            if let Some(builder) = self.current_builder.as_ref()
-                && builder.approximate_len() > MIN_SST_SIZE
-            {
-                switch_builder = true;
-            }
         }
     }
     if self.largest_vnode_in_current_partition != VirtualNode::MAX.to_index() {
3 changes: 3 additions & 0 deletions src/storage/src/opts.rs
@@ -24,6 +24,8 @@ pub struct StorageOpts {
     pub parallel_compact_size_mb: u32,
     /// Target size of the Sstable.
     pub sstable_size_mb: u32,
+    /// Minimal target size of the Sstable, so that data of different state tables is stored in independent files as soon as possible.
+    pub min_sstable_size_mb: u32,
     /// Size of each block in bytes in SST.
     pub block_size_kb: u32,
     /// False positive probability of bloom filter.
@@ -160,6 +162,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpts
         Self {
             parallel_compact_size_mb: p.parallel_compact_size_mb(),
             sstable_size_mb: p.sstable_size_mb(),
+            min_sstable_size_mb: c.storage.min_sstable_size_mb,
             block_size_kb: p.block_size_kb(),
             bloom_false_positive: p.bloom_false_positive(),
             share_buffers_sync_parallelism: c.storage.share_buffers_sync_parallelism,
