diff --git a/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs index 860d211239bd..29119ae283b0 100644 --- a/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs +++ b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs @@ -128,7 +128,8 @@ struct IntraCompactionTaskValidationRule { impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule { fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool { - if input.total_file_count >= self.config.level0_max_compact_file_number + if (input.total_file_count >= self.config.level0_max_compact_file_number + && input.input_levels.len() > 1) || input.input_levels.len() >= MAX_COMPACT_LEVEL_COUNT { return true; diff --git a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs index a7cea09ba70f..a6c4344532d7 100644 --- a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs @@ -348,10 +348,10 @@ impl WholeLevelCompactionPicker { let mut total_file_count = 0; let mut wait_enough = false; for next_level in l0.sub_levels.iter().skip(idx) { - if select_input_size > max_compaction_bytes + if (select_input_size > max_compaction_bytes || total_file_count > self.config.level0_max_compact_file_number - || (next_level.vnode_partition_count == partition_count - && select_level_inputs.len() > 1) + || next_level.vnode_partition_count == partition_count) + && select_level_inputs.len() > 1 { wait_enough = true; break; @@ -764,4 +764,32 @@ pub mod tests { assert!(is_l0_trivial_move(&ret)); assert_eq!(ret.input_levels[0].table_infos.len(), 1); } + #[test] + fn test_pick_whole_level() { + let config = Arc::new( + CompactionConfigBuilder::new() + .level0_max_compact_file_number(20) + .build(), + ); + let mut table_infos = vec![]; + for epoch in 1..3 { + let base = epoch * 100; + let mut ssts = vec![]; + for i in 1..50 { + let left = (i as usize) * 100; + let right = left + 100; + ssts.push(generate_table(base + i, 1, left, right, epoch)); + } + table_infos.push(ssts); + } + + let l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos); + let compaction_task_validator = Arc::new(CompactionTaskValidator::new(config.clone())); + let picker = WholeLevelCompactionPicker::new(config, compaction_task_validator); + let level_handler = LevelHandler::new(0); + let ret = picker + .pick_whole_level(&l0, &level_handler, 4, &mut LocalPickerStatistic::default()) + .unwrap(); + assert_eq!(ret.input_levels.len(), 2); + } } diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 5665ef5bc997..85a6ee257aae 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -460,7 +460,12 @@ impl HummockManager { table_ids: &[StateTableId], ) -> Result { let result = self - .move_state_table_to_compaction_group(parent_group_id, table_ids, None, 0) + .move_state_table_to_compaction_group( + parent_group_id, + table_ids, + None, + self.env.opts.partition_vnode_count, + ) .await?; self.group_to_table_vnode_partition .write() diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index ca0c3ab8b0ab..94fbe569b650 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -15,6 +15,7 @@ use std::borrow::Borrow; use std::cmp::Ordering; use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; use itertools::Itertools; use prometheus::Registry; @@ -2188,3 +2189,115 @@ async fn test_gc_stats() { ); assert_eq_gc_stats(6, 3, 0, 0, 2, 4); } + +#[tokio::test] +async fn test_partition_level() { + let config = CompactionConfigBuilder::new() + .level0_tier_compact_file_number(3) + .level0_sub_level_compact_level_count(3) + .level0_overlapping_sub_level_compact_level_count(3) + .build(); + let registry = Registry::new(); + let (_env, hummock_manager, _, worker_node) = + setup_compute_env_with_metric(80, config.clone(), Some(MetaMetrics::for_test(®istry))) + .await; + let config = Arc::new(config); + + let context_id = worker_node.id; + + hummock_manager + .register_table_ids(&[(100, 2)]) + .await + .unwrap(); + hummock_manager + .register_table_ids(&[(101, 2)]) + .await + .unwrap(); + let sst_1 = gen_extend_sstable_info(10, 2, 1, vec![100, 101]); + hummock_manager + .commit_epoch( + 30, + CommitEpochInfo::for_test(vec![sst_1.clone()], HashMap::from([(10, context_id)])), + ) + .await + .unwrap(); + // Construct data via manual compaction + let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; + let base_level: usize = 6; + assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1); + assert_eq!(compaction_task.target_level, base_level as u32); + assert!(hummock_manager + .report_compact_task( + compaction_task.task_id, + TaskStatus::Success, + vec![ + gen_sstable_info(11, 1, vec![100]), + gen_sstable_info(12, 2, vec![101]), + ], + None, + ) + .await + .unwrap()); + + hummock_manager + .split_compaction_group(2, &[100]) + .await + .unwrap(); + let current_version = hummock_manager.get_current_version().await; + let new_group_id = current_version.levels.keys().max().cloned().unwrap(); + assert_eq!( + current_version + .get_compaction_group_levels(new_group_id) + .levels[base_level - 1] + .table_infos + .len(), + 1 + ); + let mut global_sst_id = 13; + const MB: u64 = 1024 * 1024; + let mut selector = default_compaction_selector(); + for epoch in 31..100 { + let mut sst = gen_extend_sstable_info(global_sst_id, new_group_id, 10, vec![100]); + sst.sst_info.file_size = 10 * MB; + sst.sst_info.uncompressed_file_size = 10 * MB; + hummock_manager + .commit_epoch( + epoch, + CommitEpochInfo::for_test(vec![sst], HashMap::from([(global_sst_id, context_id)])), + ) + .await + .unwrap(); + global_sst_id += 1; + if let Some(task) = hummock_manager + .get_compact_task(new_group_id, &mut selector) + .await + .unwrap() + { + let mut sst = gen_sstable_info(global_sst_id, 10, vec![100]); + sst.file_size = task + .input_ssts + .iter() + .map(|level| { + level + .table_infos + .iter() + .map(|sst| sst.file_size) + .sum::() + }) + .sum::(); + global_sst_id += 1; + let ret = hummock_manager + .report_compact_task(task.task_id, TaskStatus::Success, vec![sst], None) + .await + .unwrap(); + assert!(ret); + } + } + let current_version = hummock_manager.get_current_version().await; + let group = current_version.get_compaction_group_levels(new_group_id); + for sub_level in &group.l0.as_ref().unwrap().sub_levels { + if sub_level.total_file_size > config.sub_level_max_compaction_bytes { + assert!(sub_level.vnode_partition_count > 0); + } + } +}