diff --git a/src/meta/src/hummock/compaction/picker/emergency_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/emergency_compaction_picker.rs
index c6380ef476cc..4efcca28a981 100644
--- a/src/meta/src/hummock/compaction/picker/emergency_compaction_picker.rs
+++ b/src/meta/src/hummock/compaction/picker/emergency_compaction_picker.rs
@@ -15,12 +15,13 @@
 use std::sync::Arc;
 
 use risingwave_pb::hummock::hummock_version::Levels;
-use risingwave_pb::hummock::CompactionConfig;
+use risingwave_pb::hummock::{CompactionConfig, LevelType};
 
 use super::{
     CompactionInput, CompactionPicker, CompactionTaskValidator, LevelCompactionPicker,
     LocalPickerStatistic, TierCompactionPicker,
 };
+use crate::hummock::compaction::picker::intra_compaction_picker::WholeLevelCompactionPicker;
 use crate::hummock::compaction::CompactionDeveloperConfig;
 use crate::hummock::level_handler::LevelHandler;
 
@@ -50,20 +51,65 @@ impl EmergencyCompactionPicker {
         stats: &mut LocalPickerStatistic,
     ) -> Option<CompactionInput> {
         let unused_validator = Arc::new(CompactionTaskValidator::unused());
-
-        let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
-            self.target_level,
-            self.config.clone(),
-            unused_validator.clone(),
-            self.developer_config.clone(),
-        );
-
-        if let Some(ret) =
-            base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
+        let l0 = levels.l0.as_ref().unwrap();
+        let overlapping_count = l0
+            .sub_levels
+            .iter()
+            .filter(|level| level.level_type == LevelType::Overlapping as i32)
+            .count();
+        let no_overlap_count = l0
+            .sub_levels
+            .iter()
+            .filter(|level| {
+                level.level_type == LevelType::Nonoverlapping as i32
+                    && level.vnode_partition_count == 0
+            })
+            .count();
+        let partitioned_count = l0
+            .sub_levels
+            .iter()
+            .filter(|level| {
+                level.level_type == LevelType::Nonoverlapping as i32
+                    && level.vnode_partition_count > 0
+            })
+            .count();
+        // We trigger `EmergencyCompactionPicker` only when some unexpected condition causes the number of L0 sub-levels to grow and the original strategy
+        // cannot compact that data to a lower level. But if most of these sub-levels are overlapping, it is dangerous to compact the small amount of data in the non-overlapping sub-levels
+        // to the base level: the large write amplification would cost a lot of compactor resources.
+        if (self.config.split_weight_by_vnode == 0 && no_overlap_count > overlapping_count)
+            || (self.config.split_weight_by_vnode > 0
+                && partitioned_count > no_overlap_count
+                && partitioned_count > overlapping_count)
         {
-            return Some(ret);
+            let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
+                self.target_level,
+                self.config.clone(),
+                unused_validator.clone(),
+                self.developer_config.clone(),
+            );
+
+            if let Some(ret) =
+                base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
+            {
+                return Some(ret);
+            }
         }
+        if self.config.split_weight_by_vnode > 0
+            && no_overlap_count > partitioned_count
+            && no_overlap_count > overlapping_count
+        {
+            let intra_level_compaction_picker =
+                WholeLevelCompactionPicker::new(self.config.clone(), unused_validator.clone());
+            if let Some(ret) = intra_level_compaction_picker.pick_whole_level(
+                levels.l0.as_ref().unwrap(),
+                &level_handlers[0],
+                self.config.split_weight_by_vnode,
+                stats,
+            ) {
+                return Some(ret);
+            }
+        }
 
         let mut tier_compaction_picker =
             TierCompactionPicker::new_with_validator(self.config.clone(), unused_validator);
diff --git a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs
index c61b51ff1567..15cf7e9d01f1 100644
--- a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs
+++ b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs
@@ -108,78 +108,11 @@ impl IntraCompactionPicker {
         partition_count: u32,
         stats: &mut LocalPickerStatistic,
     ) -> Option<CompactionInput> {
-        if partition_count == 0 {
-            return None;
-        }
-        for (idx, level) in l0.sub_levels.iter().enumerate() {
-            if level.level_type() != LevelType::Nonoverlapping
-                || level.vnode_partition_count == partition_count
-            {
-                continue;
-            }
-
-            let max_compaction_bytes = std::cmp::max(
-                self.config.max_bytes_for_level_base,
-                self.config.sub_level_max_compaction_bytes
-                    * (self.config.level0_sub_level_compact_level_count as u64),
-            );
-
-            let mut select_input_size = 0;
-
-            let mut select_level_inputs = vec![];
-            let mut total_file_count = 0;
-            let mut wait_enough = false;
-            for next_level in l0.sub_levels.iter().skip(idx) {
-                if select_input_size > max_compaction_bytes
-                    || total_file_count > self.config.level0_max_compact_file_number
-                    || (next_level.vnode_partition_count == partition_count
-                        && select_level_inputs.len() > 1)
-                {
-                    wait_enough = true;
-                    break;
-                }
-
-                if level_handler.is_level_pending_compact(next_level) {
-                    break;
-                }
-
-                select_input_size += next_level.total_file_size;
-                total_file_count += next_level.table_infos.len() as u64;
-
-                select_level_inputs.push(InputLevel {
-                    level_idx: 0,
-                    level_type: next_level.level_type,
-                    table_infos: next_level.table_infos.clone(),
-                });
-            }
-            if !select_level_inputs.is_empty() {
-                let vnode_partition_count =
-                    if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
-                        partition_count
-                    } else {
-                        0
-                    };
-                let result = CompactionInput {
-                    input_levels: select_level_inputs,
-                    target_sub_level_id: level.sub_level_id,
-                    select_input_size,
-                    total_file_count,
-                    vnode_partition_count,
-                    ..Default::default()
-                };
-                if wait_enough
-                    || self.compaction_task_validator.valid_compact_task(
-                        &result,
-                        ValidationRuleType::Intra,
-                        stats,
-                    )
-                {
-                    return Some(result);
-                }
-            }
-        }
-
-        None
+        let picker = WholeLevelCompactionPicker::new(
+            self.config.clone(),
+            self.compaction_task_validator.clone(),
+        );
+        picker.pick_whole_level(l0, level_handler, partition_count, stats)
     }
 
     fn pick_l0_intra(
@@ -378,6 +311,104 @@ impl IntraCompactionPicker {
     }
 }
 
+pub struct WholeLevelCompactionPicker {
+    config: Arc<CompactionConfig>,
+    compaction_task_validator: Arc<CompactionTaskValidator>,
+}
+
+impl WholeLevelCompactionPicker {
+    pub fn new(
+        config: Arc<CompactionConfig>,
+        compaction_task_validator: Arc<CompactionTaskValidator>,
+    ) -> Self {
+        Self {
+            config,
+            compaction_task_validator,
+        }
+    }
+
+    pub fn pick_whole_level(
+        &self,
+        l0: &OverlappingLevel,
+        level_handler: &LevelHandler,
+        partition_count: u32,
+        stats: &mut LocalPickerStatistic,
+    ) -> Option<CompactionInput> {
+        if partition_count == 0 {
+            return None;
+        }
+        for (idx, level) in l0.sub_levels.iter().enumerate() {
+            if level.level_type() != LevelType::Nonoverlapping
+                || level.vnode_partition_count == partition_count
+            {
+                continue;
+            }
+
+            let max_compaction_bytes = std::cmp::max(
+                self.config.max_bytes_for_level_base,
+                self.config.sub_level_max_compaction_bytes
+                    * (self.config.level0_sub_level_compact_level_count as u64),
+            );
+
+            let mut select_input_size = 0;
+
+            let mut select_level_inputs = vec![];
+            let mut total_file_count = 0;
+            let mut wait_enough = false;
+            for next_level in l0.sub_levels.iter().skip(idx) {
+                if select_input_size > max_compaction_bytes
+                    || total_file_count > self.config.level0_max_compact_file_number
+                    || (next_level.vnode_partition_count == partition_count
+                        && select_level_inputs.len() > 1)
+                {
+                    wait_enough = true;
+                    break;
+                }
+
+                if level_handler.is_level_pending_compact(next_level) {
+                    break;
+                }
+
+                select_input_size += next_level.total_file_size;
+                total_file_count += next_level.table_infos.len() as u64;
+
+                select_level_inputs.push(InputLevel {
+                    level_idx: 0,
+                    level_type: next_level.level_type,
+                    table_infos: next_level.table_infos.clone(),
+                });
+            }
+            if !select_level_inputs.is_empty() {
+                let vnode_partition_count =
+                    if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
+                        partition_count
+                    } else {
+                        0
+                    };
+                let result = CompactionInput {
+                    input_levels: select_level_inputs,
+                    target_sub_level_id: level.sub_level_id,
+                    select_input_size,
+                    total_file_count,
+                    vnode_partition_count,
+                    ..Default::default()
+                };
+                if wait_enough
+                    || self.compaction_task_validator.valid_compact_task(
+                        &result,
+                        ValidationRuleType::Intra,
+                        stats,
+                    )
+                {
+                    return Some(result);
+                }
+            }
+        }
+
+        None
+    }
+}
+
 #[cfg(test)]
 pub mod tests {
     use risingwave_pb::hummock::Level;
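
Review note: the new `pick_compaction` body in `EmergencyCompactionPicker` routes an emergency compaction to one of three pickers based on how the L0 sub-levels are distributed. Below is a minimal, self-contained sketch of that dispatch heuristic; `Strategy` and `pick_emergency_strategy` are hypothetical names for illustration, and the fall-through that happens in the real code when a chosen picker returns no task is deliberately omitted.

```rust
// Hypothetical model of the emergency dispatch heuristic; the real picker
// derives these counts from `l0.sub_levels` and returns a `CompactionInput`.

#[derive(Debug, PartialEq)]
enum Strategy {
    BaseLevel,  // LevelCompactionPicker: compact L0 into the base level
    WholeLevel, // WholeLevelCompactionPicker: re-partition L0 sub-levels by vnode
    Tier,       // TierCompactionPicker: merge overlapping sub-levels first
}

fn pick_emergency_strategy(
    split_weight_by_vnode: u32,
    overlapping_count: usize, // overlapping sub-levels
    no_overlap_count: usize,  // non-overlapping, vnode_partition_count == 0
    partitioned_count: usize, // non-overlapping, vnode_partition_count > 0
) -> Strategy {
    // Compacting to the base level is only worthwhile when most sub-levels are
    // already non-overlapping (or already partitioned, when vnode partitioning
    // is enabled); otherwise the write amplification is too high.
    if (split_weight_by_vnode == 0 && no_overlap_count > overlapping_count)
        || (split_weight_by_vnode > 0
            && partitioned_count > no_overlap_count
            && partitioned_count > overlapping_count)
    {
        return Strategy::BaseLevel;
    }
    // With vnode partitioning enabled but most sub-levels still unpartitioned,
    // first rewrite the whole level with the target partition count.
    if split_weight_by_vnode > 0
        && no_overlap_count > partitioned_count
        && no_overlap_count > overlapping_count
    {
        return Strategy::WholeLevel;
    }
    // Otherwise reduce the number of overlapping sub-levels first.
    Strategy::Tier
}

fn main() {
    // Mostly overlapping sub-levels: pushing to the base level is dangerous,
    // so tier compaction runs first.
    assert_eq!(pick_emergency_strategy(0, 5, 2, 0), Strategy::Tier);
    // Partitioning enabled, mostly unpartitioned non-overlapping sub-levels.
    assert_eq!(pick_emergency_strategy(4, 1, 6, 2), Strategy::WholeLevel);
    // Partitioning enabled, mostly partitioned sub-levels: safe to push down.
    assert_eq!(pick_emergency_strategy(4, 1, 2, 6), Strategy::BaseLevel);
}
```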
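
Similarly, here is a simplified model of the greedy scan inside the extracted `pick_whole_level`: it starts at the first non-overlapping sub-level whose partitioning differs from the target and extends the pick upward until a byte or file cap is hit, a pending sub-level blocks it, or it would swallow a second already-partitioned sub-level. The `SubLevel` struct and `select_whole_level` function are hypothetical stand-ins; the validator check and the `vnode_partition_count` downgrade for small picks are omitted.

```rust
// Hypothetical, simplified model of the greedy scan in `pick_whole_level`.
// Returns the index of the first selected sub-level and how many consecutive
// sub-levels were picked, or None if nothing could be selected.

struct SubLevel {
    non_overlapping: bool,
    vnode_partition_count: u32,
    total_file_size: u64,
    file_count: u64,
    pending_compact: bool, // already claimed by another compaction task
}

fn select_whole_level(
    sub_levels: &[SubLevel],
    partition_count: u32,
    max_bytes: u64,
    max_files: u64,
) -> Option<(usize, usize)> {
    if partition_count == 0 {
        return None;
    }
    for (idx, level) in sub_levels.iter().enumerate() {
        // Skip overlapping sub-levels and ones already partitioned as desired.
        if !level.non_overlapping || level.vnode_partition_count == partition_count {
            continue;
        }
        let (mut bytes, mut files, mut picked) = (0u64, 0u64, 0usize);
        for next in &sub_levels[idx..] {
            // Stop once the pick is large enough, or once it would include a
            // partitioned sub-level after more than one level is already picked.
            if bytes > max_bytes
                || files > max_files
                || (next.vnode_partition_count == partition_count && picked > 1)
            {
                break;
            }
            // A sub-level claimed by another task ends the consecutive run.
            if next.pending_compact {
                break;
            }
            bytes += next.total_file_size;
            files += next.file_count;
            picked += 1;
        }
        if picked > 0 {
            return Some((idx, picked));
        }
    }
    None
}

fn main() {
    let lvl = |partitioned: u32| SubLevel {
        non_overlapping: true,
        vnode_partition_count: partitioned,
        total_file_size: 10,
        file_count: 2,
        pending_compact: false,
    };
    // The two unpartitioned sub-levels are picked; the scan stops at the
    // already-partitioned third one because two levels were already taken.
    let levels = [lvl(0), lvl(0), lvl(4)];
    assert_eq!(select_whole_level(&levels, 4, 100, 10), Some((0, 2)));
}
```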