diff --git a/proto/hummock.proto b/proto/hummock.proto index 8d68ec168ef21..7caf27e155deb 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -382,6 +382,8 @@ message CompactTask { map table_watermarks = 24; // The table schemas that are at least as new as the one used to create `input_ssts`. map table_schemas = 25; + // Max sub compaction task numbers + uint32 max_sub_compaction = 26; } message LevelHandler { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index c8da0f6dce5e9..26e8bcaf1f56b 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -721,10 +721,6 @@ pub struct StorageConfig { #[serde(default = "default::storage::min_sst_size_for_streaming_upload")] pub min_sst_size_for_streaming_upload: u64, - /// Max sub compaction task numbers - #[serde(default = "default::storage::max_sub_compaction")] - pub max_sub_compaction: u32, - #[serde(default = "default::storage::max_concurrent_compaction_task_number")] pub max_concurrent_compaction_task_number: u64, @@ -1461,10 +1457,6 @@ pub mod default { 32 * 1024 * 1024 } - pub fn max_sub_compaction() -> u32 { - 4 - } - pub fn max_concurrent_compaction_task_number() -> u64 { 16 } diff --git a/src/config/docs.md b/src/config/docs.md index 018c9dd41087c..0a024ba992db0 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -121,7 +121,6 @@ This page is automatically generated by `./risedev generate-example-config` | max_prefetch_block_number | max prefetch block number | 16 | | max_preload_io_retry_times | | 3 | | max_preload_wait_time_mill | | 0 | -| max_sub_compaction | Max sub compaction task numbers | 4 | | max_version_pinning_duration_sec | | 10800 | | mem_table_spill_threshold | The spill threshold for mem table. | 4194304 | | meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. | | diff --git a/src/config/example.toml b/src/config/example.toml index 00b1ef759e5f9..93546c7bdd238 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -136,7 +136,6 @@ compactor_memory_available_proportion = 0.8 sstable_id_remote_fetch_number = 10 min_sstable_size_mb = 32 min_sst_size_for_streaming_upload = 33554432 -max_sub_compaction = 4 max_concurrent_compaction_task_number = 16 max_preload_wait_time_mill = 0 max_version_pinning_duration_sec = 10800 diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index addb416893b08..16ca79a30962d 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -774,6 +774,7 @@ impl HummockManager { target_sub_level_id: compact_task.input.target_sub_level_id, task_type: compact_task.compaction_task_type as i32, split_weight_by_vnode: vnode_partition_count, + max_sub_compaction: group_config.compaction_config.max_sub_compaction, ..Default::default() }; diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index d0e5fe93c62ee..63b59366195f0 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -28,8 +28,7 @@ use risingwave_hummock_sdk::table_stats::TableStatsMap; use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator}; use risingwave_pb::hummock::compact_task::TaskType; use risingwave_pb::hummock::{ - compact_task, BloomFilterType, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo, - TableSchema, + compact_task, BloomFilterType, CompactTask, LevelType, PbKeyRange, SstableInfo, TableSchema, }; use tokio::time::Instant; @@ -178,7 +177,8 @@ fn generate_splits_fast( sstable_infos: &Vec, compaction_size: u64, context: &CompactorContext, -) -> Vec { + max_sub_compaction: u32, +) -> Vec { let worker_num = context.compaction_executor.worker_num(); let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; @@ -186,7 +186,7 @@ fn generate_splits_fast( worker_num, parallel_compact_size, compaction_size, - context.storage_opts.max_sub_compaction, + max_sub_compaction, ); let mut indexes = vec![]; for sst in sstable_infos { @@ -213,13 +213,13 @@ fn generate_splits_fast( } let mut splits = vec![]; - splits.push(KeyRange_vec::new(vec![], vec![])); + splits.push(PbKeyRange::new(vec![], vec![])); let parallel_key_count = indexes.len() / parallelism; let mut last_split_key_count = 0; for key in indexes { if last_split_key_count >= parallel_key_count { splits.last_mut().unwrap().right.clone_from(&key); - splits.push(KeyRange_vec::new(key.clone(), vec![])); + splits.push(PbKeyRange::new(key.clone(), vec![])); last_split_key_count = 0; } last_split_key_count += 1; @@ -232,7 +232,8 @@ pub async fn generate_splits( sstable_infos: &Vec, compaction_size: u64, context: &CompactorContext, -) -> HummockResult> { + max_sub_compaction: u32, +) -> HummockResult> { const MAX_FILE_COUNT: usize = 32; let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; if compaction_size > parallel_compact_size { @@ -241,6 +242,7 @@ pub async fn generate_splits( sstable_infos, compaction_size, context, + max_sub_compaction, )); } let mut indexes = vec![]; @@ -269,13 +271,13 @@ pub async fn generate_splits( // sort by key, as for every data block has the same size; indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref())); let mut splits = vec![]; - splits.push(KeyRange_vec::new(vec![], vec![])); + splits.push(PbKeyRange::new(vec![], vec![])); let parallelism = calculate_task_parallelism_impl( context.compaction_executor.worker_num(), parallel_compact_size, compaction_size, - context.storage_opts.max_sub_compaction, + max_sub_compaction, ); let sub_compaction_data_size = @@ -291,7 +293,7 @@ pub async fn generate_splits( && remaining_size > parallel_compact_size { splits.last_mut().unwrap().right.clone_from(&key); - splits.push(KeyRange_vec::new(key.clone(), vec![])); + splits.push(PbKeyRange::new(key.clone(), vec![])); last_buffer_size = data_size; } else { last_buffer_size += data_size; @@ -577,7 +579,13 @@ pub async fn generate_splits_for_task( .sum::(); if !optimize_by_copy_block { - let splits = generate_splits(&sstable_infos, compaction_size, context).await?; + let splits = generate_splits( + &sstable_infos, + compaction_size, + context, + compact_task.get_max_sub_compaction(), + ) + .await?; if !splits.is_empty() { compact_task.splits = splits; } @@ -659,7 +667,7 @@ pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &Compacto context.compaction_executor.worker_num(), parallel_compact_size, compaction_size, - context.storage_opts.max_sub_compaction, + compact_task.get_max_sub_compaction(), ) } diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index aa4fd4cbb9630..5a7bca2c30b42 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -74,8 +74,6 @@ pub struct StorageOpts { pub sstable_id_remote_fetch_number: u32, /// Whether to enable streaming upload for sstable. pub min_sst_size_for_streaming_upload: u64, - /// Max sub compaction task numbers - pub max_sub_compaction: u32, pub max_concurrent_compaction_task_number: u64, pub max_version_pinning_duration_sec: u64, pub compactor_iter_max_io_retry_times: usize, @@ -176,7 +174,6 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt compactor_memory_limit_mb: s.compactor_memory_limit_mb, sstable_id_remote_fetch_number: c.storage.sstable_id_remote_fetch_number, min_sst_size_for_streaming_upload: c.storage.min_sst_size_for_streaming_upload, - max_sub_compaction: c.storage.max_sub_compaction, max_concurrent_compaction_task_number: c.storage.max_concurrent_compaction_task_number, max_version_pinning_duration_sec: c.storage.max_version_pinning_duration_sec, data_file_cache_dir: c.storage.data_file_cache.dir.clone(),