diff --git a/proto/hummock.proto b/proto/hummock.proto index c73d79e07e2b7..dca9e13fd11ed 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -49,6 +49,7 @@ message Level { uint64 total_file_size = 4; uint64 sub_level_id = 5; uint64 uncompressed_file_size = 6; + uint32 vnode_partition_count = 7; } message InputLevel { @@ -62,6 +63,7 @@ message IntraLevelDelta { uint64 l0_sub_level_id = 2; repeated uint64 removed_table_ids = 3; repeated SstableInfo inserted_table_infos = 4; + uint32 vnode_partition_count = 5; } enum CompatibilityVersion { @@ -116,6 +118,7 @@ message HummockVersion { uint64 group_id = 3; uint64 parent_group_id = 4; repeated uint32 member_table_ids = 5; + uint32 vnode_partition_count = 6; } uint64 id = 1; // Levels of each compaction group diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 23585da8999a9..4552298dccabb 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -155,7 +155,7 @@ impl CompactStatus { target_sub_level_id: ret.input.target_sub_level_id, task_type: ret.compaction_task_type as i32, split_by_state_table: group.compaction_config.split_by_state_table, - split_weight_by_vnode: group.compaction_config.split_weight_by_vnode, + split_weight_by_vnode: ret.input.vnode_partition_count, }; Some(compact_task) } diff --git a/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs index 7452f65d6503a..34c71c3cd4b75 100644 --- a/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs +++ b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs @@ -135,6 +135,13 @@ impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule { return true; } + if input.vnode_partition_count > 0 + && input.select_input_size > self.config.sub_level_max_compaction_bytes + && input.input_levels.len() > 1 + { + return true; + } + let intra_sub_level_compact_level_count = self.config.level0_sub_level_compact_level_count as usize; diff --git a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs index 541b93254172b..d43be005a30ea 100644 --- a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs @@ -57,7 +57,16 @@ impl CompactionPicker for IntraCompactionPicker { return None; } - if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], stats) { + let vnode_partition_count = levels.vnode_partition_count; + + if let Some(ret) = + self.pick_whole_level(l0, &level_handlers[0], vnode_partition_count, stats) + { + return Some(ret); + } + + if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], vnode_partition_count, stats) + { return Some(ret); } @@ -84,10 +93,80 @@ impl IntraCompactionPicker { } } + fn pick_whole_level( + &self, + l0: &OverlappingLevel, + level_handler: &LevelHandler, + partition_count: u32, + stats: &mut LocalPickerStatistic, + ) -> Option { + for (idx, level) in l0.sub_levels.iter().enumerate() { + if level.level_type() != LevelType::Nonoverlapping + || level.total_file_size > self.config.sub_level_max_compaction_bytes + || level.vnode_partition_count == partition_count + { + continue; + } + + let max_compaction_bytes = std::cmp::min( + self.config.max_compaction_bytes, + self.config.sub_level_max_compaction_bytes + * (self.config.level0_sub_level_compact_level_count as u64), + ); + + let mut select_input_size = 0; + + let mut select_level_inputs = vec![]; + let mut total_file_count = 0; + for next_level in l0.sub_levels.iter().skip(idx) { + if next_level.level_type() != LevelType::Nonoverlapping + || select_input_size > max_compaction_bytes + || level_handler.is_level_pending_compact(next_level) + { + break; + } + select_input_size += next_level.total_file_size; + total_file_count += next_level.table_infos.len(); + + select_level_inputs.push(InputLevel { + level_idx: 0, + level_type: next_level.level_type, + table_infos: next_level.table_infos.clone(), + }); + } + if !select_level_inputs.is_empty() { + let vnode_partition_count = + if select_input_size > self.config.sub_level_max_compaction_bytes { + partition_count + } else { + 0 + }; + let result = CompactionInput { + input_levels: select_level_inputs, + target_sub_level_id: level.sub_level_id, + select_input_size, + total_file_count: total_file_count as u64, + vnode_partition_count, + ..Default::default() + }; + if self.compaction_task_validator.valid_compact_task( + &result, + ValidationRuleType::Intra, + stats, + ) { + return Some(result); + } + } + } + + None + } + fn pick_l0_intra( &self, l0: &OverlappingLevel, level_handler: &LevelHandler, + vnode_partition_count: u32, stats: &mut LocalPickerStatistic, ) -> Option { let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); @@ -95,6 +174,7 @@ impl IntraCompactionPicker { for (idx, level) in l0.sub_levels.iter().enumerate() { if level.level_type() != LevelType::Nonoverlapping || level.total_file_size > self.config.sub_level_max_compaction_bytes + || level.vnode_partition_count < vnode_partition_count { continue; } diff --git a/src/meta/src/hummock/compaction/picker/mod.rs b/src/meta/src/hummock/compaction/picker/mod.rs index ac1a8f825aa33..ae588704e436e 100644 --- a/src/meta/src/hummock/compaction/picker/mod.rs +++ b/src/meta/src/hummock/compaction/picker/mod.rs @@ -61,6 +61,7 @@ pub struct CompactionInput { pub select_input_size: u64, pub target_input_size: u64, pub total_file_count: u64, + pub vnode_partition_count: u32, } impl CompactionInput { @@ -96,3 +97,18 @@ pub trait CompactionPicker { stats: &mut LocalPickerStatistic, ) -> Option; } + +#[derive(Default, Clone, Debug)] +pub struct PartitionLevelInfo { + pub level_id: u32, + pub sub_level_id: u64, + pub left_idx: usize, + pub right_idx: usize, + pub total_file_size: u64, +} + +#[derive(Default, Clone, Debug)] +pub struct LevelPartition { + pub sub_levels: Vec, + pub total_file_size: u64, +} diff --git a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs index 5b3058317a4b0..3f68c4459bbd4 100644 --- a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs @@ -52,6 +52,7 @@ impl TierCompactionPicker { &self, l0: &OverlappingLevel, level_handler: &LevelHandler, + mut vnode_partition_count: u32, stats: &mut LocalPickerStatistic, ) -> Option { for (idx, level) in l0.sub_levels.iter().enumerate() { @@ -114,6 +115,9 @@ impl TierCompactionPicker { } select_level_inputs.reverse(); + if compaction_bytes < self.config.sub_level_max_compaction_bytes { + vnode_partition_count = 0; + } let result = CompactionInput { input_levels: select_level_inputs, @@ -122,6 +126,7 @@ impl TierCompactionPicker { select_input_size: compaction_bytes, target_input_size: 0, total_file_count: compact_file_count, + vnode_partition_count, }; if !self.compaction_task_validator.valid_compact_task( @@ -150,7 +155,7 @@ impl CompactionPicker for TierCompactionPicker { return None; } - self.pick_overlapping_level(l0, &level_handlers[0], stats) + self.pick_overlapping_level(l0, &level_handlers[0], levels.vnode_partition_count, stats) } } @@ -257,7 +262,7 @@ pub mod tests { // sub_level_max_compaction_bytes. let mut picker = TierCompactionPicker::new(config); let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); - assert!(ret.is_none()) + assert!(ret.is_none()); } #[test] diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 5effaee29077b..a6ef5034f5646 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -2900,6 +2900,7 @@ fn gen_version_delta<'a>( level_idx: compact_task.target_level, inserted_table_infos: compact_task.sorted_output_ssts.clone(), l0_sub_level_id: compact_task.target_sub_level_id, + vnode_partition_count: compact_task.split_weight_by_vnode, ..Default::default() })), }; diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 1193877a14c9b..b5c9889d31cb4 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -15,6 +15,7 @@ use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::ops::Bound; use itertools::Itertools; use risingwave_common::catalog::TableId; @@ -44,6 +45,7 @@ pub struct GroupDeltasSummary { pub group_destroy: Option, pub group_meta_changes: Vec, pub group_table_change: Option, + pub new_vnode_partition_count: u32, } pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary { @@ -56,6 +58,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary let mut group_destroy = None; let mut group_meta_changes = vec![]; let mut group_table_change = None; + let mut new_vnode_partition_count = 0; for group_delta in &group_deltas.group_deltas { match group_delta.get_delta_type().unwrap() { @@ -69,6 +72,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary insert_sub_level_id = intra_level.l0_sub_level_id; insert_table_infos.extend(intra_level.inserted_table_infos.iter().cloned()); } + new_vnode_partition_count = intra_level.vnode_partition_count; } DeltaType::GroupConstruct(construct_delta) => { assert!(group_construct.is_none()); @@ -100,6 +104,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary group_destroy, group_meta_changes, group_table_change, + new_vnode_partition_count, } } @@ -620,6 +625,7 @@ pub trait HummockLevelsExt { delete_sst_levels: &[u32], delete_sst_ids_set: HashSet, ) -> bool; + fn can_partition_by_vnode(&self) -> bool; } impl HummockLevelsExt for Levels { @@ -651,6 +657,7 @@ impl HummockLevelsExt for Levels { insert_sst_level_id, insert_sub_level_id, insert_table_infos, + new_vnode_partition_count, .. } = summary; @@ -695,9 +702,29 @@ impl HummockLevelsExt for Levels { "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},", insert_sub_level_id, delete_sst_ids_set, l0.sub_levels.iter().map(|level| level.sub_level_id).collect_vec() ); + if l0.sub_levels[index].table_infos.is_empty() + && self.member_table_ids.len() == 1 + && insert_table_infos.iter().all(|sst| { + sst.table_ids.len() == 1 && sst.table_ids[0] == self.member_table_ids[0] + }) + { + l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count; + } level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos); } else { let idx = insert_sst_level_id as usize - 1; + if self.levels[idx].table_infos.is_empty() + && insert_table_infos + .iter() + .all(|sst| sst.table_ids.len() == 1) + { + self.levels[idx].vnode_partition_count = new_vnode_partition_count; + } else if self.levels[idx].vnode_partition_count != 0 + && new_vnode_partition_count == 0 + && self.member_table_ids.len() > 1 + { + self.levels[idx].vnode_partition_count = 0; + } level_insert_ssts(&mut self.levels[idx], insert_table_infos); } } @@ -747,6 +774,10 @@ impl HummockLevelsExt for Levels { } delete_sst_ids_set.is_empty() } + + fn can_partition_by_vnode(&self) -> bool { + self.vnode_partition_count > 0 && self.member_table_ids.len() == 1 + } } pub fn build_initial_compaction_group_levels( @@ -762,6 +793,7 @@ pub fn build_initial_compaction_group_levels( total_file_size: 0, sub_level_id: 0, uncompressed_file_size: 0, + vnode_partition_count: 0, }); } Levels { @@ -774,6 +806,7 @@ pub fn build_initial_compaction_group_levels( group_id, parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _, member_table_ids: vec![], + vnode_partition_count: compaction_config.split_weight_by_vnode, } } @@ -912,6 +945,7 @@ pub fn new_sub_level( total_file_size, sub_level_id, uncompressed_file_size, + vnode_partition_count: 0, } }