address comment
Signed-off-by: Little-Wallace <[email protected]>
Little-Wallace committed May 16, 2024
1 parent 4c9f9fc commit 7a9cd17
Showing 8 changed files with 53 additions and 26 deletions.
7 changes: 7 additions & 0 deletions src/common/src/config.rs
@@ -672,6 +672,9 @@ pub struct StorageConfig {
#[serde(default = "default::storage::sstable_id_remote_fetch_number")]
pub sstable_id_remote_fetch_number: u32,

#[serde(default = "default::storage::min_sstable_size_mb")]
pub min_sstable_size_mb: u32,

#[serde(default)]
pub data_file_cache: FileCacheConfig,

@@ -1320,6 +1323,10 @@ pub mod default {
10
}

pub fn min_sstable_size_mb() -> u32 {
32
}

pub fn min_sst_size_for_streaming_upload() -> u64 {
// 32MB
32 * 1024 * 1024
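
As context for the `serde(default = ...)` wiring above, here is a minimal, self-contained sketch of how the attribute falls back to 32 when `min_sstable_size_mb` is absent from the TOML. The `StorageSection` struct and the `main` driver are illustrative, not part of the patch, and the example assumes the `serde` (with derive) and `toml` crates:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct StorageSection {
    /// Falls back to `min_sstable_size_mb()` when the key is missing.
    #[serde(default = "min_sstable_size_mb")]
    min_sstable_size_mb: u32,
}

fn min_sstable_size_mb() -> u32 {
    32
}

fn main() {
    // An empty document behaves like a `[storage]` section with no keys set.
    let cfg: StorageSection = toml::from_str("").unwrap();
    assert_eq!(cfg.min_sstable_size_mb, 32);

    // An explicit value overrides the default.
    let cfg: StorageSection = toml::from_str("min_sstable_size_mb = 64").unwrap();
    assert_eq!(cfg.min_sstable_size_mb, 64);
}
```
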
1 change: 1 addition & 0 deletions src/config/docs.md
@@ -122,6 +122,7 @@ This page is automatically generated by `./risedev generate-example-config`
| meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in a future version; use `storage.cache.meta_cache_capacity_mb` instead. | |
| meta_file_cache | | |
| min_sst_size_for_streaming_upload | Whether to enable streaming upload for sstable. | 33554432 |
| min_sstable_size_mb | | 32 |
| object_store | | |
| prefetch_buffer_capacity_mb | max memory usage for large query | |
| share_buffer_compaction_worker_threads_number | Worker threads number of dedicated tokio runtime for share buffer compaction. 0 means use tokio's default value (number of CPU core). | 4 |
1 change: 1 addition & 0 deletions src/config/example.toml
@@ -123,6 +123,7 @@ share_buffer_upload_concurrency = 8
compactor_max_task_multiplier = 3.0
compactor_memory_available_proportion = 0.8
sstable_id_remote_fetch_number = 10
min_sstable_size_mb = 32
min_sst_size_for_streaming_upload = 33554432
max_sub_compaction = 4
max_concurrent_compaction_task_number = 16
28 changes: 19 additions & 9 deletions src/meta/src/hummock/manager/compaction.rs
@@ -296,6 +296,11 @@ impl HummockManager {

let hybrid_vnode_count = self.env.opts.hybird_partition_vnode_count;
let default_partition_count = self.env.opts.partition_vnode_count;
// We must ensure the partition threshold is large enough to avoid producing too many small files.
let less_partition_threshold =
(hybrid_vnode_count as u64) * compaction_config.target_file_size_base;
let several_partition_threshold =
(default_partition_count as u64) * compaction_config.target_file_size_base;
let params = self.env.system_params_reader().await;
let barrier_interval_ms = params.barrier_interval_ms() as u64;
let checkpoint_secs = std::cmp::max(
@@ -310,18 +315,23 @@
.map(|que| que.back().cloned().unwrap_or(0))
.unwrap_or(0)
/ checkpoint_secs;
if compact_table_size > compaction_config.max_compaction_bytes / 2 {
if compact_table_size > several_partition_threshold {
compact_task
.table_vnode_partition
.insert(table_id, default_partition_count);
} else if compact_table_size > compaction_config.sub_level_max_compaction_bytes
|| (compact_table_size > compaction_config.target_file_size_base
&& write_throughput > self.env.opts.table_write_throughput_threshold)
{
// partition for large write throughput table.
compact_task
.table_vnode_partition
.insert(table_id, hybrid_vnode_count);
} else {
if compact_table_size > less_partition_threshold
|| (write_throughput > self.env.opts.table_write_throughput_threshold
&& compact_table_size > compaction_config.target_file_size_base)
{
// Partition tables with large write throughput, but also make sure that the table is not too small.
compact_task
.table_vnode_partition
.insert(table_id, hybrid_vnode_count);
} else if compact_table_size > compaction_config.target_file_size_base {
// partition for small table
compact_task.table_vnode_partition.insert(table_id, 1);
}
}
}
compact_task
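
To make the new threshold logic easier to follow, here is a simplified standalone sketch of the partition-count selection. `pick_partition_count` and the literal sizes in `main` are illustrative stand-ins for values the real code reads from `CompactionConfig` and `MetaOpts`:

```rust
/// Simplified sketch of the partition-count selection in `compaction.rs`.
fn pick_partition_count(
    compact_table_size: u64,
    write_throughput: u64,
    target_file_size_base: u64,
    table_write_throughput_threshold: u64,
    hybrid_vnode_count: u32,
    default_partition_count: u32,
) -> Option<u32> {
    let less_partition_threshold = hybrid_vnode_count as u64 * target_file_size_base;
    let several_partition_threshold = default_partition_count as u64 * target_file_size_base;

    if compact_table_size > several_partition_threshold {
        // Very large tables get the full partition count.
        Some(default_partition_count)
    } else if compact_table_size > less_partition_threshold
        || (write_throughput > table_write_throughput_threshold
            && compact_table_size > target_file_size_base)
    {
        // Large or write-heavy tables get the hybrid partition count,
        // but only once they are big enough to avoid tiny files.
        Some(hybrid_vnode_count)
    } else if compact_table_size > target_file_size_base {
        // Small tables still get a single partition of their own.
        Some(1)
    } else {
        None
    }
}

fn main() {
    // Assumed numbers: target_file_size_base = 32 MiB, hybrid = 4, default = 16.
    let mib = 1u64 << 20;
    assert_eq!(pick_partition_count(600 * mib, 0, 32 * mib, 100 * mib, 4, 16), Some(16));
    assert_eq!(pick_partition_count(200 * mib, 0, 32 * mib, 100 * mib, 4, 16), Some(4));
    assert_eq!(pick_partition_count(40 * mib, 0, 32 * mib, 100 * mib, 4, 16), Some(1));
}
```
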
27 changes: 19 additions & 8 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
@@ -179,7 +179,7 @@ async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
});

let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
let (splits, sub_compaction_sstable_size, split_weight_by_vnode) =
let (splits, sub_compaction_sstable_size, table_vnode_partition) =
generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
let parallelism = splits.len();
let mut compact_success = true;
@@ -474,16 +474,20 @@ fn generate_splits(
payload: &UploadTaskPayload,
existing_table_ids: &HashSet<u32>,
storage_opts: &StorageOpts,
) -> (Vec<KeyRange>, u64, u32) {
) -> (Vec<KeyRange>, u64, BTreeMap<u32, u32>) {
let mut size_and_start_user_keys = vec![];
let mut compact_data_size = 0;
let mut table_size_infos: HashMap<u32, u64> = HashMap::default();
let mut table_vnode_partition = BTreeMap::default();
for imm in payload {
let data_size = {
// calculate encoded bytes of key var length
(imm.value_count() * EPOCH_LEN + imm.size()) as u64
};
compact_data_size += data_size;
size_and_start_user_keys.push((data_size, imm.start_user_key()));
let v = table_size_infos.entry(imm.table_id.table_id).or_insert(0);
*v += data_size;
}
size_and_start_user_keys.sort_by(|a, b| a.1.cmp(&b.1));
let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
@@ -493,6 +497,7 @@
splits.push(KeyRange::new(key_before_last.clone(), Bytes::new()));
};
let sstable_size = (storage_opts.sstable_size_mb as u64) << 20;
let min_sstable_size = (storage_opts.min_sstable_size_mb as u64) << 20;
let parallel_compact_size = (storage_opts.parallel_compact_size_mb as u64) << 20;
let parallelism = std::cmp::min(
storage_opts.share_buffers_sync_parallelism as u64,
@@ -504,7 +509,6 @@
compact_data_size
};

let mut vnode_partition_count = 0;
if existing_table_ids.len() > 1 {
if parallelism > 1 && compact_data_size > sstable_size {
let mut last_buffer_size = 0;
@@ -529,6 +533,13 @@
}
}
}
for table_id in existing_table_ids {
if let Some(table_size) = table_size_infos.get(table_id)
&& *table_size > min_sstable_size
{
table_vnode_partition.insert(*table_id, 1);
}
}
} else {
// Collect vnodes in imm
let mut vnodes = vec![];
@@ -538,6 +549,7 @@
vnodes.sort();
vnodes.dedup();

let mut vnode_partition_count = 0;
// Based on the estimated `vnode_avg_size`, calculate the required `vnode_partition_count` to avoid small files and further align
const MIN_SSTABLE_SIZE: u64 = 16 * 1024 * 1024;
if compact_data_size >= MIN_SSTABLE_SIZE && !vnodes.is_empty() {
@@ -547,17 +559,16 @@
vnode_partition_count /= 2;
avg_vnode_size *= 2;
}
if let Some(table_id) = existing_table_ids.iter().next() {
table_vnode_partition.insert(*table_id, vnode_partition_count);
}
}
}

// Multiply by 1.2 to account for other extra memory usage.
// Ensure that the size of each sstable is still less than `sstable_size` after this adjustment, to avoid generating a huge sstable that would affect the object store.
let sub_compaction_sstable_size = std::cmp::min(sstable_size, sub_compaction_data_size * 6 / 5);
(
splits,
sub_compaction_sstable_size,
vnode_partition_count as u32,
)
(splits, sub_compaction_sstable_size, table_vnode_partition)
}

pub struct SharedBufferCompactRunner {
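
The core behavioral change in `generate_splits` above is that per-table sizes now drive a `BTreeMap<u32, u32>` of vnode partitions instead of a single count. A reduced sketch of that decision, assuming a plain `HashMap` of accumulated table sizes in place of the imm payload (function name is illustrative):

```rust
use std::collections::{BTreeMap, HashMap, HashSet};

fn split_tables_by_size(
    existing_table_ids: &HashSet<u32>,
    table_size_infos: &HashMap<u32, u64>,
    min_sstable_size: u64,
) -> BTreeMap<u32, u32> {
    let mut table_vnode_partition = BTreeMap::new();
    for table_id in existing_table_ids {
        if let Some(table_size) = table_size_infos.get(table_id) {
            if *table_size > min_sstable_size {
                // Large enough tables get their own (single) vnode partition,
                // so their data ends up in an independent sstable.
                table_vnode_partition.insert(*table_id, 1);
            }
        }
    }
    table_vnode_partition
}

fn main() {
    let existing: HashSet<u32> = [1, 2, 3].into_iter().collect();
    let sizes: HashMap<u32, u64> =
        [(1u32, 64u64 << 20), (2, 1 << 20), (3, 48 << 20)].into_iter().collect();
    // With min_sstable_size_mb = 32, only tables 1 and 3 are split out.
    let partitions = split_tables_by_size(&existing, &sizes, 32 << 20);
    assert_eq!(partitions.keys().copied().collect::<Vec<_>>(), vec![1, 3]);
}
```
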
3 changes: 3 additions & 0 deletions src/storage/src/hummock/sstable/builder.rs
@@ -45,6 +45,7 @@ pub const MIN_BLOCK_SIZE: usize = 8 * 1024;
pub struct SstableBuilderOptions {
/// Approximate sstable capacity.
pub capacity: usize,
pub min_sstable_size: usize,
/// Approximate block capacity.
pub block_capacity: usize,
/// Restart point interval.
@@ -59,8 +60,10 @@ pub struct SstableBuilderOptions {
impl From<&StorageOpts> for SstableBuilderOptions {
fn from(options: &StorageOpts) -> SstableBuilderOptions {
let capacity: usize = (options.sstable_size_mb as usize) * (1 << 20);
let min_sstable_size: usize = (options.min_sstable_size_mb as usize) * (1 << 20);
SstableBuilderOptions {
capacity,
min_sstable_size,
block_capacity: (options.block_size_kb as usize) * (1 << 10),
restart_interval: DEFAULT_RESTART_INTERVAL,
bloom_false_positive: options.bloom_false_positive,
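
The diff only adds `min_sstable_size` to `SstableBuilderOptions`; where the field is consumed is not shown here. The following is a purely hypothetical sketch of how a multi-sstable builder could use such a threshold when deciding whether to seal the current file at a table boundary:

```rust
/// Hypothetical sketch; the struct, method, and logic are illustrative only.
struct BuilderState {
    approximate_len: usize,
    min_sstable_size: usize,
}

impl BuilderState {
    /// Only start a new sstable when the current one is already big enough;
    /// otherwise keep appending so that small tables share a file.
    fn should_seal_on_table_switch(&self) -> bool {
        self.approximate_len >= self.min_sstable_size
    }
}

fn main() {
    let small = BuilderState { approximate_len: 4 << 20, min_sstable_size: 32 << 20 };
    let large = BuilderState { approximate_len: 48 << 20, min_sstable_size: 32 << 20 };
    assert!(!small.should_seal_on_table_switch());
    assert!(large.should_seal_on_table_switch());
}
```
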
9 changes: 0 additions & 9 deletions src/storage/src/hummock/sstable/multi_builder.rs
@@ -229,15 +229,6 @@ where
// default
self.largest_vnode_in_current_partition = VirtualNode::MAX.to_index();
}
} else {
self.last_table_id = user_key.table_id.table_id;
self.split_weight_by_vnode = 0;
self.largest_vnode_in_current_partition = VirtualNode::MAX.to_index();
if let Some(builder) = self.current_builder.as_ref()
&& builder.approximate_len() > MIN_SST_SIZE
{
switch_builder = true;
}
}
}
if self.largest_vnode_in_current_partition != VirtualNode::MAX.to_index() {
3 changes: 3 additions & 0 deletions src/storage/src/opts.rs
@@ -24,6 +24,8 @@ pub struct StorageOpts {
pub parallel_compact_size_mb: u32,
/// Target size of the Sstable.
pub sstable_size_mb: u32,
/// Minimal target size of the sstable, so that data of different state tables can be stored in independent files as soon as possible.
pub min_sstable_size_mb: u32,
/// Size of each block in bytes in SST.
pub block_size_kb: u32,
/// False positive probability of bloom filter.
@@ -160,6 +162,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpts {
Self {
parallel_compact_size_mb: p.parallel_compact_size_mb(),
sstable_size_mb: p.sstable_size_mb(),
min_sstable_size_mb: c.storage.min_sstable_size_mb,
block_size_kb: p.block_size_kb(),
bloom_false_positive: p.bloom_false_positive(),
share_buffers_sync_parallelism: c.storage.share_buffers_sync_parallelism,
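
Putting the pieces together, the new knob flows from the TOML config into `StorageOpts` and then into `SstableBuilderOptions`. A trimmed-down trace of that wiring (struct names mirror the patch, but each type is reduced to the single field added by this commit, and the `From` signatures are simplified):

```rust
struct StorageConfig {
    min_sstable_size_mb: u32, // from `[storage]` in the TOML, default 32
}

struct StorageOpts {
    min_sstable_size_mb: u32,
}

struct SstableBuilderOptions {
    min_sstable_size: usize, // bytes
}

impl From<&StorageConfig> for StorageOpts {
    fn from(c: &StorageConfig) -> Self {
        Self { min_sstable_size_mb: c.min_sstable_size_mb }
    }
}

impl From<&StorageOpts> for SstableBuilderOptions {
    fn from(o: &StorageOpts) -> Self {
        Self {
            // MB -> bytes, matching the `(1 << 20)` shift used in builder.rs.
            min_sstable_size: (o.min_sstable_size_mb as usize) << 20,
        }
    }
}

fn main() {
    let cfg = StorageConfig { min_sstable_size_mb: 32 };
    let opts = StorageOpts::from(&cfg);
    let builder_opts = SstableBuilderOptions::from(&opts);
    assert_eq!(builder_opts.min_sstable_size, 32 * 1024 * 1024);
}
```
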
