Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): Remove ambiguous configuration max_sub_compaction #16960

Merged
merged 4 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ message CompactTask {
map<uint32, TableWatermarks> table_watermarks = 24;
// The table schemas that are at least as new as the one used to create `input_ssts`.
map<uint32, TableSchema> table_schemas = 25;
// Maximum number of sub-compaction tasks
uint32 max_sub_compaction = 26;
}

message LevelHandler {
Expand Down
8 changes: 0 additions & 8 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,10 +717,6 @@ pub struct StorageConfig {
#[serde(default = "default::storage::min_sst_size_for_streaming_upload")]
pub min_sst_size_for_streaming_upload: u64,

/// Max sub compaction task numbers
#[serde(default = "default::storage::max_sub_compaction")]
pub max_sub_compaction: u32,

#[serde(default = "default::storage::max_concurrent_compaction_task_number")]
pub max_concurrent_compaction_task_number: u64,

Expand Down Expand Up @@ -1457,10 +1453,6 @@ pub mod default {
32 * 1024 * 1024
}

/// Serde default for the `max_sub_compaction` storage setting.
///
/// Returns `4`.
pub fn max_sub_compaction() -> u32 {
    const DEFAULT_MAX_SUB_COMPACTION: u32 = 4;
    DEFAULT_MAX_SUB_COMPACTION
}

/// Serde default for the `max_concurrent_compaction_task_number` storage setting.
///
/// Returns `16`.
pub fn max_concurrent_compaction_task_number() -> u64 {
    const DEFAULT_MAX_CONCURRENT_COMPACTION_TASKS: u64 = 16;
    DEFAULT_MAX_CONCURRENT_COMPACTION_TASKS
}
Expand Down
1 change: 0 additions & 1 deletion src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ This page is automatically generated by `./risedev generate-example-config`
| max_prefetch_block_number | max prefetch block number | 16 |
| max_preload_io_retry_times | | 3 |
| max_preload_wait_time_mill | | 0 |
| max_sub_compaction | Max sub compaction task numbers | 4 |
| max_version_pinning_duration_sec | | 10800 |
| mem_table_spill_threshold | The spill threshold for mem table. | 4194304 |
| meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. | |
Expand Down
1 change: 0 additions & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ compactor_memory_available_proportion = 0.8
sstable_id_remote_fetch_number = 10
min_sstable_size_mb = 32
min_sst_size_for_streaming_upload = 33554432
max_sub_compaction = 4
max_concurrent_compaction_task_number = 16
max_preload_wait_time_mill = 0
max_version_pinning_duration_sec = 10800
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ impl HummockManager {
target_sub_level_id: compact_task.input.target_sub_level_id,
task_type: compact_task.compaction_task_type as i32,
split_weight_by_vnode: vnode_partition_count,
max_sub_compaction: group_config.compaction_config.max_sub_compaction,
..Default::default()
};

Expand Down
32 changes: 20 additions & 12 deletions src/storage/src/hummock/compactor/compaction_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use risingwave_hummock_sdk::table_stats::TableStatsMap;
use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator};
use risingwave_pb::hummock::compact_task::TaskType;
use risingwave_pb::hummock::{
compact_task, BloomFilterType, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo,
TableSchema,
compact_task, BloomFilterType, CompactTask, LevelType, PbKeyRange, SstableInfo, TableSchema,
};
use tokio::time::Instant;

Expand Down Expand Up @@ -178,15 +177,16 @@ fn generate_splits_fast(
sstable_infos: &Vec<SstableInfo>,
compaction_size: u64,
context: &CompactorContext,
) -> Vec<KeyRange_vec> {
max_sub_compaction: u32,
) -> Vec<PbKeyRange> {
let worker_num = context.compaction_executor.worker_num();
let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;

let parallelism = calculate_task_parallelism_impl(
worker_num,
parallel_compact_size,
compaction_size,
context.storage_opts.max_sub_compaction,
max_sub_compaction,
);
let mut indexes = vec![];
for sst in sstable_infos {
Expand All @@ -213,13 +213,13 @@ fn generate_splits_fast(
}

let mut splits = vec![];
splits.push(KeyRange_vec::new(vec![], vec![]));
splits.push(PbKeyRange::new(vec![], vec![]));
let parallel_key_count = indexes.len() / parallelism;
let mut last_split_key_count = 0;
for key in indexes {
if last_split_key_count >= parallel_key_count {
splits.last_mut().unwrap().right.clone_from(&key);
splits.push(KeyRange_vec::new(key.clone(), vec![]));
splits.push(PbKeyRange::new(key.clone(), vec![]));
last_split_key_count = 0;
}
last_split_key_count += 1;
Expand All @@ -232,7 +232,8 @@ pub async fn generate_splits(
sstable_infos: &Vec<SstableInfo>,
compaction_size: u64,
context: &CompactorContext,
) -> HummockResult<Vec<KeyRange_vec>> {
max_sub_compaction: u32,
) -> HummockResult<Vec<PbKeyRange>> {
const MAX_FILE_COUNT: usize = 32;
let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
if compaction_size > parallel_compact_size {
Expand All @@ -241,6 +242,7 @@ pub async fn generate_splits(
sstable_infos,
compaction_size,
context,
max_sub_compaction,
));
}
let mut indexes = vec![];
Expand Down Expand Up @@ -269,13 +271,13 @@ pub async fn generate_splits(
// sort by key, since every data block has the same size;
indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
let mut splits = vec![];
splits.push(KeyRange_vec::new(vec![], vec![]));
splits.push(PbKeyRange::new(vec![], vec![]));

let parallelism = calculate_task_parallelism_impl(
context.compaction_executor.worker_num(),
parallel_compact_size,
compaction_size,
context.storage_opts.max_sub_compaction,
max_sub_compaction,
);

let sub_compaction_data_size =
Expand All @@ -291,7 +293,7 @@ pub async fn generate_splits(
&& remaining_size > parallel_compact_size
{
splits.last_mut().unwrap().right.clone_from(&key);
splits.push(KeyRange_vec::new(key.clone(), vec![]));
splits.push(PbKeyRange::new(key.clone(), vec![]));
last_buffer_size = data_size;
} else {
last_buffer_size += data_size;
Expand Down Expand Up @@ -577,7 +579,13 @@ pub async fn generate_splits_for_task(
.sum::<u64>();

if !optimize_by_copy_block {
let splits = generate_splits(&sstable_infos, compaction_size, context).await?;
let splits = generate_splits(
&sstable_infos,
compaction_size,
context,
compact_task.get_max_sub_compaction(),
)
.await?;
if !splits.is_empty() {
compact_task.splits = splits;
}
Expand Down Expand Up @@ -659,7 +667,7 @@ pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &Compacto
context.compaction_executor.worker_num(),
parallel_compact_size,
compaction_size,
context.storage_opts.max_sub_compaction,
compact_task.get_max_sub_compaction(),
)
}

Expand Down
3 changes: 0 additions & 3 deletions src/storage/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ pub struct StorageOpts {
pub sstable_id_remote_fetch_number: u32,
/// Whether to enable streaming upload for sstable.
pub min_sst_size_for_streaming_upload: u64,
/// Max sub compaction task numbers
pub max_sub_compaction: u32,
pub max_concurrent_compaction_task_number: u64,
pub max_version_pinning_duration_sec: u64,
pub compactor_iter_max_io_retry_times: usize,
Expand Down Expand Up @@ -176,7 +174,6 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
compactor_memory_limit_mb: s.compactor_memory_limit_mb,
sstable_id_remote_fetch_number: c.storage.sstable_id_remote_fetch_number,
min_sst_size_for_streaming_upload: c.storage.min_sst_size_for_streaming_upload,
max_sub_compaction: c.storage.max_sub_compaction,
max_concurrent_compaction_task_number: c.storage.max_concurrent_compaction_task_number,
max_version_pinning_duration_sec: c.storage.max_version_pinning_duration_sec,
data_file_cache_dir: c.storage.data_file_cache.dir.clone(),
Expand Down
Loading