From ec6d374fa79d0da7d245d597838fbecba274786b Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 28 May 2024 14:23:14 +0800 Subject: [PATCH 1/2] fix(storage): Remove ambiguous configuration max_sub_compaction --- proto/hummock.proto | 2 ++ src/common/src/config.rs | 8 ----- src/meta/src/hummock/manager/compaction.rs | 1 + .../src/hummock/compactor/compaction_utils.rs | 32 ++++++++++++------- src/storage/src/opts.rs | 3 -- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 8d68ec168ef21..7caf27e155deb 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -382,6 +382,8 @@ message CompactTask { map table_watermarks = 24; // The table schemas that are at least as new as the one used to create `input_ssts`. map table_schemas = 25; + // Max sub compaction task numbers + uint32 max_sub_compaction = 26; } message LevelHandler { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index fd457c7faf1ca..6fc8d8aa02bfe 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -688,10 +688,6 @@ pub struct StorageConfig { #[serde(default = "default::storage::min_sst_size_for_streaming_upload")] pub min_sst_size_for_streaming_upload: u64, - /// Max sub compaction task numbers - #[serde(default = "default::storage::max_sub_compaction")] - pub max_sub_compaction: u32, - #[serde(default = "default::storage::max_concurrent_compaction_task_number")] pub max_concurrent_compaction_task_number: u64, @@ -1391,10 +1387,6 @@ pub mod default { 32 * 1024 * 1024 } - pub fn max_sub_compaction() -> u32 { - 4 - } - pub fn max_concurrent_compaction_task_number() -> u64 { 16 } diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index debd63828d743..259a86b6d5ecc 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -769,6 +769,7 @@ impl HummockManager { target_sub_level_id: compact_task.input.target_sub_level_id, task_type: compact_task.compaction_task_type as i32, split_weight_by_vnode: vnode_partition_count, + max_sub_compaction: group_config.compaction_config.max_sub_compaction, ..Default::default() }; diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index d0e5fe93c62ee..63b59366195f0 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -28,8 +28,7 @@ use risingwave_hummock_sdk::table_stats::TableStatsMap; use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator}; use risingwave_pb::hummock::compact_task::TaskType; use risingwave_pb::hummock::{ - compact_task, BloomFilterType, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo, - TableSchema, + compact_task, BloomFilterType, CompactTask, LevelType, PbKeyRange, SstableInfo, TableSchema, }; use tokio::time::Instant; @@ -178,7 +177,8 @@ fn generate_splits_fast( sstable_infos: &Vec, compaction_size: u64, context: &CompactorContext, -) -> Vec { + max_sub_compaction: u32, +) -> Vec { let worker_num = context.compaction_executor.worker_num(); let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; @@ -186,7 +186,7 @@ fn generate_splits_fast( worker_num, parallel_compact_size, compaction_size, - context.storage_opts.max_sub_compaction, + max_sub_compaction, ); let mut indexes = vec![]; for sst in sstable_infos { @@ -213,13 +213,13 @@ fn generate_splits_fast( } let mut splits = vec![]; - splits.push(KeyRange_vec::new(vec![], vec![])); + splits.push(PbKeyRange::new(vec![], vec![])); let parallel_key_count = indexes.len() / parallelism; let mut last_split_key_count = 0; for key in indexes { if last_split_key_count >= parallel_key_count { splits.last_mut().unwrap().right.clone_from(&key); - splits.push(KeyRange_vec::new(key.clone(), vec![])); + splits.push(PbKeyRange::new(key.clone(), vec![])); last_split_key_count = 0; } last_split_key_count += 1; @@ -232,7 +232,8 @@ pub async fn generate_splits( sstable_infos: &Vec, compaction_size: u64, context: &CompactorContext, -) -> HummockResult> { + max_sub_compaction: u32, +) -> HummockResult> { const MAX_FILE_COUNT: usize = 32; let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; if compaction_size > parallel_compact_size { @@ -241,6 +242,7 @@ pub async fn generate_splits( sstable_infos, compaction_size, context, + max_sub_compaction, )); } let mut indexes = vec![]; @@ -269,13 +271,13 @@ pub async fn generate_splits( // sort by key, as for every data block has the same size; indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref())); let mut splits = vec![]; - splits.push(KeyRange_vec::new(vec![], vec![])); + splits.push(PbKeyRange::new(vec![], vec![])); let parallelism = calculate_task_parallelism_impl( context.compaction_executor.worker_num(), parallel_compact_size, compaction_size, - context.storage_opts.max_sub_compaction, + max_sub_compaction, ); let sub_compaction_data_size = @@ -291,7 +293,7 @@ pub async fn generate_splits( && remaining_size > parallel_compact_size { splits.last_mut().unwrap().right.clone_from(&key); - splits.push(KeyRange_vec::new(key.clone(), vec![])); + splits.push(PbKeyRange::new(key.clone(), vec![])); last_buffer_size = data_size; } else { last_buffer_size += data_size; @@ -577,7 +579,13 @@ pub async fn generate_splits_for_task( .sum::(); if !optimize_by_copy_block { - let splits = generate_splits(&sstable_infos, compaction_size, context).await?; + let splits = generate_splits( + &sstable_infos, + compaction_size, + context, + compact_task.get_max_sub_compaction(), + ) + .await?; if !splits.is_empty() { compact_task.splits = splits; } @@ -659,7 +667,7 @@ pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &Compacto context.compaction_executor.worker_num(), parallel_compact_size, compaction_size, - context.storage_opts.max_sub_compaction, + compact_task.get_max_sub_compaction(), ) } diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 85d8d5c772a06..80a25dce71de6 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -72,8 +72,6 @@ pub struct StorageOpts { pub sstable_id_remote_fetch_number: u32, /// Whether to enable streaming upload for sstable. pub min_sst_size_for_streaming_upload: u64, - /// Max sub compaction task numbers - pub max_sub_compaction: u32, pub max_concurrent_compaction_task_number: u64, pub max_version_pinning_duration_sec: u64, pub compactor_iter_max_io_retry_times: usize, @@ -173,7 +171,6 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt compactor_memory_limit_mb: s.compactor_memory_limit_mb, sstable_id_remote_fetch_number: c.storage.sstable_id_remote_fetch_number, min_sst_size_for_streaming_upload: c.storage.min_sst_size_for_streaming_upload, - max_sub_compaction: c.storage.max_sub_compaction, max_concurrent_compaction_task_number: c.storage.max_concurrent_compaction_task_number, max_version_pinning_duration_sec: c.storage.max_version_pinning_duration_sec, data_file_cache_dir: c.storage.data_file_cache.dir.clone(), From 58860e27cdabaa4e6652731120b296b01ea1f267 Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 29 May 2024 10:59:07 +0800 Subject: [PATCH 2/2] fix(config): fix example.toml --- src/config/docs.md | 1 - src/config/example.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/src/config/docs.md b/src/config/docs.md index aea210f5235af..1134efdee85bd 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -119,7 +119,6 @@ This page is automatically generated by `./risedev generate-example-config` | max_prefetch_block_number | max prefetch block number | 16 | | max_preload_io_retry_times | | 3 | | max_preload_wait_time_mill | | 0 | -| max_sub_compaction | Max sub compaction task numbers | 4 | | max_version_pinning_duration_sec | | 10800 | | mem_table_spill_threshold | The spill threshold for mem table. | 4194304 | | meta_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.meta_cache_capacity_mb` instead. | | diff --git a/src/config/example.toml b/src/config/example.toml index fc70258788bbc..e626533194b19 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -133,7 +133,6 @@ compactor_memory_available_proportion = 0.8 sstable_id_remote_fetch_number = 10 min_sstable_size_mb = 32 min_sst_size_for_streaming_upload = 33554432 -max_sub_compaction = 4 max_concurrent_compaction_task_number = 16 max_preload_wait_time_mill = 0 max_version_pinning_duration_sec = 10800