Skip to content

Commit

Permalink
do not split too many files
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Sep 26, 2023
1 parent 42290e4 commit 3dcff6b
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 4 deletions.
3 changes: 3 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ message Level {
uint64 total_file_size = 4;
uint64 sub_level_id = 5;
uint64 uncompressed_file_size = 6;
uint32 vnode_partition_count = 7;
}

message InputLevel {
Expand All @@ -62,6 +63,7 @@ message IntraLevelDelta {
uint64 l0_sub_level_id = 2;
repeated uint64 removed_table_ids = 3;
repeated SstableInfo inserted_table_infos = 4;
uint32 vnode_partition_count = 5;
}

enum CompatibilityVersion {
Expand Down Expand Up @@ -116,6 +118,7 @@ message HummockVersion {
uint64 group_id = 3;
uint64 parent_group_id = 4;
repeated uint32 member_table_ids = 5;
uint32 vnode_partition_count = 6;
}
uint64 id = 1;
// Levels of each compaction group
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl CompactStatus {
target_sub_level_id: ret.input.target_sub_level_id,
task_type: ret.compaction_task_type as i32,
split_by_state_table: group.compaction_config.split_by_state_table,
split_weight_by_vnode: group.compaction_config.split_weight_by_vnode,
split_weight_by_vnode: ret.input.vnode_partition_count,
};
Some(compact_task)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule {
return true;
}

if input.vnode_partition_count > 0
&& input.select_input_size > self.config.sub_level_max_compaction_bytes
&& input.input_levels.len() > 1
{
return true;
}

let intra_sub_level_compact_level_count =
self.config.level0_sub_level_compact_level_count as usize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,16 @@ impl CompactionPicker for IntraCompactionPicker {
return None;
}

if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], stats) {
let vnode_partition_count = levels.vnode_partition_count;

if let Some(ret) =
self.pick_whole_level(l0, &level_handlers[0], vnode_partition_count, stats)
{
return Some(ret);
}

if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], vnode_partition_count, stats)
{
return Some(ret);
}

Expand All @@ -84,17 +93,88 @@ impl IntraCompactionPicker {
}
}

fn pick_whole_level(
&self,
l0: &OverlappingLevel,
level_handler: &LevelHandler,
partition_count: u32,
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
for (idx, level) in l0.sub_levels.iter().enumerate() {
if level.level_type() != LevelType::Nonoverlapping
|| level.total_file_size > self.config.sub_level_max_compaction_bytes
|| level.vnode_partition_count == partition_count
{
continue;
}

let max_compaction_bytes = std::cmp::min(
self.config.max_compaction_bytes,
self.config.sub_level_max_compaction_bytes
* (self.config.level0_sub_level_compact_level_count as u64),
);

let mut select_input_size = 0;

let mut select_level_inputs = vec![];
let mut total_file_count = 0;
for next_level in l0.sub_levels.iter().skip(idx) {
if next_level.level_type() != LevelType::Nonoverlapping
|| select_input_size > max_compaction_bytes
|| level_handler.is_level_pending_compact(next_level)
{
break;
}
select_input_size += next_level.total_file_size;
total_file_count += next_level.table_infos.len();

select_level_inputs.push(InputLevel {
level_idx: 0,
level_type: next_level.level_type,
table_infos: next_level.table_infos.clone(),
});
}
if !select_level_inputs.is_empty() {
let vnode_partition_count =
if select_input_size > self.config.sub_level_max_compaction_bytes {
partition_count
} else {
0
};
let result = CompactionInput {
input_levels: select_level_inputs,
target_sub_level_id: level.sub_level_id,
select_input_size,
total_file_count: total_file_count as u64,
vnode_partition_count,
..Default::default()
};
if self.compaction_task_validator.valid_compact_task(
&result,
ValidationRuleType::Intra,
stats,
) {
return Some(result);
}
}
}

None
}

fn pick_l0_intra(
&self,
l0: &OverlappingLevel,
level_handler: &LevelHandler,
vnode_partition_count: u32,
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());

