Skip to content

Commit

Permalink
adjust base level size
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed May 29, 2024
1 parent fabbb96 commit 30f251f
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1790,7 +1790,7 @@ pub mod default {
const DEFAULT_MAX_SPACE_RECLAIM_BYTES: u64 = 512 * 1024 * 1024; // 512MB;
const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER: u64 = 300;
const DEFAULT_MAX_COMPACTION_FILE_COUNT: u64 = 100;
const DEFAULT_MIN_SUB_LEVEL_COMPACT_LEVEL_COUNT: u32 = 3;
const DEFAULT_MIN_SUB_LEVEL_COMPACT_LEVEL_COUNT: u32 = 4;
const DEFAULT_MIN_OVERLAPPING_SUB_LEVEL_COMPACT_LEVEL_COUNT: u32 = 12;
const DEFAULT_TOMBSTONE_RATIO_PERCENT: u32 = 40;
const DEFAULT_EMERGENCY_PICKER: bool = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,7 @@ struct IntraCompactionTaskValidationRule {

impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule {
fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
if (input.total_file_count >= self.config.level0_max_compact_file_number
&& input.input_levels.len() > 1)
|| input.input_levels.len() >= MAX_COMPACT_LEVEL_COUNT
{
if input.input_levels.len() >= MAX_COMPACT_LEVEL_COUNT {
return true;
}

Expand Down Expand Up @@ -179,7 +176,7 @@ impl CompactionTaskValidationRule for BaseCompactionTaskValidationRule {
fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
if input.total_file_count >= self.config.level0_max_compact_file_number
|| input.input_levels.len() >= MAX_COMPACT_LEVEL_COUNT
|| input.select_input_size + input.target_input_size >= self.config.max_compaction_bytes
|| input.select_input_size >= self.config.max_compaction_bytes
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,6 @@ impl IntraCompactionPicker {
let mut select_input_size = 0;
let mut total_file_count = 0;
for input in l0_select_tables_vec {
let mut max_level_size = 0;
for level_select_table in &input.sstable_infos {
let level_select_size = level_select_table
.iter()
.map(|sst| sst.file_size)
.sum::<u64>();

max_level_size = std::cmp::max(max_level_size, level_select_size);
}

let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len());
for level_select_sst in input.sstable_infos {
if level_select_sst.is_empty() {
Expand Down Expand Up @@ -376,6 +366,7 @@ impl WholeLevelCompactionPicker {
stats,
)
{
tracing::warn!("==========pick_whole_level============");
return Some(result);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,33 +362,33 @@ impl NonOverlapSubLevelPicker {

ret.sstable_infos.retain(|ssts| !ssts.is_empty());

// To check whether the task is expected
if ret.total_file_size > self.max_compaction_bytes
|| ret.total_file_count as u64 > self.max_file_count
|| ret.sstable_infos.len() > self.max_expected_level_count
{
// rotate the sstables to meet the `max_file_count` and `max_compaction_bytes` and `max_expected_level_count`
let mut total_file_count = 0;
let mut total_file_size = 0;
let mut total_level_count = 0;
for (index, sstables) in ret.sstable_infos.iter().enumerate() {
total_file_count += sstables.len();
total_file_size += sstables.iter().map(|sst| sst.file_size).sum::<u64>();
total_level_count += 1;

// Atleast `min_expected_level_count`` level should be selected
if total_level_count >= self.min_expected_level_count
&& (total_file_count as u64 >= self.max_file_count
|| total_file_size >= self.max_compaction_bytes
|| total_level_count >= self.max_expected_level_count)
{
ret.total_file_count = total_file_count;
ret.total_file_size = total_file_size;
ret.sstable_infos.truncate(index + 1);
break;
}
}
}
// // To check whether the task is expected
// if ret.total_file_size > self.max_compaction_bytes
// || ret.total_file_count as u64 > self.max_file_count
// || ret.sstable_infos.len() > self.max_expected_level_count
// {
// // rotate the sstables to meet the `max_file_count` and `max_compaction_bytes` and `max_expected_level_count`
// let mut total_file_count = 0;
// let mut total_file_size = 0;
// let mut total_level_count = 0;
// for (index, sstables) in ret.sstable_infos.iter().enumerate() {
// total_file_count += sstables.len();
// total_file_size += sstables.iter().map(|sst| sst.file_size).sum::<u64>();
// total_level_count += 1;
//
// // Atleast `min_expected_level_count`` level should be selected
// if total_level_count >= self.min_expected_level_count
// && (total_file_count as u64 >= self.max_file_count
// || total_file_size >= self.max_compaction_bytes
// || total_level_count >= self.max_expected_level_count)
// {
// ret.total_file_count = total_file_count;
// ret.total_file_size = total_file_size;
// ret.sstable_infos.truncate(index + 1);
// break;
// }
// }
// }

ret
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/compaction/selector/level_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ impl DynamicLevelSelectorCore {
return ctx;
}

let base_bytes_min = self.config.max_bytes_for_level_base;
let base_bytes_max = base_bytes_min * self.config.max_bytes_for_level_multiplier;
let base_bytes_max = self.config.max_bytes_for_level_base;
let base_bytes_min = base_bytes_max / self.config.max_bytes_for_level_multiplier;

let mut cur_level_size = max_level_size;
for _ in first_non_empty_level..self.config.max_level as usize {
Expand Down
9 changes: 4 additions & 5 deletions src/storage/hummock_sdk/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashSet;

use itertools::Itertools;
use risingwave_pb::hummock::{CompactTask, LevelType, SstableInfo};

pub fn compact_task_output_to_string(compact_task: &CompactTask) -> String {
Expand Down Expand Up @@ -99,14 +100,12 @@ pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap();
}
if !compact_task.table_vnode_partition.is_empty() {
writeln!(s, "Table vnode partition info:").unwrap();
compact_task
let partitions = compact_task
.table_vnode_partition
.iter()
.filter(|t| input_sst_table_ids.contains(t.0))
.for_each(|(tid, partition)| {
writeln!(s, " [{:?}, {:?}]", tid, partition).unwrap();
});
.collect_vec();
writeln!(s, "Table vnode partition info: {:?}", partitions).unwrap();
}

if !dropped_table_ids.is_empty() {
Expand Down

0 comments on commit 30f251f

Please sign in to comment.