Skip to content

Commit

Permalink
refactor(storage): seperate validator logic from picker (#11984)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Sep 7, 2023
1 parent b578db6 commit 5a8866d
Show file tree
Hide file tree
Showing 12 changed files with 376 additions and 116 deletions.
17 changes: 14 additions & 3 deletions src/meta/src/hummock/compaction/level_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{compact_task, CompactionConfig, LevelType};

use super::picker::{
SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState,
CompactionTaskValidator, SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState,
TtlReclaimCompactionPicker,
};
use super::{
Expand Down Expand Up @@ -95,14 +95,19 @@ impl DynamicLevelSelectorCore {
select_level: usize,
target_level: usize,
overlap_strategy: Arc<dyn OverlapStrategy>,
compaction_task_validator: Arc<CompactionTaskValidator>,
) -> Box<dyn CompactionPicker> {
if select_level == 0 {
if target_level == 0 {
Box::new(TierCompactionPicker::new(self.config.clone()))
Box::new(TierCompactionPicker::new_with_validator(
self.config.clone(),
compaction_task_validator,
))
} else {
Box::new(LevelCompactionPicker::new(
Box::new(LevelCompactionPicker::new_with_validator(
target_level,
self.config.clone(),
compaction_task_validator,
))
}
} else {
Expand Down Expand Up @@ -374,6 +379,11 @@ impl LevelSelector for DynamicLevelSelector {
let overlap_strategy =
create_overlap_strategy(compaction_group.compaction_config.compaction_mode());
let ctx = dynamic_level_core.get_priority_levels(levels, level_handlers);

// TODO: Determine which rule to enable by write limit
let compaction_task_validator = Arc::new(CompactionTaskValidator::new(
compaction_group.compaction_config.clone(),
));
for (score, select_level, target_level) in ctx.score_levels {
if score <= SCORE_BASE {
return None;
Expand All @@ -382,6 +392,7 @@ impl LevelSelector for DynamicLevelSelector {
select_level,
target_level,
overlap_strategy.clone(),
compaction_task_validator.clone(),
);
let mut stats = LocalPickerStatistic::default();
if let Some(ret) = picker.pick_compaction(levels, level_handlers, &mut stats) {
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,25 +247,25 @@ impl LocalSelectorStatistic {
metrics
.compact_skip_frequency
.with_label_values(&[level_label.as_str(), "write-amp"])
.inc_by(stats.skip_by_write_amp_limit);
.inc();
}
if stats.skip_by_count_limit > 0 {
metrics
.compact_skip_frequency
.with_label_values(&[level_label.as_str(), "count"])
.inc_by(stats.skip_by_count_limit);
.inc();
}
if stats.skip_by_pending_files > 0 {
metrics
.compact_skip_frequency
.with_label_values(&[level_label.as_str(), "pending-files"])
.inc_by(stats.skip_by_pending_files);
.inc();
}
if stats.skip_by_overlapping > 0 {
metrics
.compact_skip_frequency
.with_label_values(&[level_label.as_str(), "overlapping"])
.inc_by(stats.skip_by_overlapping);
.inc();
}
metrics
.compact_skip_frequency
Expand Down
132 changes: 63 additions & 69 deletions src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactionConfig, InputLevel, Level, LevelType, OverlappingLevel};

use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
use super::{
CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
ValidationRuleType,
};
use crate::hummock::compaction::create_overlap_strategy;
use crate::hummock::compaction::picker::TrivialMovePicker;
use crate::hummock::level_handler::LevelHandler;

pub struct LevelCompactionPicker {
target_level: usize,
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
}

impl CompactionPicker for LevelCompactionPicker {
Expand Down Expand Up @@ -84,13 +88,27 @@ impl CompactionPicker for LevelCompactionPicker {
}

impl LevelCompactionPicker {
#[cfg(test)]
pub fn new(target_level: usize, config: Arc<CompactionConfig>) -> LevelCompactionPicker {
LevelCompactionPicker {
target_level,
compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
config,
}
}

pub fn new_with_validator(
target_level: usize,
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
) -> LevelCompactionPicker {
LevelCompactionPicker {
target_level,
config,
compaction_task_validator,
}
}

fn pick_base_trivial_move(
&self,
l0: &OverlappingLevel,
Expand All @@ -117,10 +135,10 @@ impl LevelCompactionPicker {
level_handlers: &[LevelHandler],
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
// TODO: remove this
let l0_size = l0.total_file_size - level_handlers[0].get_pending_file_size();
let base_level_size = target_level.total_file_size
- level_handlers[target_level.level_idx as usize].get_pending_file_size();

if l0_size < base_level_size {
stats.skip_by_write_amp_limit += 1;
return None;
Expand Down Expand Up @@ -157,7 +175,7 @@ impl LevelCompactionPicker {

let mut skip_by_pending = false;
let mut input_levels = vec![];
let mut min_write_amp_meet = false;

for input in l0_select_tables_vec {
let l0_select_tables = input
.sstable_infos
Expand All @@ -184,16 +202,6 @@ impl LevelCompactionPicker {
continue;
}

// The size of target level may be too large, we shall skip this compact task and wait
// the data in base level compact to lower level.
if target_level_size > self.config.max_compaction_bytes && strict_check {
continue;
}

if input.total_file_size >= target_level_size {
min_write_amp_meet = true;
}

input_levels.push((input, target_level_size, target_level_ssts));
}

Expand All @@ -204,20 +212,7 @@ impl LevelCompactionPicker {
return None;
}

if !min_write_amp_meet && strict_check {
// If the write-amplification of all candidate task are large, we may hope to wait base
// level compact more data to lower level. But if we skip all task, I'm
// afraid the data will be blocked in level0 and will be never compacted to base level.
// So we only allow one task exceed write-amplification-limit running in
// level0 to base-level.
return None;
}

for (input, target_file_size, target_level_files) in input_levels {
if min_write_amp_meet && input.total_file_size < target_file_size {
continue;
}

let mut select_level_inputs = input
.sstable_infos
.into_iter()
Expand All @@ -228,18 +223,33 @@ impl LevelCompactionPicker {
})
.collect_vec();
select_level_inputs.reverse();
let target_file_count = target_level_files.len();
select_level_inputs.push(InputLevel {
level_idx: target_level.level_idx,
level_type: target_level.level_type,
table_infos: target_level_files,
});
return Some(CompactionInput {

let result = CompactionInput {
input_levels: select_level_inputs,
target_level: self.target_level,
target_sub_level_id: 0,
});
select_input_size: input.total_file_size,
target_input_size: target_file_size,
total_file_count: (input.total_file_count + target_file_count) as u64,
..Default::default()
};

if !self.compaction_task_validator.valid_compact_task(
&result,
ValidationRuleType::ToBase,
stats,
) && strict_check
{
continue;
}

return Some(result);
}
stats.skip_by_write_amp_limit += 1;
None
}

Expand Down Expand Up @@ -267,8 +277,6 @@ impl LevelCompactionPicker {
self.config.sub_level_max_compaction_bytes,
);

let tier_sub_level_compact_level_count =
self.config.level0_sub_level_compact_level_count as usize;
let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
self.config.sub_level_max_compaction_bytes / 2,
max_compaction_bytes,
Expand All @@ -284,18 +292,10 @@ impl LevelCompactionPicker {
continue;
}

let mut skip_by_write_amp = false;
// Limit the number of selection levels for the non-overlapping
// sub_level at least level0_sub_level_compact_level_count
for (plan_index, input) in l0_select_tables_vec.into_iter().enumerate() {
if plan_index == 0
&& input.sstable_infos.len()
< self.config.level0_sub_level_compact_level_count as usize
{
// first plan level count smaller than limit
break;
}

let validator = CompactionTaskValidator::new(self.config.clone());
let mut select_input_size = 0;
let mut total_file_count = 0;
for input in l0_select_tables_vec {
let mut max_level_size = 0;
for level_select_table in &input.sstable_infos {
let level_select_size = level_select_table
Expand All @@ -306,22 +306,6 @@ impl LevelCompactionPicker {
max_level_size = std::cmp::max(max_level_size, level_select_size);
}

// This limitation would keep our write-amplification no more than
// ln(max_compaction_bytes/flush_level_bytes) /
// ln(self.config.level0_sub_level_compact_level_count/2) Here we only use half
// of level0_sub_level_compact_level_count just for convenient.
let is_write_amp_large =
max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2
>= input.total_file_size;

if (is_write_amp_large
|| input.sstable_infos.len() < tier_sub_level_compact_level_count)
&& input.total_file_count < self.config.level0_max_compact_file_number as usize
{
skip_by_write_amp = true;
continue;
}

let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len());
for level_select_sst in input.sstable_infos {
if level_select_sst.is_empty() {
Expand All @@ -332,17 +316,25 @@ impl LevelCompactionPicker {
level_type: LevelType::Nonoverlapping as i32,
table_infos: level_select_sst,
});

select_input_size += input.total_file_size;
total_file_count += input.total_file_count;
}
select_level_inputs.reverse();
return Some(CompactionInput {

let result = CompactionInput {
input_levels: select_level_inputs,
target_level: 0,
target_sub_level_id: level.sub_level_id,
});
}
select_input_size,
total_file_count: total_file_count as u64,
..Default::default()
};

if skip_by_write_amp {
stats.skip_by_write_amp_limit += 1;
if !validator.valid_compact_task(&result, ValidationRuleType::Intra, stats) {
continue;
}

return Some(result);
}
}

Expand Down Expand Up @@ -402,6 +394,7 @@ impl LevelCompactionPicker {
target_level_idx -= 1;
}

let select_input_size = select_sst.file_size;
let input_levels = vec![
InputLevel {
level_idx: 0,
Expand All @@ -418,6 +411,9 @@ impl LevelCompactionPicker {
input_levels,
target_level: 0,
target_sub_level_id: l0.sub_levels[target_level_idx].sub_level_id,
select_input_size,
total_file_count: 1,
..Default::default()
});
}
None
Expand Down Expand Up @@ -901,6 +897,7 @@ pub mod tests {
}],
target_level: 1,
target_sub_level_id: pending_level.sub_level_id,
..Default::default()
};
assert!(!levels_handler[0].is_level_pending_compact(&pending_level));
tier_task_input.add_pending_task(1, &mut levels_handler);
Expand All @@ -918,7 +915,6 @@ pub mod tests {
// But stopped by pending sub-level when trying to include more sub-levels.
let mut picker = LevelCompactionPicker::new(1, config.clone());
let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);

assert!(ret.is_none());

// Free the pending sub-level.
Expand Down Expand Up @@ -964,7 +960,6 @@ pub mod tests {
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
// println!("ret.input_levels: {:?}", ret.input_levels);
// 1. trivial_move
assert_eq!(2, ret.input_levels.len());
assert!(ret.input_levels[1].table_infos.is_empty());
Expand All @@ -974,7 +969,6 @@ pub mod tests {
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
println!("ret.input_levels: {:?}", ret.input_levels);
assert_eq!(3, ret.input_levels.len());
assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id);
}
Expand Down
Loading

0 comments on commit 5a8866d

Please sign in to comment.