diff --git a/src/meta/src/hummock/compaction/overlap_strategy.rs b/src/meta/src/hummock/compaction/overlap_strategy.rs index 001b480b24255..1a713cc401175 100644 --- a/src/meta/src/hummock/compaction/overlap_strategy.rs +++ b/src/meta/src/hummock/compaction/overlap_strategy.rs @@ -95,15 +95,10 @@ impl OverlapInfo for RangeOverlapInfo { if overlap_begin >= others.len() { return overlap_begin..overlap_begin; } - let mut overlap_end = overlap_begin; - for table in &others[overlap_begin..] { - if key_range.compare_right_with(&table.key_range.as_ref().unwrap().left) - == cmp::Ordering::Less - { - break; - } - overlap_end += 1; - } + let overlap_end = others.partition_point(|table_status| { + key_range.compare_right_with(&table_status.key_range.as_ref().unwrap().left) + != cmp::Ordering::Less + }); overlap_begin..overlap_end } None => others.len()..others.len(), diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index 849cacd694113..6617b9496fe10 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -620,6 +620,7 @@ pub mod tests { ); let mut picker = LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default())); + let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); diff --git a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs index e34963ab48f9f..01c3c050e574c 100644 --- a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::BTreeSet; use std::sync::Arc; use risingwave_hummock_sdk::append_sstable_info_to_string; @@ -209,11 +208,15 @@ impl NonOverlapSubLevelPicker { } } + // Use the incoming sst as the basic range and select as many sub levels as possible + // 1. Build basic range based on sst + // 2. Add a new sub level in each round + // 3. Expand sst according to the basic range from new to old sub levels. + // 4. According to the size and count restrictions, select the plan that contains the most sub levels as much as possible fn pick_sub_level( &self, levels: &[Level], level_handler: &LevelHandler, - sst_index: usize, sst: &SstableInfo, ) -> SubLevelSstables { let mut ret = SubLevelSstables { @@ -222,127 +225,100 @@ impl NonOverlapSubLevelPicker { sstable_infos: vec![vec![]; levels.len()], }; ret.sstable_infos[0].extend(vec![sst.clone()]); - let mut overlap_info = self.overlap_strategy.create_overlap_info(); - let mut select_sst_id_set = BTreeSet::default(); - #[allow(clippy::single_range_in_vec_init)] - let mut overlap_len_and_begins = vec![(sst_index..(sst_index + 1))]; - for sst in &ret.sstable_infos[0] { - overlap_info.update(sst); - select_sst_id_set.insert(sst.sst_id); - } - for (target_index, target_level) in levels.iter().enumerate().skip(1) { + let mut pick_levels_range = Vec::default(); + let mut max_select_level_count = 0; + + // Pay attention to the order here: Make sure to select the lowest sub_level to meet the requirements of base compaction. If you break the assumption of this order, you need to redesign it. 
+ // TODO: Use binary selection to replace the step algorithm to optimize algorithm complexity + 'expand_new_level: for (target_index, target_level) in levels.iter().enumerate().skip(1) { if target_level.level_type() != LevelType::Nonoverlapping { break; } - // more than 1 sub_level - if ret.total_file_count > 1 && ret.total_file_size >= self.max_compaction_bytes - || ret.total_file_count >= self.max_file_count as usize + if ret + .sstable_infos + .iter() + .filter(|ssts| !ssts.is_empty()) + .count() + > MAX_COMPACT_LEVEL_COUNT { break; } + // reset the `basic_overlap_info` with basic sst + let mut basic_overlap_info = self.overlap_strategy.create_overlap_info(); + basic_overlap_info.update(sst); + let mut overlap_files_range = - overlap_info.check_multiple_include(&target_level.table_infos); + basic_overlap_info.check_multiple_include(&target_level.table_infos); if overlap_files_range.is_empty() { overlap_files_range = - overlap_info.check_multiple_overlap(&target_level.table_infos); + basic_overlap_info.check_multiple_overlap(&target_level.table_infos); } - // We allow a layer in the middle without overlap, so we need to continue to - // the next layer to search for overlap - let mut pending_compact = false; - let mut current_level_size = 0; - for index in overlap_files_range.start..overlap_files_range.end { - let other = &target_level.table_infos[index]; - if level_handler.is_pending_compact(&other.sst_id) { - pending_compact = true; - break; - } - overlap_info.update(other); - select_sst_id_set.insert(other.sst_id); - current_level_size += other.file_size; + if overlap_files_range.is_empty() { + continue; } - if pending_compact { - break; - } + let mut overlap_levels = vec![]; - let mut extra_overlap_levels = vec![]; + let mut add_files_size: u64 = 0; + let mut add_files_count: usize = 0; - let mut add_files_size = 0; - // check reverse overlap - for (reverse_index, old_overlap_range) in - overlap_len_and_begins.iter_mut().enumerate().rev() - { + let mut 
select_level_count = 0; + for reverse_index in (0..=target_index).rev() { let target_tables = &levels[reverse_index].table_infos; - // It has select all files in this sub-level, so it can not overlap with more files. - if ret.sstable_infos[reverse_index].len() == target_tables.len() { + + overlap_files_range = if target_index == reverse_index { + overlap_files_range + } else { + basic_overlap_info.check_multiple_overlap(target_tables) + }; + + // We allow a layer in the middle without overlap, so we need to continue to + // the next layer to search for overlap + if overlap_files_range.is_empty() { + // empty level continue; } - let new_overlap_range = overlap_info.check_multiple_overlap(target_tables); - let mut extra_overlap_sst = Vec::with_capacity(new_overlap_range.len()); - for new_overlap_index in new_overlap_range.clone() { - if old_overlap_range.contains(&new_overlap_index) { - // Since some of the files have already been selected when selecting - // upwards, we filter here to avoid adding sst repeatedly - continue; - } - let other = &target_tables[new_overlap_index]; + for other in &target_tables[overlap_files_range.clone()] { if level_handler.is_pending_compact(&other.sst_id) { - pending_compact = true; - break; + break 'expand_new_level; } - debug_assert!(!select_sst_id_set.contains(&other.sst_id)); - add_files_size += other.file_size; - overlap_info.update(other); - select_sst_id_set.insert(other.sst_id); - extra_overlap_sst.push(other.clone()); + basic_overlap_info.update(other); + + add_files_size += other.get_file_size(); + add_files_count += 1; } - if pending_compact { - break; + // When size / file count has exceeded the limit, we need to abandon this plan, it cannot be expanded to the last sub_level + if max_select_level_count > 1 + && (add_files_size >= self.max_compaction_bytes + || add_files_count >= self.max_file_count as usize) + { + break 'expand_new_level; } - extra_overlap_levels.push((reverse_index, extra_overlap_sst)); - 
*old_overlap_range = new_overlap_range; + overlap_levels.push((reverse_index, overlap_files_range.clone())); + select_level_count += 1; } - // check reverse overlap - if pending_compact { - // encountering a pending file means we don't need to continue processing this - // interval - break; + if select_level_count > max_select_level_count { + max_select_level_count = select_level_count; + pick_levels_range = overlap_levels; } + } - let add_files_count = overlap_files_range.len() - + extra_overlap_levels - .iter() - .map(|(_, files)| files.len()) - .sum::<usize>(); - - if ret - .sstable_infos + for (reverse_index, sst_range) in pick_levels_range { + let level_ssts = &levels[reverse_index].table_infos; + ret.sstable_infos[reverse_index] = level_ssts[sst_range].to_vec(); + ret.total_file_count += ret.sstable_infos[reverse_index].len(); + ret.total_file_size += ret.sstable_infos[reverse_index] .iter() - .filter(|ssts| !ssts.is_empty()) - .count() - > MAX_COMPACT_LEVEL_COUNT - { - break; - } - - ret.total_file_count += add_files_count; - ret.total_file_size += add_files_size + current_level_size; - if !overlap_files_range.is_empty() { - ret.sstable_infos[target_index] - .extend_from_slice(&target_level.table_infos[overlap_files_range.clone()]); - } - overlap_len_and_begins.push(overlap_files_range); - for (reverse_index, files) in extra_overlap_levels { - ret.sstable_infos[reverse_index].extend(files); - } + .map(|sst| sst.file_size) + .sum::<u64>(); } // sort sst per level due to reverse expand @@ -355,76 +331,7 @@ impl NonOverlapSubLevelPicker { }); if self.enable_check_task_level_overlap { - use std::fmt::Write; - - use itertools::Itertools; - - use crate::hummock::compaction::overlap_strategy::OverlapInfo; - let mut overlap_info: Option<Box<dyn OverlapInfo>> = None; - for (level_idx, ssts) in ret.sstable_infos.iter().enumerate().rev() { - if let Some(overlap_info) = overlap_info.as_mut() { - // skip the check if `overlap_info` is not initialized (i.e.
the first non-empty level is not met) - let level = levels.get(level_idx).unwrap(); - let overlap_sst_range = overlap_info.check_multiple_overlap(&level.table_infos); - if !overlap_sst_range.is_empty() { - let expected_sst_ids = level.table_infos[overlap_sst_range.clone()] - .iter() - .map(|s| s.object_id) - .collect_vec(); - let actual_sst_ids = ssts.iter().map(|s| s.object_id).collect_vec(); - // `actual_sst_ids` can be larger than `expected_sst_ids` because we may use a larger key range to select SSTs. - // `expected_sst_ids` must be a sub-range of `actual_sst_ids` to ensure correctness. - let start_idx = actual_sst_ids - .iter() - .position(|sst_id| sst_id == expected_sst_ids.first().unwrap()); - if start_idx.map_or(true, |idx| { - actual_sst_ids[idx..idx + expected_sst_ids.len()] != expected_sst_ids - }) { - // Print SstableInfo for `actual_sst_ids` - let mut actual_sst_infos = String::new(); - ssts.iter().for_each(|s| { - append_sstable_info_to_string(&mut actual_sst_infos, s) - }); - - // Print SstableInfo for `expected_sst_ids` - let mut expected_sst_infos = String::new(); - level.table_infos[overlap_sst_range.clone()] - .iter() - .for_each(|s| { - append_sstable_info_to_string(&mut expected_sst_infos, s) - }); - - // Print SstableInfo for selected ssts in all sub-levels - let mut ret_sst_infos = String::new(); - ret.sstable_infos - .iter() - .enumerate() - .for_each(|(idx, ssts)| { - writeln!( - ret_sst_infos, - "sub level {}", - levels.get(idx).unwrap().sub_level_id - ) - .unwrap(); - ssts.iter().for_each(|s| { - append_sstable_info_to_string(&mut ret_sst_infos, s) - }); - }); - panic!( - "Compact task overlap check fails. Actual: {} Expected: {} Ret {}", - actual_sst_infos, expected_sst_infos, ret_sst_infos - ); - } - } - } else if !ssts.is_empty() { - // init the `overlap_info` when meeting the first non-empty level. 
- overlap_info = Some(self.overlap_strategy.create_overlap_info()); - } - - for sst in ssts { - overlap_info.as_mut().unwrap().update(sst); - } - } + self.verify_task_level_overlap(&ret, levels); } ret.sstable_infos.retain(|ssts| !ssts.is_empty()); @@ -441,13 +348,12 @@ impl NonOverlapSubLevelPicker { } let mut scores = vec![]; - let select_tables = &l0[0].table_infos; - for (sst_index, sst) in select_tables.iter().enumerate() { + for sst in &l0[0].table_infos { if level_handler.is_pending_compact(&sst.sst_id) { continue; } - let ret = self.pick_sub_level(l0, level_handler, sst_index, sst); + let ret = self.pick_sub_level(l0, level_handler, sst); if ret.sstable_infos.len() < self.min_depth && ret.total_file_size < self.min_compaction_bytes { @@ -493,10 +399,84 @@ impl NonOverlapSubLevelPicker { expected } + + fn verify_task_level_overlap(&self, ret: &SubLevelSstables, levels: &[Level]) { + use std::fmt::Write; + + use itertools::Itertools; + + use crate::hummock::compaction::overlap_strategy::OverlapInfo; + let mut overlap_info: Option<Box<dyn OverlapInfo>> = None; + for (level_idx, ssts) in ret.sstable_infos.iter().enumerate().rev() { + if let Some(overlap_info) = overlap_info.as_mut() { + // skip the check if `overlap_info` is not initialized (i.e. the first non-empty level is not met) + let level = levels.get(level_idx).unwrap(); + let overlap_sst_range = overlap_info.check_multiple_overlap(&level.table_infos); + if !overlap_sst_range.is_empty() { + let expected_sst_ids = level.table_infos[overlap_sst_range.clone()] + .iter() + .map(|s| s.object_id) + .collect_vec(); + let actual_sst_ids = ssts.iter().map(|s| s.object_id).collect_vec(); + // `actual_sst_ids` can be larger than `expected_sst_ids` because we may use a larger key range to select SSTs. + // `expected_sst_ids` must be a sub-range of `actual_sst_ids` to ensure correctness.
+ let start_idx = actual_sst_ids + .iter() + .position(|sst_id| sst_id == expected_sst_ids.first().unwrap()); + if start_idx.map_or(true, |idx| { + actual_sst_ids[idx..idx + expected_sst_ids.len()] != expected_sst_ids + }) { + // Print SstableInfo for `actual_sst_ids` + let mut actual_sst_infos = String::new(); + ssts.iter() + .for_each(|s| append_sstable_info_to_string(&mut actual_sst_infos, s)); + + // Print SstableInfo for `expected_sst_ids` + let mut expected_sst_infos = String::new(); + level.table_infos[overlap_sst_range.clone()] + .iter() + .for_each(|s| { + append_sstable_info_to_string(&mut expected_sst_infos, s) + }); + + // Print SstableInfo for selected ssts in all sub-levels + let mut ret_sst_infos = String::new(); + ret.sstable_infos + .iter() + .enumerate() + .for_each(|(idx, ssts)| { + writeln!( + ret_sst_infos, + "sub level {}", + levels.get(idx).unwrap().sub_level_id + ) + .unwrap(); + ssts.iter().for_each(|s| { + append_sstable_info_to_string(&mut ret_sst_infos, s) + }); + }); + panic!( + "Compact task overlap check fails. Actual: {} Expected: {} Ret {}", + actual_sst_infos, expected_sst_infos, ret_sst_infos + ); + } + } + } else if !ssts.is_empty() { + // init the `overlap_info` when meeting the first non-empty level. + overlap_info = Some(self.overlap_strategy.create_overlap_info()); + } + + for sst in ssts { + overlap_info.as_mut().unwrap().update(sst); + } + } + } } #[cfg(test)] pub mod tests { + use std::collections::BTreeSet; + pub use risingwave_pb::hummock::{Level, LevelType}; use super::*; @@ -655,12 +635,12 @@ pub mod tests { level_type: LevelType::Nonoverlapping as i32, table_infos: vec![ generate_table(4, 1, 50, 199, 1), - generate_table(5, 1, 200, 399, 1), + generate_table(5, 1, 200, 249, 1), generate_table(9, 1, 250, 300, 2), generate_table(10, 1, 350, 400, 2), generate_table(11, 1, 450, 500, 2), ], - total_file_size: 250, + total_file_size: 350, ..Default::default() }, Level {