for (idx, level) in l0.sub_levels.iter().enumerate() {
if level.level_type() != LevelType::Nonoverlapping
|| level.total_file_size > self.config.sub_level_max_compaction_bytes
|| level.vnode_partition_count < vnode_partition_count
{
continue;
}
Expand Down
16 changes: 16 additions & 0 deletions src/meta/src/hummock/compaction/picker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub struct CompactionInput {
pub select_input_size: u64,
pub target_input_size: u64,
pub total_file_count: u64,
pub vnode_partition_count: u32,
}

impl CompactionInput {
Expand Down Expand Up @@ -96,3 +97,18 @@ pub trait CompactionPicker {
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput>;
}

#[derive(Default, Clone, Debug)]
pub struct PartitionLevelInfo {
pub level_id: u32,
pub sub_level_id: u64,
pub left_idx: usize,
pub right_idx: usize,
pub total_file_size: u64,
}

#[derive(Default, Clone, Debug)]
pub struct LevelPartition {
pub sub_levels: Vec<PartitionLevelInfo>,
pub total_file_size: u64,
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl TierCompactionPicker {
&self,
l0: &OverlappingLevel,
level_handler: &LevelHandler,
mut vnode_partition_count: u32,
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
for (idx, level) in l0.sub_levels.iter().enumerate() {
Expand Down Expand Up @@ -114,6 +115,9 @@ impl TierCompactionPicker {
}

select_level_inputs.reverse();
if compaction_bytes < self.config.sub_level_max_compaction_bytes {
vnode_partition_count = 0;
}

let result = CompactionInput {
input_levels: select_level_inputs,
Expand All @@ -122,6 +126,7 @@ impl TierCompactionPicker {
select_input_size: compaction_bytes,
target_input_size: 0,
total_file_count: compact_file_count,
vnode_partition_count,
};

if !self.compaction_task_validator.valid_compact_task(
Expand Down Expand Up @@ -150,7 +155,7 @@ impl CompactionPicker for TierCompactionPicker {
return None;
}

self.pick_overlapping_level(l0, &level_handlers[0], stats)
self.pick_overlapping_level(l0, &level_handlers[0], levels.vnode_partition_count, stats)
}
}

Expand Down Expand Up @@ -257,7 +262,7 @@ pub mod tests {
// sub_level_max_compaction_bytes.
let mut picker = TierCompactionPicker::new(config);
let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
assert!(ret.is_none())
assert!(ret.is_none());
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2900,6 +2900,7 @@ fn gen_version_delta<'a>(
level_idx: compact_task.target_level,
inserted_table_infos: compact_task.sorted_output_ssts.clone(),
l0_sub_level_id: compact_task.target_sub_level_id,
vnode_partition_count: compact_task.split_weight_by_vnode,
..Default::default()
})),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::Bound;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
Expand Down Expand Up @@ -44,6 +45,7 @@ pub struct GroupDeltasSummary {
pub group_destroy: Option<GroupDestroy>,
pub group_meta_changes: Vec<GroupMetaChange>,
pub group_table_change: Option<GroupTableChange>,
pub new_vnode_partition_count: u32,
}

pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary {
Expand All @@ -56,6 +58,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary
let mut group_destroy = None;
let mut group_meta_changes = vec![];
let mut group_table_change = None;
let mut new_vnode_partition_count = 0;

for group_delta in &group_deltas.group_deltas {
match group_delta.get_delta_type().unwrap() {
Expand All @@ -69,6 +72,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary
insert_sub_level_id = intra_level.l0_sub_level_id;
insert_table_infos.extend(intra_level.inserted_table_infos.iter().cloned());
}
new_vnode_partition_count = intra_level.vnode_partition_count;
}
DeltaType::GroupConstruct(construct_delta) => {
assert!(group_construct.is_none());
Expand Down Expand Up @@ -100,6 +104,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary
group_destroy,
group_meta_changes,
group_table_change,
new_vnode_partition_count,
}
}

Expand Down Expand Up @@ -620,6 +625,7 @@ pub trait HummockLevelsExt {
delete_sst_levels: &[u32],
delete_sst_ids_set: HashSet<u64>,
) -> bool;
fn can_partition_by_vnode(&self) -> bool;
}

impl HummockLevelsExt for Levels {
Expand Down Expand Up @@ -651,6 +657,7 @@ impl HummockLevelsExt for Levels {
insert_sst_level_id,
insert_sub_level_id,
insert_table_infos,
new_vnode_partition_count,
..
} = summary;

Expand Down Expand Up @@ -695,9 +702,29 @@ impl HummockLevelsExt for Levels {
"should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},",
insert_sub_level_id, delete_sst_ids_set, l0.sub_levels.iter().map(|level| level.sub_level_id).collect_vec()
);
if l0.sub_levels[index].table_infos.is_empty()
&& self.member_table_ids.len() == 1
&& insert_table_infos.iter().all(|sst| {
sst.table_ids.len() == 1 && sst.table_ids[0] == self.member_table_ids[0]
})
{
l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
}
level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
} else {
let idx = insert_sst_level_id as usize - 1;
if self.levels[idx].table_infos.is_empty()
&& insert_table_infos
.iter()
.all(|sst| sst.table_ids.len() == 1)
{
self.levels[idx].vnode_partition_count = new_vnode_partition_count;
} else if self.levels[idx].vnode_partition_count != 0
&& new_vnode_partition_count == 0
&& self.member_table_ids.len() > 1
{
self.levels[idx].vnode_partition_count = 0;
}
level_insert_ssts(&mut self.levels[idx], insert_table_infos);
}
}
Expand Down Expand Up @@ -747,6 +774,10 @@ impl HummockLevelsExt for Levels {
}
delete_sst_ids_set.is_empty()
}

fn can_partition_by_vnode(&self) -> bool {
self.vnode_partition_count > 0 && self.member_table_ids.len() == 1
}
}

pub fn build_initial_compaction_group_levels(
Expand All @@ -762,6 +793,7 @@ pub fn build_initial_compaction_group_levels(
total_file_size: 0,
sub_level_id: 0,
uncompressed_file_size: 0,
vnode_partition_count: 0,
});
}
Levels {
Expand All @@ -774,6 +806,7 @@ pub fn build_initial_compaction_group_levels(
group_id,
parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
member_table_ids: vec![],
vnode_partition_count: compaction_config.split_weight_by_vnode,
}
}

Expand Down Expand Up @@ -912,6 +945,7 @@ pub fn new_sub_level(
total_file_size,
sub_level_id,
uncompressed_file_size,
vnode_partition_count: 0,
}
}

Expand Down

0 comments on commit 3dcff6b

Please sign in to comment.