From 5a8866d8a9534ccc1142f4f3d51bd9cd31660384 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 7 Sep 2023 15:28:56 +0800 Subject: [PATCH] refactor(storage): seperate validator logic from picker (#11984) --- .../src/hummock/compaction/level_selector.rs | 17 +- src/meta/src/hummock/compaction/mod.rs | 8 +- .../picker/base_level_compaction_picker.rs | 132 ++++++----- .../picker/compaction_task_validator.rs | 208 ++++++++++++++++++ .../picker/manual_compaction_picker.rs | 7 +- .../picker/min_overlap_compaction_picker.rs | 10 +- src/meta/src/hummock/compaction/picker/mod.rs | 10 + .../picker/space_reclaim_compaction_picker.rs | 7 +- .../picker/tier_compaction_picker.rs | 75 ++++--- .../tombstone_reclaim_compaction_picker.rs | 10 +- .../picker/trivial_move_compaction_picker.rs | 4 +- .../picker/ttl_reclaim_compaction_picker.rs | 4 +- 12 files changed, 376 insertions(+), 116 deletions(-) create mode 100644 src/meta/src/hummock/compaction/picker/compaction_task_validator.rs diff --git a/src/meta/src/hummock/compaction/level_selector.rs b/src/meta/src/hummock/compaction/level_selector.rs index f6565b89dba3..b6a3fc1a57b5 100644 --- a/src/meta/src/hummock/compaction/level_selector.rs +++ b/src/meta/src/hummock/compaction/level_selector.rs @@ -26,7 +26,7 @@ use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{compact_task, CompactionConfig, LevelType}; use super::picker::{ - SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState, + CompactionTaskValidator, SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState, TtlReclaimCompactionPicker, }; use super::{ @@ -95,14 +95,19 @@ impl DynamicLevelSelectorCore { select_level: usize, target_level: usize, overlap_strategy: Arc, + compaction_task_validator: Arc, ) -> Box { if select_level == 0 { if target_level == 0 { - Box::new(TierCompactionPicker::new(self.config.clone())) + Box::new(TierCompactionPicker::new_with_validator( + self.config.clone(), + compaction_task_validator, + )) } else { - Box::new(LevelCompactionPicker::new( + Box::new(LevelCompactionPicker::new_with_validator( target_level, self.config.clone(), + compaction_task_validator, )) } } else { @@ -374,6 +379,11 @@ impl LevelSelector for DynamicLevelSelector { let overlap_strategy = create_overlap_strategy(compaction_group.compaction_config.compaction_mode()); let ctx = dynamic_level_core.get_priority_levels(levels, level_handlers); + + // TODO: Determine which rule to enable by write limit + let compaction_task_validator = Arc::new(CompactionTaskValidator::new( + compaction_group.compaction_config.clone(), + )); for (score, select_level, target_level) in ctx.score_levels { if score <= SCORE_BASE { return None; @@ -382,6 +392,7 @@ impl LevelSelector for DynamicLevelSelector { select_level, target_level, overlap_strategy.clone(), + compaction_task_validator.clone(), ); let mut stats = LocalPickerStatistic::default(); if let Some(ret) = picker.pick_compaction(levels, level_handlers, &mut stats) { diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 97443f3bc7a9..a9f8d95457cc 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -247,25 +247,25 @@ impl LocalSelectorStatistic { metrics .compact_skip_frequency .with_label_values(&[level_label.as_str(), "write-amp"]) - .inc_by(stats.skip_by_write_amp_limit); + .inc(); } if stats.skip_by_count_limit > 0 { metrics .compact_skip_frequency .with_label_values(&[level_label.as_str(), "count"]) - .inc_by(stats.skip_by_count_limit); + .inc(); } if stats.skip_by_pending_files > 0 { metrics .compact_skip_frequency .with_label_values(&[level_label.as_str(), "pending-files"]) - .inc_by(stats.skip_by_pending_files); + .inc(); } if stats.skip_by_overlapping > 0 { metrics .compact_skip_frequency .with_label_values(&[level_label.as_str(), "overlapping"]) - .inc_by(stats.skip_by_overlapping); + .inc(); } metrics .compact_skip_frequency diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index a15fab694ee8..c8025a9e99ac 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -20,7 +20,10 @@ use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{CompactionConfig, InputLevel, Level, LevelType, OverlappingLevel}; use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker; -use super::{CompactionInput, CompactionPicker, LocalPickerStatistic}; +use super::{ + CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic, + ValidationRuleType, +}; use crate::hummock::compaction::create_overlap_strategy; use crate::hummock::compaction::picker::TrivialMovePicker; use crate::hummock::level_handler::LevelHandler; @@ -28,6 +31,7 @@ use crate::hummock::level_handler::LevelHandler; pub struct LevelCompactionPicker { target_level: usize, config: Arc, + compaction_task_validator: Arc, } impl CompactionPicker for LevelCompactionPicker { @@ -84,13 +88,27 @@ impl CompactionPicker for LevelCompactionPicker { } impl LevelCompactionPicker { + #[cfg(test)] pub fn new(target_level: usize, config: Arc) -> LevelCompactionPicker { LevelCompactionPicker { target_level, + compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())), config, } } + pub fn new_with_validator( + target_level: usize, + config: Arc, + compaction_task_validator: Arc, + ) -> LevelCompactionPicker { + LevelCompactionPicker { + target_level, + config, + compaction_task_validator, + } + } + fn pick_base_trivial_move( &self, l0: &OverlappingLevel, @@ -117,10 +135,10 @@ impl LevelCompactionPicker { level_handlers: &[LevelHandler], stats: &mut LocalPickerStatistic, ) -> Option { + // TODO: remove this let l0_size = l0.total_file_size - level_handlers[0].get_pending_file_size(); let base_level_size = target_level.total_file_size - level_handlers[target_level.level_idx as usize].get_pending_file_size(); - if l0_size < base_level_size { stats.skip_by_write_amp_limit += 1; return None; @@ -157,7 +175,7 @@ impl LevelCompactionPicker { let mut skip_by_pending = false; let mut input_levels = vec![]; - let mut min_write_amp_meet = false; + for input in l0_select_tables_vec { let l0_select_tables = input .sstable_infos @@ -184,16 +202,6 @@ impl LevelCompactionPicker { continue; } - // The size of target level may be too large, we shall skip this compact task and wait - // the data in base level compact to lower level. - if target_level_size > self.config.max_compaction_bytes && strict_check { - continue; - } - - if input.total_file_size >= target_level_size { - min_write_amp_meet = true; - } - input_levels.push((input, target_level_size, target_level_ssts)); } @@ -204,20 +212,7 @@ impl LevelCompactionPicker { return None; } - if !min_write_amp_meet && strict_check { - // If the write-amplification of all candidate task are large, we may hope to wait base - // level compact more data to lower level. But if we skip all task, I'm - // afraid the data will be blocked in level0 and will be never compacted to base level. - // So we only allow one task exceed write-amplification-limit running in - // level0 to base-level. - return None; - } - for (input, target_file_size, target_level_files) in input_levels { - if min_write_amp_meet && input.total_file_size < target_file_size { - continue; - } - let mut select_level_inputs = input .sstable_infos .into_iter() @@ -228,18 +223,33 @@ impl LevelCompactionPicker { }) .collect_vec(); select_level_inputs.reverse(); + let target_file_count = target_level_files.len(); select_level_inputs.push(InputLevel { level_idx: target_level.level_idx, level_type: target_level.level_type, table_infos: target_level_files, }); - return Some(CompactionInput { + + let result = CompactionInput { input_levels: select_level_inputs, target_level: self.target_level, - target_sub_level_id: 0, - }); + select_input_size: input.total_file_size, + target_input_size: target_file_size, + total_file_count: (input.total_file_count + target_file_count) as u64, + ..Default::default() + }; + + if !self.compaction_task_validator.valid_compact_task( + &result, + ValidationRuleType::ToBase, + stats, + ) && strict_check + { + continue; + } + + return Some(result); } - stats.skip_by_write_amp_limit += 1; None } @@ -267,8 +277,6 @@ impl LevelCompactionPicker { self.config.sub_level_max_compaction_bytes, ); - let tier_sub_level_compact_level_count = - self.config.level0_sub_level_compact_level_count as usize; let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new( self.config.sub_level_max_compaction_bytes / 2, max_compaction_bytes, @@ -284,18 +292,10 @@ impl LevelCompactionPicker { continue; } - let mut skip_by_write_amp = false; - // Limit the number of selection levels for the non-overlapping - // sub_level at least level0_sub_level_compact_level_count - for (plan_index, input) in l0_select_tables_vec.into_iter().enumerate() { - if plan_index == 0 - && input.sstable_infos.len() - < self.config.level0_sub_level_compact_level_count as usize - { - // first plan level count smaller than limit - break; - } - + let validator = CompactionTaskValidator::new(self.config.clone()); + let mut select_input_size = 0; + let mut total_file_count = 0; + for input in l0_select_tables_vec { let mut max_level_size = 0; for level_select_table in &input.sstable_infos { let level_select_size = level_select_table @@ -306,22 +306,6 @@ impl LevelCompactionPicker { max_level_size = std::cmp::max(max_level_size, level_select_size); } - // This limitation would keep our write-amplification no more than - // ln(max_compaction_bytes/flush_level_bytes) / - // ln(self.config.level0_sub_level_compact_level_count/2) Here we only use half - // of level0_sub_level_compact_level_count just for convenient. - let is_write_amp_large = - max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2 - >= input.total_file_size; - - if (is_write_amp_large - || input.sstable_infos.len() < tier_sub_level_compact_level_count) - && input.total_file_count < self.config.level0_max_compact_file_number as usize - { - skip_by_write_amp = true; - continue; - } - let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len()); for level_select_sst in input.sstable_infos { if level_select_sst.is_empty() { @@ -332,17 +316,25 @@ impl LevelCompactionPicker { level_type: LevelType::Nonoverlapping as i32, table_infos: level_select_sst, }); + + select_input_size += input.total_file_size; + total_file_count += input.total_file_count; } select_level_inputs.reverse(); - return Some(CompactionInput { + + let result = CompactionInput { input_levels: select_level_inputs, - target_level: 0, target_sub_level_id: level.sub_level_id, - }); - } + select_input_size, + total_file_count: total_file_count as u64, + ..Default::default() + }; - if skip_by_write_amp { - stats.skip_by_write_amp_limit += 1; + if !validator.valid_compact_task(&result, ValidationRuleType::Intra, stats) { + continue; + } + + return Some(result); } } @@ -402,6 +394,7 @@ impl LevelCompactionPicker { target_level_idx -= 1; } + let select_input_size = select_sst.file_size; let input_levels = vec![ InputLevel { level_idx: 0, @@ -418,6 +411,9 @@ impl LevelCompactionPicker { input_levels, target_level: 0, target_sub_level_id: l0.sub_levels[target_level_idx].sub_level_id, + select_input_size, + total_file_count: 1, + ..Default::default() }); } None @@ -901,6 +897,7 @@ pub mod tests { }], target_level: 1, target_sub_level_id: pending_level.sub_level_id, + ..Default::default() }; assert!(!levels_handler[0].is_level_pending_compact(&pending_level)); tier_task_input.add_pending_task(1, &mut levels_handler); @@ -918,7 +915,6 @@ pub mod tests { // But stopped by pending sub-level when trying to include more sub-levels. let mut picker = LevelCompactionPicker::new(1, config.clone()); let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); - assert!(ret.is_none()); // Free the pending sub-level. @@ -964,7 +960,6 @@ pub mod tests { let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); - // println!("ret.input_levels: {:?}", ret.input_levels); // 1. trivial_move assert_eq!(2, ret.input_levels.len()); assert!(ret.input_levels[1].table_infos.is_empty()); @@ -974,7 +969,6 @@ pub mod tests { let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); - println!("ret.input_levels: {:?}", ret.input_levels); assert_eq!(3, ret.input_levels.len()); assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id); } diff --git a/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs new file mode 100644 index 000000000000..4de77467205f --- /dev/null +++ b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs @@ -0,0 +1,208 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use risingwave_pb::hummock::CompactionConfig; + +use super::{CompactionInput, LocalPickerStatistic, MAX_COMPACT_LEVEL_COUNT}; + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ValidationRuleType { + Tier = 0, + Intra = 1, + ToBase = 2, +} + +pub struct CompactionTaskValidator { + validation_rules: HashMap>, +} + +impl CompactionTaskValidator { + pub fn new(config: Arc) -> Self { + let mut validation_rules: HashMap< + ValidationRuleType, + Box, + > = HashMap::default(); + + validation_rules.insert( + ValidationRuleType::Tier, + Box::new(TierCompactionTaskValidationRule { + config: config.clone(), + enable: true, + }), + ); + + validation_rules.insert( + ValidationRuleType::Intra, + Box::new(IntraCompactionTaskValidationRule { + config: config.clone(), + enable: true, + }), + ); + + validation_rules.insert( + ValidationRuleType::ToBase, + Box::new(BaseCompactionTaskValidationRule { + config, + enable: true, + }), + ); + + CompactionTaskValidator { validation_rules } + } + + pub fn valid_compact_task( + &self, + input: &CompactionInput, + picker_type: ValidationRuleType, + stats: &mut LocalPickerStatistic, + ) -> bool { + self.validation_rules + .get(&picker_type) + .unwrap() + .validate(input, stats) + } +} + +pub trait CompactionTaskValidationRule { + fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool; +} + +struct TierCompactionTaskValidationRule { + config: Arc, + enable: bool, +} + +impl CompactionTaskValidationRule for TierCompactionTaskValidationRule { + fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool { + if !self.enable { + return true; + } + + // so the design here wants to merge multiple overlapping-levels in one compaction + let max_compaction_bytes = std::cmp::min( + self.config.max_compaction_bytes, + self.config.sub_level_max_compaction_bytes + * self.config.level0_overlapping_sub_level_compact_level_count as u64, + ); + + // Limit sstable file count to avoid using too much memory. + let overlapping_max_compact_file_numer = std::cmp::min( + self.config.level0_max_compact_file_number, + MAX_COMPACT_LEVEL_COUNT as u64, + ); + + let waiting_enough_files = { + if input.select_input_size > max_compaction_bytes { + false + } else { + input.total_file_count <= overlapping_max_compact_file_numer + } + }; + + // If waiting_enough_files is not satisfied, we will raise the priority of the number of + // levels to ensure that we can merge as many sub_levels as possible + let tier_sub_level_compact_level_count = + self.config.level0_overlapping_sub_level_compact_level_count as usize; + if input.input_levels.len() < tier_sub_level_compact_level_count && waiting_enough_files { + stats.skip_by_count_limit += 1; + return false; + } + + true + } +} + +struct IntraCompactionTaskValidationRule { + config: Arc, + enable: bool, +} + +impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule { + fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool { + if !self.enable { + return true; + } + + let intra_sub_level_compact_level_count = + self.config.level0_sub_level_compact_level_count as usize; + + if input.input_levels.len() < intra_sub_level_compact_level_count { + return false; + } + + let mut max_level_size = 0; + for select_level in &input.input_levels { + let level_select_size = select_level + .table_infos + .iter() + .map(|sst| sst.file_size) + .sum::(); + + max_level_size = std::cmp::max(max_level_size, level_select_size); + } + + // This limitation would keep our write-amplification no more than + // ln(max_compaction_bytes/flush_level_bytes) / + // ln(self.config.level0_sub_level_compact_level_count/2) Here we only use half + // of level0_sub_level_compact_level_count just for convenient. + let is_write_amp_large = + max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2 + >= input.select_input_size; + + if is_write_amp_large && input.total_file_count < self.config.level0_max_compact_file_number + { + stats.skip_by_write_amp_limit += 1; + return false; + } + + if input.input_levels.len() < intra_sub_level_compact_level_count + && input.total_file_count < self.config.level0_max_compact_file_number + { + stats.skip_by_count_limit += 1; + return false; + } + + true + } +} + +struct BaseCompactionTaskValidationRule { + config: Arc, + enable: bool, +} + +impl CompactionTaskValidationRule for BaseCompactionTaskValidationRule { + fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool { + if !self.enable { + return true; + } + + // The size of target level may be too large, we shall skip this compact task and wait + // the data in base level compact to lower level. + if input.target_input_size > self.config.max_compaction_bytes { + stats.skip_by_count_limit += 1; + return false; + } + + if input.select_input_size < input.target_input_size { + stats.skip_by_write_amp_limit += 1; + return false; + } + + true + } +} diff --git a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs index a6942b2e4d68..e8f8c908d0fd 100644 --- a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs @@ -101,6 +101,7 @@ impl ManualCompactionPicker { input_levels, target_level: 0, target_sub_level_id: sub_level_id, + ..Default::default() }) } @@ -170,6 +171,7 @@ impl ManualCompactionPicker { input_levels, target_level: self.target_level, target_sub_level_id: 0, + ..Default::default() }) } @@ -301,6 +303,9 @@ impl CompactionPicker for ManualCompactionPicker { } Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + target_input_size: target_input_ssts.iter().map(|sst| sst.file_size).sum(), + total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64, input_levels: vec![ InputLevel { level_idx: level as u32, @@ -314,7 +319,7 @@ impl CompactionPicker for ManualCompactionPicker { }, ], target_level, - target_sub_level_id: 0, + ..Default::default() }) } } diff --git a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs index b489ec37987b..efacf94d7dd9 100644 --- a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs @@ -20,10 +20,9 @@ use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{InputLevel, Level, LevelType, SstableInfo}; -use super::{CompactionInput, CompactionPicker, LocalPickerStatistic}; +use super::{CompactionInput, CompactionPicker, LocalPickerStatistic, MAX_COMPACT_LEVEL_COUNT}; use crate::hummock::compaction::overlap_strategy::OverlapStrategy; use crate::hummock::level_handler::LevelHandler; -pub const MAX_LEVEL_COUNT: usize = 42; pub struct MinOverlappingPicker { level: usize, @@ -130,6 +129,9 @@ impl CompactionPicker for MinOverlappingPicker { return None; } Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + target_input_size: target_input_ssts.iter().map(|sst| sst.file_size).sum(), + total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64, input_levels: vec![ InputLevel { level_idx: self.level as u32, @@ -143,7 +145,7 @@ impl CompactionPicker for MinOverlappingPicker { }, ], target_level: self.target_level, - target_sub_level_id: 0, + ..Default::default() }) } } @@ -299,7 +301,7 @@ impl NonOverlapSubLevelPicker { .iter() .filter(|ssts| !ssts.is_empty()) .count() - > MAX_LEVEL_COUNT + > MAX_COMPACT_LEVEL_COUNT { break; } diff --git a/src/meta/src/hummock/compaction/picker/mod.rs b/src/meta/src/hummock/compaction/picker/mod.rs index 86f3736288be..04e0550b8413 100644 --- a/src/meta/src/hummock/compaction/picker/mod.rs +++ b/src/meta/src/hummock/compaction/picker/mod.rs @@ -21,7 +21,10 @@ mod tombstone_reclaim_compaction_picker; mod trivial_move_compaction_picker; mod ttl_reclaim_compaction_picker; +mod compaction_task_validator; + pub use base_level_compaction_picker::LevelCompactionPicker; +pub use compaction_task_validator::{CompactionTaskValidator, ValidationRuleType}; pub use manual_compaction_picker::ManualCompactionPicker; pub use min_overlap_compaction_picker::MinOverlappingPicker; use risingwave_pb::hummock::hummock_version::Levels; @@ -36,6 +39,8 @@ pub use ttl_reclaim_compaction_picker::{TtlPickerState, TtlReclaimCompactionPick use crate::hummock::level_handler::LevelHandler; +pub const MAX_COMPACT_LEVEL_COUNT: usize = 42; + #[derive(Default)] pub struct LocalPickerStatistic { pub skip_by_write_amp_limit: u64, @@ -43,10 +48,15 @@ pub struct LocalPickerStatistic { pub skip_by_pending_files: u64, pub skip_by_overlapping: u64, } + +#[derive(Default)] pub struct CompactionInput { pub input_levels: Vec, pub target_level: usize, pub target_sub_level_id: u64, + pub select_input_size: u64, + pub target_input_size: u64, + pub total_file_count: u64, } impl CompactionInput { diff --git a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs index 4371729db8d9..95fd5d2d5235 100644 --- a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs @@ -80,6 +80,8 @@ impl SpaceReclaimCompactionPicker { } if !select_input_ssts.is_empty() { return Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + total_file_count: select_input_ssts.len() as u64, input_levels: vec![ InputLevel { level_idx: level.level_idx, @@ -94,6 +96,7 @@ impl SpaceReclaimCompactionPicker { ], target_level: level.level_idx as usize, target_sub_level_id: level.sub_level_id, + ..Default::default() }); } } @@ -135,6 +138,8 @@ impl SpaceReclaimCompactionPicker { // turn to next_round if !select_input_ssts.is_empty() { return Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + total_file_count: select_input_ssts.len() as u64, input_levels: vec![ InputLevel { level_idx: state.last_level as u32, @@ -148,7 +153,7 @@ impl SpaceReclaimCompactionPicker { }, ], target_level: state.last_level, - target_sub_level_id: 0, + ..Default::default() }); } state.last_level += 1; diff --git a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs index 6ebb2fed5036..99b17694f528 100644 --- a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs @@ -19,17 +19,35 @@ use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{CompactionConfig, InputLevel, LevelType, OverlappingLevel}; -use super::{CompactionInput, CompactionPicker, LocalPickerStatistic}; -use crate::hummock::compaction::picker::min_overlap_compaction_picker::MAX_LEVEL_COUNT; +use super::{ + CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic, + ValidationRuleType, +}; +use crate::hummock::compaction::picker::MAX_COMPACT_LEVEL_COUNT; use crate::hummock::level_handler::LevelHandler; pub struct TierCompactionPicker { config: Arc, + compaction_task_validator: Arc, } impl TierCompactionPicker { + #[cfg(test)] pub fn new(config: Arc) -> TierCompactionPicker { - TierCompactionPicker { config } + TierCompactionPicker { + compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())), + config, + } + } + + pub fn new_with_validator( + config: Arc, + compaction_task_validator: Arc, + ) -> TierCompactionPicker { + TierCompactionPicker { + config, + compaction_task_validator, + } } fn pick_overlapping_level( @@ -66,9 +84,16 @@ impl TierCompactionPicker { if can_concat(&input_level.table_infos) { return Some(CompactionInput { + select_input_size: input_level + .table_infos + .iter() + .map(|sst| sst.file_size) + .sum(), + total_file_count: input_level.table_infos.len() as u64, input_levels: vec![input_level], target_level: 0, target_sub_level_id: level.sub_level_id, + ..Default::default() }); } @@ -87,29 +112,15 @@ impl TierCompactionPicker { // Limit sstable file count to avoid using too much memory. let overlapping_max_compact_file_numer = std::cmp::min( self.config.level0_max_compact_file_number, - MAX_LEVEL_COUNT as u64, + MAX_COMPACT_LEVEL_COUNT as u64, ); - let mut waiting_enough_files = { - if compaction_bytes > max_compaction_bytes { - false - } else { - compact_file_count <= overlapping_max_compact_file_numer - } - }; for other in &l0.sub_levels[idx + 1..] { if compaction_bytes > max_compaction_bytes { - waiting_enough_files = false; break; } if compact_file_count > overlapping_max_compact_file_numer { - waiting_enough_files = false; - break; - } - - if other.level_type() != LevelType::Overlapping { - waiting_enough_files = false; break; } @@ -126,24 +137,26 @@ impl TierCompactionPicker { }); } - // If waiting_enough_files is not satisfied, we will raise the priority of the number of - // levels to ensure that we can merge as many sub_levels as possible - let tier_sub_level_compact_level_count = - self.config.level0_overlapping_sub_level_compact_level_count as usize; - if select_level_inputs.len() < tier_sub_level_compact_level_count - && waiting_enough_files - { - stats.skip_by_count_limit += 1; - continue; - } - select_level_inputs.reverse(); - return Some(CompactionInput { + let result = CompactionInput { input_levels: select_level_inputs, target_level: 0, target_sub_level_id: level.sub_level_id, - }); + select_input_size: compaction_bytes, + target_input_size: 0, + total_file_count: compact_file_count, + }; + + if !self.compaction_task_validator.valid_compact_task( + &result, + ValidationRuleType::Tier, + stats, + ) { + continue; + } + + return Some(result); } None } diff --git a/src/meta/src/hummock/compaction/picker/tombstone_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tombstone_reclaim_compaction_picker.rs index 97d8fa995d8b..994bfbc5ea55 100644 --- a/src/meta/src/hummock/compaction/picker/tombstone_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/tombstone_reclaim_compaction_picker.rs @@ -117,6 +117,14 @@ impl TombstoneReclaimCompactionPicker { } }; return Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + target_input_size: target_level + .table_infos + .iter() + .map(|sst| sst.file_size) + .sum(), + total_file_count: (select_input_ssts.len() + target_level.table_infos.len()) + as u64, target_level: target_level.level_idx as usize, input_levels: vec![ InputLevel { @@ -126,7 +134,7 @@ impl TombstoneReclaimCompactionPicker { }, target_level, ], - target_sub_level_id: 0, + ..Default::default() }); } state.last_level += 1; diff --git a/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs index 89f794e04efb..4bfbca0c5fb5 100644 --- a/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/trivial_move_compaction_picker.rs @@ -79,6 +79,8 @@ impl TrivialMovePicker { self.pick_trivial_move_sst(select_tables, target_tables, level_handlers, stats) { return Some(CompactionInput { + select_input_size: trivial_move_sst.file_size, + total_file_count: 1, input_levels: vec![ InputLevel { level_idx: self.level as u32, @@ -92,7 +94,7 @@ impl TrivialMovePicker { }, ], target_level: self.target_level, - target_sub_level_id: 0, + ..Default::default() }); } diff --git a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs index df833f7a14c0..a822d33db3cf 100644 --- a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs @@ -199,6 +199,8 @@ impl TtlReclaimCompactionPicker { }); Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + total_file_count: select_input_ssts.len() as _, input_levels: vec![ InputLevel { level_idx: reclaimed_level.level_idx, @@ -212,7 +214,7 @@ impl TtlReclaimCompactionPicker { }, ], target_level: reclaimed_level.level_idx as usize, - target_sub_level_id: 0, + ..Default::default() }) } }