diff --git a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs index c744c79c89995..980c3030a98fb 100644 --- a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs @@ -623,53 +623,4 @@ pub mod tests { assert!(is_l0_trivial_move(&ret)); assert_eq!(ret.input_levels[0].table_infos.len(), 1); } - - #[test] - fn test_issue_11154() { - let mut local_stats = LocalPickerStatistic::default(); - let mut l0 = generate_l0_overlapping_sublevels(vec![ - vec![ - generate_table(4, 1, 1, 200, 1), - generate_table(5, 1, 400, 600, 1), - ], - vec![ - generate_table(6, 1, 1, 200, 1), - generate_table(7, 1, 400, 600, 1), - ], - vec![ - generate_table(8, 1, 1, 200, 1), - generate_table(9, 1, 400, 600, 1), - ], - vec![generate_table(10, 1, 1, 600, 1)], - ]); - // We can set level_type only because the input above is valid. - for s in &mut l0.sub_levels { - s.level_type = LevelType::Nonoverlapping as i32; - } - let levels = Levels { - l0: Some(l0), - levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])], - member_table_ids: vec![1], - ..Default::default() - }; - let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; - - // Pick with large max_compaction_bytes results all sub levels included in input. - let config = Arc::new( - CompactionConfigBuilder::new() - .max_compaction_bytes(800) - .sub_level_max_compaction_bytes(50000) - .max_bytes_for_level_base(500000) - .level0_sub_level_compact_level_count(1) - .build(), - ); - // Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION. - // So all sub-levels are included to make write amplification < MAX_WRITE_AMPLIFICATION. - let mut picker = IntraCompactionPicker::new(config); - let ret = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - // avoid add sst_10 and cause a big task - assert_eq!(3, ret.input_levels.len()); - } } diff --git a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs index c705bd3b43aae..c17fa305be0e4 100644 --- a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs @@ -209,6 +209,13 @@ impl NonOverlapSubLevelPicker { break; } + // more than 1 sub_level + if ret.total_file_count > 1 && ret.total_file_size >= self.max_compaction_bytes + || ret.total_file_count >= self.max_file_count as usize + { + break; + } + let mut overlap_files_range = overlap_info.check_multiple_include(&target_level.table_infos); if overlap_files_range.is_empty() { @@ -288,15 +295,6 @@ impl NonOverlapSubLevelPicker { .map(|(_, files)| files.len()) .sum::(); - // more than 1 sub_level - if ret.total_file_count > 1 - && (ret.total_file_size + (add_files_size + current_level_size) - >= self.max_compaction_bytes - || ret.total_file_count + add_files_count >= self.max_file_count as usize) - { - break; - } - if ret .sstable_infos .iter() diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 5f28a076888ef..3c1332d09317c 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -158,6 +158,63 @@ pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompact multi_filter } +const MAX_FILE_COUNT: usize = 32; + +fn generate_splits_fast( + sstable_infos: &Vec, + compaction_size: u64, + context: CompactorContext, +) -> HummockResult> { + let worker_num = context.compaction_executor.worker_num(); + let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; + + let parallelism = (compaction_size + parallel_compact_size - 1) / parallel_compact_size; + + let parallelism = std::cmp::min( + worker_num, + std::cmp::min( + parallelism as usize, + context.storage_opts.max_sub_compaction as usize, + ), + ); + let mut indexes = vec![]; + for sst in sstable_infos { + let key_range = sst.key_range.as_ref().unwrap(); + indexes.push( + FullKey { + user_key: FullKey::decode(&key_range.left).user_key, + epoch: HummockEpoch::MAX, + } + .encode(), + ); + indexes.push( + FullKey { + user_key: FullKey::decode(&key_range.right).user_key, + epoch: HummockEpoch::MAX, + } + .encode(), + ); + } + indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref())); + indexes.dedup(); + if indexes.len() <= parallelism { + return Ok(vec![]); + } + let mut splits = vec![]; + splits.push(KeyRange_vec::new(vec![], vec![])); + let parallel_key_count = indexes.len() / parallelism; + let mut last_split_key_count = 0; + for key in indexes { + if last_split_key_count >= parallel_key_count { + splits.last_mut().unwrap().right = key.clone(); + splits.push(KeyRange_vec::new(key.clone(), vec![])); + last_split_key_count = 0; + } + last_split_key_count += 1; + } + Ok(splits) +} + pub async fn generate_splits( sstable_infos: &Vec, compaction_size: u64, @@ -165,6 +222,9 @@ pub async fn generate_splits( ) -> HummockResult> { let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; if compaction_size > parallel_compact_size { + if sstable_infos.len() > MAX_FILE_COUNT { + return generate_splits_fast(sstable_infos, compaction_size, context); + } let mut indexes = vec![]; // preload the meta and get the smallest key to split sub_compaction for sstable_info in sstable_infos { diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 3d079ce4a188e..583bab3d10b3c 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -152,17 +152,20 @@ impl CompactorRunner { let mut local_stats = StoreLocalStatistic::default(); for table_info in sstable_infos { - let table = sstable_store.sstable(table_info, &mut local_stats).await?; - let mut range_tombstone_list = table.value().meta.monotonic_tombstone_events.clone(); - range_tombstone_list.iter_mut().for_each(|tombstone| { - if filter.should_delete(FullKey::from_user_key( - tombstone.event_key.left_user_key.as_ref(), - tombstone.new_epoch, - )) { - tombstone.new_epoch = HummockEpoch::MAX; - } - }); - builder.add_delete_events(range_tombstone_list); + if table_info.range_tombstone_count > 0 { + let table = sstable_store.sstable(table_info, &mut local_stats).await?; + let mut range_tombstone_list = + table.value().meta.monotonic_tombstone_events.clone(); + range_tombstone_list.iter_mut().for_each(|tombstone| { + if filter.should_delete(FullKey::from_user_key( + tombstone.event_key.left_user_key.as_ref(), + tombstone.new_epoch, + )) { + tombstone.new_epoch = HummockEpoch::MAX; + } + }); + builder.add_delete_events(range_tombstone_list); + } } let aggregator = builder.build_for_compaction(); @@ -891,10 +894,8 @@ mod tests { use super::*; use crate::hummock::compactor::StateCleanUpCompactionFilter; use crate::hummock::iterator::test_utils::mock_sstable_store; - use crate::hummock::test_utils::{ - default_builder_opt_for_test, gen_test_sstable_with_range_tombstone, - }; - use crate::hummock::{create_monotonic_events, DeleteRangeTombstone}; + use crate::hummock::test_utils::{default_builder_opt_for_test, gen_test_sstable_impl}; + use crate::hummock::{create_monotonic_events, DeleteRangeTombstone, Xor16FilterBuilder}; #[tokio::test] async fn test_delete_range_aggregator_with_filter() { @@ -914,26 +915,26 @@ mod tests { 1, ), ]; - let mut sstable_info_1 = gen_test_sstable_with_range_tombstone( + let mut sstable_info_1 = gen_test_sstable_impl::( default_builder_opt_for_test(), 1, kv_pairs.clone().into_iter(), range_tombstones.clone(), sstable_store.clone(), + CachePolicy::NotFill, ) - .await - .get_sstable_info(); + .await; sstable_info_1.table_ids = vec![1]; - let mut sstable_info_2 = gen_test_sstable_with_range_tombstone( + let mut sstable_info_2 = gen_test_sstable_impl::( default_builder_opt_for_test(), 2, kv_pairs.into_iter(), range_tombstones.clone(), sstable_store.clone(), + CachePolicy::NotFill, ) - .await - .get_sstable_info(); + .await; sstable_info_2.table_ids = vec![2]; let compact_task = CompactTask {