From d4bf5154a7e74364852918141896b287a7fae648 Mon Sep 17 00:00:00 2001 From: Wallace Date: Wed, 10 Jan 2024 16:40:23 +0800 Subject: [PATCH] fix(meta): do not split by vnode for low write throughput (#12534) Signed-off-by: Little-Wallace --- proto/hummock.proto | 5 +- src/common/src/config.rs | 2 +- src/config/example.toml | 2 +- src/meta/src/hummock/compaction/mod.rs | 50 +------ .../picker/base_level_compaction_picker.rs | 31 ++++- .../picker/compaction_task_validator.rs | 1 - .../picker/intra_compaction_picker.rs | 131 ++++++++++++++++-- .../picker/manual_compaction_picker.rs | 29 ++-- .../picker/min_overlap_compaction_picker.rs | 51 +++---- src/meta/src/hummock/compaction/picker/mod.rs | 16 +++ .../picker/space_reclaim_compaction_picker.rs | 4 +- .../picker/tier_compaction_picker.rs | 9 +- .../picker/ttl_reclaim_compaction_picker.rs | 1 + .../compaction/selector/level_selector.rs | 81 ++++++----- .../src/hummock/compaction/selector/mod.rs | 6 + src/meta/src/hummock/manager/mod.rs | 77 +++++++--- src/meta/src/hummock/metrics_utils.rs | 31 ++++- .../compaction_group/hummock_version_ext.rs | 30 ++++ .../hummock_test/src/compactor_tests.rs | 2 +- 19 files changed, 377 insertions(+), 182 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 63e85eed015aa..630120370e3e7 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -49,6 +49,7 @@ message Level { uint64 total_file_size = 4; uint64 sub_level_id = 5; uint64 uncompressed_file_size = 6; + uint32 vnode_partition_count = 7; } message InputLevel { @@ -62,6 +63,7 @@ message IntraLevelDelta { uint64 l0_sub_level_id = 2; repeated uint64 removed_table_ids = 3; repeated SstableInfo inserted_table_infos = 4; + uint32 vnode_partition_count = 5; } enum CompatibilityVersion { @@ -142,6 +144,7 @@ message HummockVersion { uint64 group_id = 3; uint64 parent_group_id = 4; repeated uint32 member_table_ids = 5; + uint32 vnode_partition_count = 6; } uint64 id = 1; // Levels of each compaction group @@ -341,7 +344,7 @@ message CompactTask { bool split_by_state_table = 21 [deprecated = true]; // Compaction needs to cut the state table every time 1/weight of vnodes in the table have been processed. // Deprecated. use table_vnode_partition instead; - uint32 split_weight_by_vnode = 22 [deprecated = true]; + uint32 split_weight_by_vnode = 22; map table_vnode_partition = 23; // The table watermark of any table id. In compaction we only use the table watermarks on safe epoch, // so we only need to include the table watermarks on safe epoch to reduce the size of metadata. diff --git a/src/common/src/config.rs b/src/common/src/config.rs index f9aa4bbcef0c4..62bc1d726a32a 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1016,7 +1016,7 @@ pub mod default { } pub fn partition_vnode_count() -> u32 { - 64 + 16 } pub fn table_write_throughput_threshold() -> u64 { diff --git a/src/config/example.toml b/src/config/example.toml index 8cba1d5f60d81..ed208e9b479b1 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -40,7 +40,7 @@ move_table_size_limit = 10737418240 split_group_size_limit = 68719476736 cut_table_size_limit = 1073741824 do_not_config_object_storage_lifecycle = false -partition_vnode_count = 64 +partition_vnode_count = 16 table_write_throughput_threshold = 16777216 min_table_split_write_throughput = 4194304 compaction_task_max_heartbeat_interval_secs = 60 diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 4cd913fa7d7fa..562027b91c8da 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -17,23 +17,20 @@ pub mod compaction_config; mod overlap_strategy; use risingwave_common::catalog::TableOption; -use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; -use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType}; +use risingwave_pb::hummock::compact_task::{self, TaskType}; mod picker; pub mod selector; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; use picker::{LevelCompactionPicker, TierCompactionPicker}; -use risingwave_hummock_sdk::{ - can_concat, CompactionGroupId, HummockCompactionTaskId, HummockEpoch, -}; +use risingwave_hummock_sdk::{can_concat, CompactionGroupId, HummockCompactionTaskId}; use risingwave_pb::hummock::compaction_config::CompactionMode; use risingwave_pb::hummock::hummock_version::Levels; -use risingwave_pb::hummock::{CompactTask, CompactionConfig, KeyRange, LevelType}; +use risingwave_pb::hummock::{CompactTask, CompactionConfig, LevelType}; pub use selector::CompactionSelector; use self::selector::LocalSelectorStatistic; @@ -107,51 +104,18 @@ impl CompactStatus { stats: &mut LocalSelectorStatistic, selector: &mut Box, table_id_to_options: HashMap, - ) -> Option { + ) -> Option { // When we compact the files, we must make the result of compaction meet the following // conditions, for any user key, the epoch of it in the file existing in the lower // layer must be larger. - let ret = selector.pick_compaction( + selector.pick_compaction( task_id, group, levels, &mut self.level_handlers, stats, table_id_to_options, - )?; - let target_level_id = ret.input.target_level; - - let compression_algorithm = match ret.compression_algorithm.as_str() { - "Lz4" => 1, - "Zstd" => 2, - _ => 0, - }; - - let compact_task = CompactTask { - input_ssts: ret.input.input_levels, - splits: vec![KeyRange::inf()], - watermark: HummockEpoch::MAX, - sorted_output_ssts: vec![], - task_id, - target_level: target_level_id as u32, - // only gc delete keys in last level because there may be older version in more bottom - // level. - gc_delete_keys: target_level_id == self.level_handlers.len() - 1, - base_level: ret.base_level as u32, - task_status: TaskStatus::Pending as i32, - compaction_group_id: group.group_id, - existing_table_ids: vec![], - compression_algorithm, - target_file_size: ret.target_file_size, - compaction_filter_mask: 0, - table_options: BTreeMap::default(), - current_epoch_time: 0, - target_sub_level_id: ret.input.target_sub_level_id, - task_type: ret.compaction_task_type as i32, - table_vnode_partition: BTreeMap::default(), - ..Default::default() - }; - Some(compact_task) + ) } pub fn is_trivial_move_task(task: &CompactTask) -> bool { diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index 2005d7ec42948..c5c09f1544ce7 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -60,12 +60,13 @@ impl CompactionPicker for LevelCompactionPicker { return None; } - if let Some(ret) = self.pick_base_trivial_move( + if let Some(mut ret) = self.pick_base_trivial_move( l0, levels.get_level(self.target_level), level_handlers, stats, ) { + ret.vnode_partition_count = levels.vnode_partition_count; return Some(ret); } @@ -73,6 +74,7 @@ impl CompactionPicker for LevelCompactionPicker { if let Some(ret) = self.pick_multi_level_to_base( l0, levels.get_level(self.target_level), + levels.vnode_partition_count, level_handlers, stats, ) { @@ -128,6 +130,7 @@ impl LevelCompactionPicker { &self, l0: &OverlappingLevel, target_level: &Level, + vnode_partition_count: u32, level_handlers: &[LevelHandler], stats: &mut LocalPickerStatistic, ) -> Option { @@ -147,8 +150,18 @@ impl LevelCompactionPicker { overlap_strategy.clone(), ); - let l0_select_tables_vec = non_overlap_sub_level_picker - .pick_l0_multi_non_overlap_level(&l0.sub_levels, &level_handlers[0]); + let mut max_vnode_partition_idx = 0; + for (idx, level) in l0.sub_levels.iter().enumerate() { + if level.vnode_partition_count < vnode_partition_count { + break; + } + max_vnode_partition_idx = idx; + } + + let l0_select_tables_vec = non_overlap_sub_level_picker.pick_l0_multi_non_overlap_level( + &l0.sub_levels[..=max_vnode_partition_idx], + &level_handlers[0], + ); if l0_select_tables_vec.is_empty() { stats.skip_by_pending_files += 1; return None; @@ -217,6 +230,7 @@ impl LevelCompactionPicker { select_input_size: input.total_file_size, target_input_size: target_file_size, total_file_count: (input.total_file_count + target_file_count) as u64, + vnode_partition_count, ..Default::default() }; @@ -225,6 +239,15 @@ impl LevelCompactionPicker { ValidationRuleType::ToBase, stats, ) { + if l0.total_file_size > target_level.total_file_size * 8 { + tracing::warn!("skip task with level count: {}, file count: {}, select size: {}, target size: {}, target level size: {}", + result.input_levels.len(), + result.total_file_count, + result.select_input_size, + result.target_input_size, + target_level.total_file_size, + ); + } continue; } @@ -423,6 +446,7 @@ pub mod tests { total_file_size: 0, sub_level_id: 0, uncompressed_file_size: 0, + ..Default::default() }]; let mut levels = Levels { levels, @@ -487,6 +511,7 @@ pub mod tests { total_file_size: 900, sub_level_id: 0, uncompressed_file_size: 900, + ..Default::default() }], l0: Some(generate_l0_nonoverlapping_sublevels(vec![])), ..Default::default() diff --git a/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs index dee2dd8e71aa2..860d211239bd5 100644 --- a/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs +++ b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs @@ -118,7 +118,6 @@ impl CompactionTaskValidationRule for TierCompactionTaskValidationRule { stats.skip_by_count_limit += 1; return false; } - true } } diff --git a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs index 5e3878b2fa9d0..034db2b127963 100644 --- a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs @@ -57,7 +57,16 @@ impl CompactionPicker for IntraCompactionPicker { return None; } - if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], stats) { + let vnode_partition_count = levels.vnode_partition_count; + + if let Some(ret) = + self.pick_whole_level(l0, &level_handlers[0], vnode_partition_count, stats) + { + return Some(ret); + } + + if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], vnode_partition_count, stats) + { return Some(ret); } @@ -84,13 +93,102 @@ impl IntraCompactionPicker { } } + fn pick_whole_level( + &self, + l0: &OverlappingLevel, + level_handler: &LevelHandler, + partition_count: u32, + stats: &mut LocalPickerStatistic, + ) -> Option { + if partition_count == 0 { + return None; + } + for (idx, level) in l0.sub_levels.iter().enumerate() { + if level.level_type() != LevelType::Nonoverlapping + || level.vnode_partition_count == partition_count + { + continue; + } + + let max_compaction_bytes = std::cmp::max( + self.config.max_bytes_for_level_base, + self.config.sub_level_max_compaction_bytes + * (self.config.level0_sub_level_compact_level_count as u64), + ); + + let mut select_input_size = 0; + + let mut select_level_inputs = vec![]; + let mut total_file_count = 0; + let mut wait_enough = false; + for next_level in l0.sub_levels.iter().skip(idx) { + if select_input_size > max_compaction_bytes + || total_file_count > self.config.level0_max_compact_file_number + || (next_level.vnode_partition_count == partition_count + && select_level_inputs.len() > 1) + { + wait_enough = true; + break; + } + + if level_handler.is_level_pending_compact(next_level) { + break; + } + + select_input_size += next_level.total_file_size; + total_file_count += next_level.table_infos.len() as u64; + + select_level_inputs.push(InputLevel { + level_idx: 0, + level_type: next_level.level_type, + table_infos: next_level.table_infos.clone(), + }); + } + if !select_level_inputs.is_empty() { + let vnode_partition_count = + if select_input_size > self.config.sub_level_max_compaction_bytes / 2 { + partition_count + } else { + 0 + }; + let result = CompactionInput { + input_levels: select_level_inputs, + target_sub_level_id: level.sub_level_id, + select_input_size, + total_file_count, + vnode_partition_count, + ..Default::default() + }; + if wait_enough + || self.compaction_task_validator.valid_compact_task( + &result, + ValidationRuleType::Intra, + stats, + ) + { + return Some(result); + } + } + } + + None + } + fn pick_l0_intra( &self, l0: &OverlappingLevel, level_handler: &LevelHandler, + vnode_partition_count: u32, stats: &mut LocalPickerStatistic, ) -> Option { let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); + let mut max_vnode_partition_idx = 0; + for (idx, level) in l0.sub_levels.iter().enumerate() { + if level.vnode_partition_count < vnode_partition_count { + break; + } + max_vnode_partition_idx = idx; + } for (idx, level) in l0.sub_levels.iter().enumerate() { if level.level_type() != LevelType::Nonoverlapping @@ -99,6 +197,10 @@ impl IntraCompactionPicker { continue; } + if idx > max_vnode_partition_idx { + break; + } + if level_handler.is_level_all_pending_compact(level) { continue; } @@ -117,7 +219,10 @@ impl IntraCompactionPicker { ); let l0_select_tables_vec = non_overlap_sub_level_picker - .pick_l0_multi_non_overlap_level(&l0.sub_levels[idx..], level_handler); + .pick_l0_multi_non_overlap_level( + &l0.sub_levels[idx..=max_vnode_partition_idx], + level_handler, + ); if l0_select_tables_vec.is_empty() { continue; @@ -192,6 +297,12 @@ impl IntraCompactionPicker { continue; } + if l0.sub_levels[idx + 1].vnode_partition_count + != l0.sub_levels[idx].vnode_partition_count + { + continue; + } + let trivial_move_picker = TrivialMovePicker::new(0, 0, overlap_strategy.clone()); let select_sst = trivial_move_picker.pick_trivial_move_sst( @@ -281,14 +392,11 @@ pub mod tests { fn test_l0_to_l1_compact_conflict() { // When picking L0->L1, L0's selecting_key_range should not be overlapped with L0's // compacting_key_range. - let mut picker = create_compaction_picker_for_test(); let levels = vec![Level { level_idx: 1, level_type: LevelType::Nonoverlapping as i32, table_infos: vec![], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }]; let mut levels = Levels { levels, @@ -307,14 +415,9 @@ pub mod tests { generate_table(2, 1, 350, 500, 2), ], ); - let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; + let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; let mut local_stats = LocalPickerStatistic::default(); - let ret = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - // trivial_move - ret.add_pending_task(0, &mut levels_handler); // pending only for test push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]); let config: CompactionConfig = CompactionConfigBuilder::new() .level0_tier_compact_file_number(2) @@ -341,9 +444,7 @@ pub mod tests { level_idx: 1, level_type: LevelType::Nonoverlapping as i32, table_infos: vec![generate_table(3, 1, 200, 300, 2)], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }], l0: Some(generate_l0_nonoverlapping_sublevels(vec![ generate_table(1, 1, 100, 210, 2), diff --git a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs index b0d1934823512..b5f187a3253d0 100644 --- a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs @@ -384,9 +384,7 @@ pub mod tests { generate_table(1, 1, 101, 200, 1), generate_table(2, 1, 222, 300, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 2, @@ -398,9 +396,7 @@ pub mod tests { generate_table(7, 1, 501, 800, 1), generate_table(8, 2, 301, 400, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; let mut levels = Levels { @@ -562,9 +558,7 @@ pub mod tests { generate_table(3, 1, 0, 100, 1), generate_table(4, 2, 2000, 3000, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 2, @@ -573,9 +567,7 @@ pub mod tests { generate_table(1, 1, 0, 100, 1), generate_table(2, 2, 2000, 3000, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; // Set internal_table_ids. @@ -617,9 +609,7 @@ pub mod tests { generate_table(3, 2, 200, 300, 1), generate_table(4, 2, 300, 400, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }]; let levels = Levels { levels, @@ -641,6 +631,7 @@ pub mod tests { total_file_size: 0, sub_level_id: 0, uncompressed_file_size: 0, + ..Default::default() }]; let levels = Levels { levels, @@ -1177,9 +1168,7 @@ pub mod tests { generate_table(3, 1, 101, 200, 1), generate_table(4, 1, 222, 300, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; assert_eq!(levels.len(), 4); @@ -1287,9 +1276,7 @@ pub mod tests { generate_table(6, 1, 444, 500, 1), generate_table(7, 1, 555, 600, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; assert_eq!(levels.len(), 4); diff --git a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs index ff9a6cda8dc3b..3065d108e700f 100644 --- a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs @@ -163,6 +163,7 @@ impl CompactionPicker for MinOverlappingPicker { }, ], target_level: self.target_level, + vnode_partition_count: levels.vnode_partition_count, ..Default::default() }) } @@ -434,10 +435,7 @@ pub mod tests { generate_table(1, 1, 101, 200, 1), generate_table(2, 1, 222, 300, 1), ], - - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 2, @@ -449,9 +447,7 @@ pub mod tests { generate_table(7, 1, 501, 800, 1), generate_table(8, 2, 301, 400, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; let levels = Levels { @@ -511,9 +507,7 @@ pub mod tests { generate_table(1, 1, 100, 149, 2), generate_table(2, 1, 150, 249, 2), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 2, @@ -522,9 +516,7 @@ pub mod tests { generate_table(4, 1, 50, 199, 1), generate_table(5, 1, 200, 399, 1), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; let levels = Levels { @@ -573,8 +565,7 @@ pub mod tests { generate_table(8, 1, 450, 500, 2), ], total_file_size: 800, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 2, @@ -587,8 +578,7 @@ pub mod tests { generate_table(11, 1, 450, 500, 2), ], total_file_size: 250, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 3, @@ -599,8 +589,7 @@ pub mod tests { generate_table(13, 1, 450, 500, 2), ], total_file_size: 150, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 4, @@ -611,8 +600,7 @@ pub mod tests { generate_table(16, 1, 450, 500, 2), ], total_file_size: 150, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; @@ -677,8 +665,7 @@ pub mod tests { generate_table(8, 1, 450, 500, 2), ], total_file_size: 800, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 2, @@ -691,8 +678,7 @@ pub mod tests { generate_table(11, 1, 450, 500, 2), ], total_file_size: 250, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 3, @@ -703,8 +689,7 @@ pub mod tests { generate_table(13, 1, 450, 500, 2), ], total_file_size: 150, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 4, @@ -715,8 +700,7 @@ pub mod tests { generate_table(16, 1, 450, 500, 2), ], total_file_size: 150, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; @@ -806,8 +790,7 @@ pub mod tests { level_type: LevelType::Nonoverlapping as i32, table_infos: vec![generate_table(0, 1, 400, 500, 2)], total_file_size: 100, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 2, @@ -817,8 +800,7 @@ pub mod tests { generate_table(2, 1, 600, 700, 1), ], total_file_size: 200, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, Level { level_idx: 3, @@ -828,8 +810,7 @@ pub mod tests { generate_table(4, 1, 600, 800, 1), ], total_file_size: 400, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; diff --git a/src/meta/src/hummock/compaction/picker/mod.rs b/src/meta/src/hummock/compaction/picker/mod.rs index 3c6b066e7f5e2..7e33123272684 100644 --- a/src/meta/src/hummock/compaction/picker/mod.rs +++ b/src/meta/src/hummock/compaction/picker/mod.rs @@ -61,6 +61,7 @@ pub struct CompactionInput { pub select_input_size: u64, pub target_input_size: u64, pub total_file_count: u64, + pub vnode_partition_count: u32, } impl CompactionInput { @@ -96,3 +97,18 @@ pub trait CompactionPicker { stats: &mut LocalPickerStatistic, ) -> Option; } + +#[derive(Default, Clone, Debug)] +pub struct PartitionLevelInfo { + pub level_id: u32, + pub sub_level_id: u64, + pub left_idx: usize, + pub right_idx: usize, + pub total_file_size: u64, +} + +#[derive(Default, Clone, Debug)] +pub struct LevelPartition { + pub sub_levels: Vec, + pub total_file_size: u64, +} diff --git a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs index 928b4d4d454c3..e14baaf086c1f 100644 --- a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs @@ -221,9 +221,7 @@ mod test { generate_table_with_ids_and_epochs(10, 1, 888, 1600, 1, vec![10], 0, 0), generate_table_with_ids_and_epochs(11, 1, 1600, 1800, 1, vec![10], 0, 0), ], - total_file_size: 0, - sub_level_id: 0, - uncompressed_file_size: 0, + ..Default::default() }, ]; diff --git a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs index b2a058659fea4..20b7d3b38c8e7 100644 --- a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs @@ -52,6 +52,7 @@ impl TierCompactionPicker { &self, l0: &OverlappingLevel, level_handler: &LevelHandler, + mut vnode_partition_count: u32, stats: &mut LocalPickerStatistic, ) -> Option { for (idx, level) in l0.sub_levels.iter().enumerate() { @@ -114,6 +115,9 @@ impl TierCompactionPicker { } select_level_inputs.reverse(); + if compaction_bytes < self.config.sub_level_max_compaction_bytes / 2 { + vnode_partition_count = 0; + } let result = CompactionInput { input_levels: select_level_inputs, @@ -122,6 +126,7 @@ impl TierCompactionPicker { select_input_size: compaction_bytes, target_input_size: 0, total_file_count: compact_file_count, + vnode_partition_count, }; if !self.compaction_task_validator.valid_compact_task( @@ -150,7 +155,7 @@ impl CompactionPicker for TierCompactionPicker { return None; } - self.pick_overlapping_level(l0, &level_handlers[0], stats) + self.pick_overlapping_level(l0, &level_handlers[0], levels.vnode_partition_count, stats) } } @@ -257,7 +262,7 @@ pub mod tests { // sub_level_max_compaction_bytes. let mut picker = TierCompactionPicker::new(config); let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); - assert!(ret.is_none()) + assert!(ret.is_none()); } #[test] diff --git a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs index 9d1e42893a3ae..4f7d4b2bd5da8 100644 --- a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs @@ -340,6 +340,7 @@ mod test { total_file_size: 0, sub_level_id: 0, uncompressed_file_size: 0, + ..Default::default() }, ]; diff --git a/src/meta/src/hummock/compaction/selector/level_selector.rs b/src/meta/src/hummock/compaction/selector/level_selector.rs index 736c10fac69c6..62a940958f76a 100644 --- a/src/meta/src/hummock/compaction/selector/level_selector.rs +++ b/src/meta/src/hummock/compaction/selector/level_selector.rs @@ -238,53 +238,62 @@ impl DynamicLevelSelectorCore { // range at each level, so the number of levels is the most important factor affecting // the read performance. At the same time, the size factor is also added to the score // calculation rule to avoid unbalanced compact task due to large size. - let non_overlapping_score = { - let total_size = levels.l0.as_ref().unwrap().total_file_size - - handlers[0].get_pending_output_file_size(ctx.base_level as u32); - let base_level_size = levels.get_level(ctx.base_level).total_file_size; - let base_level_sst_count = - levels.get_level(ctx.base_level).table_infos.len() as u64; - - // size limit - let non_overlapping_size_score = total_size * SCORE_BASE - / std::cmp::max(self.config.max_bytes_for_level_base, base_level_size); - // level count limit - let non_overlapping_level_count = levels - .l0 - .as_ref() - .unwrap() - .sub_levels - .iter() - .filter(|level| level.level_type() == LevelType::Nonoverlapping) - .count() as u64; - let non_overlapping_level_score = non_overlapping_level_count * SCORE_BASE - / std::cmp::max( - base_level_sst_count / 16, - self.config.level0_sub_level_compact_level_count as u64, - ); - - std::cmp::max(non_overlapping_size_score, non_overlapping_level_score) - }; + let total_size = levels + .l0 + .as_ref() + .unwrap() + .sub_levels + .iter() + .filter(|level| { + level.vnode_partition_count == levels.vnode_partition_count + && level.level_type() == LevelType::Nonoverlapping + }) + .map(|level| level.total_file_size) + .sum::() + - handlers[0].get_pending_output_file_size(ctx.base_level as u32); + let base_level_size = levels.get_level(ctx.base_level).total_file_size; + let base_level_sst_count = levels.get_level(ctx.base_level).table_infos.len() as u64; + + // size limit + let non_overlapping_size_score = total_size * SCORE_BASE + / std::cmp::max(self.config.max_bytes_for_level_base, base_level_size); + // level count limit + let non_overlapping_level_count = levels + .l0 + .as_ref() + .unwrap() + .sub_levels + .iter() + .filter(|level| level.level_type() == LevelType::Nonoverlapping) + .count() as u64; + let non_overlapping_level_score = non_overlapping_level_count * SCORE_BASE + / std::cmp::max( + base_level_sst_count / 16, + self.config.level0_sub_level_compact_level_count as u64, + ); + + let non_overlapping_score = + std::cmp::max(non_overlapping_size_score, non_overlapping_level_score); // Reduce the level num of l0 non-overlapping sub_level - ctx.score_levels.push({ - PickerInfo { + if non_overlapping_size_score > SCORE_BASE { + ctx.score_levels.push(PickerInfo { score: non_overlapping_score + 1, select_level: 0, target_level: ctx.base_level, picker_type: PickerType::ToBase, - } - }); + }); + } - // FIXME: more accurate score calculation algorithm will be introduced (#11903) - ctx.score_levels.push({ - PickerInfo { + if non_overlapping_level_score > SCORE_BASE { + // FIXME: more accurate score calculation algorithm will be introduced (#11903) + ctx.score_levels.push(PickerInfo { score: non_overlapping_score, select_level: 0, target_level: 0, picker_type: PickerType::Intra, - } - }); + }); + } } // The bottommost level can not be input level. diff --git a/src/meta/src/hummock/compaction/selector/mod.rs b/src/meta/src/hummock/compaction/selector/mod.rs index 52d373ccfe9ef..b6900bb5cbbe8 100644 --- a/src/meta/src/hummock/compaction/selector/mod.rs +++ b/src/meta/src/hummock/compaction/selector/mod.rs @@ -126,6 +126,7 @@ pub mod tests { uncompressed_file_size: sst.uncompressed_file_size, sub_level_id: sst.get_sst_id(), table_infos: vec![sst], + ..Default::default() }); } @@ -156,6 +157,7 @@ pub mod tests { sub_level_id, table_infos, uncompressed_file_size, + ..Default::default() }); } @@ -241,6 +243,7 @@ pub mod tests { total_file_size, sub_level_id: 0, uncompressed_file_size, + ..Default::default() } } @@ -263,6 +266,7 @@ pub mod tests { uncompressed_file_size: table.uncompressed_file_size, sub_level_id: idx as u64, table_infos: vec![table], + ..Default::default() }) .collect_vec(), total_file_size, @@ -287,6 +291,7 @@ pub mod tests { .sum::(), sub_level_id: idx as u64, table_infos: table, + ..Default::default() }) .collect_vec(), total_file_size: 0, @@ -321,6 +326,7 @@ pub mod tests { .iter() .map(|sst| sst.uncompressed_file_size) .sum::(), + ..Default::default() }) .collect_vec(), total_file_size: 0, diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index c3c6e82c145f9..104a3ceb9fe36 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -892,15 +892,61 @@ impl HummockManager { ); stats.report_to_metrics(compaction_group_id, self.metrics.as_ref()); selector_timer.observe_duration(); - let mut compact_task = match compact_task { + let compact_task = match compact_task { None => { return Ok(None); } Some(task) => task, }; - compact_task.watermark = watermark; - compact_task.existing_table_ids = member_table_ids.clone(); + let target_level_id = compact_task.input.target_level; + + let compression_algorithm = match compact_task.compression_algorithm.as_str() { + "Lz4" => 1, + "Zstd" => 2, + _ => 0, + }; + let vnode_partition_count = compact_task.input.vnode_partition_count; + use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; + + let mut compact_task = CompactTask { + input_ssts: compact_task.input.input_levels, + splits: vec![risingwave_pb::hummock::KeyRange::inf()], + watermark, + sorted_output_ssts: vec![], + task_id, + target_level: target_level_id as u32, + // only gc delete keys in last level because there may be older version in more bottom + // level. + gc_delete_keys: target_level_id + == current_version + .get_compaction_group_levels(compaction_group_id) + .levels + .len() + - 1, + base_level: compact_task.base_level as u32, + task_status: TaskStatus::Pending as i32, + compaction_group_id: group_config.group_id, + existing_table_ids: member_table_ids.clone(), + compression_algorithm, + target_file_size: compact_task.target_file_size, + table_options: table_id_to_option + .into_iter() + .filter_map(|(table_id, table_option)| { + if member_table_ids.contains(&table_id) { + return Some((table_id, TableOption::from(&table_option))); + } + + None + }) + .collect(), + current_epoch_time: Epoch::now().0, + compaction_filter_mask: group_config.compaction_config.compaction_filter_mask, + target_sub_level_id: compact_task.input.target_sub_level_id, + task_type: compact_task.compaction_task_type as i32, + split_weight_by_vnode: compact_task.input.vnode_partition_count, + ..Default::default() + }; let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task); let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task); @@ -950,21 +996,19 @@ impl HummockManager { compact_task.input_ssts ); } else { - compact_task.table_options = table_id_to_option - .into_iter() - .filter_map(|(table_id, table_option)| { - if compact_task.existing_table_ids.contains(&table_id) { - return Some((table_id, TableOption::from(&table_option))); - } - - None - }) - .collect(); - compact_task.current_epoch_time = Epoch::now().0; - compact_task.compaction_filter_mask = - group_config.compaction_config.compaction_filter_mask; table_to_vnode_partition .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id)); + if current_version + .get_compaction_group_levels(compaction_group_id) + .vnode_partition_count + > 0 + { + for table_id in &compact_task.existing_table_ids { + table_to_vnode_partition + .entry(*table_id) + .or_insert(vnode_partition_count); + } + } compact_task.table_vnode_partition = table_to_vnode_partition; compact_task.table_watermarks = @@ -3214,6 +3258,7 @@ fn gen_version_delta<'a>( level_idx: compact_task.target_level, inserted_table_infos: compact_task.sorted_output_ssts.clone(), l0_sub_level_id: compact_task.target_sub_level_id, + vnode_partition_count: compact_task.split_weight_by_vnode, ..Default::default() })), }; diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs index 47c19d7eef1cd..7f648e6c1a0f6 100644 --- a/src/meta/src/hummock/metrics_utils.rs +++ b/src/meta/src/hummock/metrics_utils.rs @@ -198,9 +198,11 @@ pub fn trigger_sst_stat( { // sub level stat let overlapping_level_label = - build_compact_task_l0_stat_metrics_label(compaction_group_id, true); + build_compact_task_l0_stat_metrics_label(compaction_group_id, true, false); let non_overlap_level_label = - build_compact_task_l0_stat_metrics_label(compaction_group_id, false); + build_compact_task_l0_stat_metrics_label(compaction_group_id, false, false); + let partition_level_label = + build_compact_task_l0_stat_metrics_label(compaction_group_id, true, true); let overlapping_sst_num = current_version .levels @@ -228,6 +230,21 @@ pub fn trigger_sst_stat( }) .unwrap_or(0); + let partition_level_num = current_version + .levels + .get(&compaction_group_id) + .and_then(|level| { + level.l0.as_ref().map(|l0| { + l0.sub_levels + .iter() + .filter(|sub_level| { + sub_level.level_type() == LevelType::Nonoverlapping + && sub_level.vnode_partition_count > 0 + }) + .count() + }) + }) + .unwrap_or(0); metrics .level_sst_num .with_label_values(&[&overlapping_level_label]) @@ -237,6 +254,11 @@ pub fn trigger_sst_stat( .level_sst_num .with_label_values(&[&non_overlap_level_label]) .set(non_overlap_sst_num as i64); + + metrics + .level_sst_num + .with_label_values(&[&partition_level_label]) + .set(partition_level_num as i64); } let previous_time = metrics.time_after_last_observation.load(Ordering::Relaxed); @@ -555,8 +577,11 @@ pub fn build_compact_task_stat_metrics_label( pub fn build_compact_task_l0_stat_metrics_label( compaction_group_id: u64, overlapping: bool, + partition: bool, ) -> String { - if overlapping { + if partition { + format!("cg{}_l0_sub_partition", compaction_group_id) + } else if overlapping { format!("cg{}_l0_sub_overlapping", compaction_group_id) } else { format!("cg{}_l0_sub_non_overlap", compaction_group_id) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index c494381b4b9f8..064201eb2a7df 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -47,6 +47,7 @@ pub struct GroupDeltasSummary { pub group_destroy: Option, pub group_meta_changes: Vec, pub group_table_change: Option, + pub new_vnode_partition_count: u32, } pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary { @@ -59,6 +60,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary let mut group_destroy = None; let mut group_meta_changes = vec![]; let mut group_table_change = None; + let mut new_vnode_partition_count = 0; for group_delta in &group_deltas.group_deltas { match group_delta.get_delta_type().unwrap() { @@ -72,6 +74,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary insert_sub_level_id = intra_level.l0_sub_level_id; insert_table_infos.extend(intra_level.inserted_table_infos.iter().cloned()); } + new_vnode_partition_count = intra_level.vnode_partition_count; } DeltaType::GroupConstruct(construct_delta) => { assert!(group_construct.is_none()); @@ -103,6 +106,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary group_destroy, group_meta_changes, group_table_change, + new_vnode_partition_count, } } @@ -680,6 +684,7 @@ impl Levels { insert_sst_level_id, insert_sub_level_id, insert_table_infos, + new_vnode_partition_count, .. } = summary; @@ -724,9 +729,31 @@ impl Levels { "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},", insert_sub_level_id, delete_sst_ids_set, l0.sub_levels.iter().map(|level| level.sub_level_id).collect_vec() ); + if l0.sub_levels[index].table_infos.is_empty() + && self.member_table_ids.len() == 1 + && insert_table_infos.iter().all(|sst| { + sst.table_ids.len() == 1 && sst.table_ids[0] == self.member_table_ids[0] + }) + { + // Only change vnode_partition_count for group which has only one state-table. + // Only change vnode_partition_count for level which update all sst files in this compact task. + l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count; + } level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos); } else { let idx = insert_sst_level_id as usize - 1; + if self.levels[idx].table_infos.is_empty() + && insert_table_infos + .iter() + .all(|sst| sst.table_ids.len() == 1) + { + self.levels[idx].vnode_partition_count = new_vnode_partition_count; + } else if self.levels[idx].vnode_partition_count != 0 + && new_vnode_partition_count == 0 + && self.member_table_ids.len() > 1 + { + self.levels[idx].vnode_partition_count = 0; + } level_insert_ssts(&mut self.levels[idx], insert_table_infos); } } @@ -791,6 +818,7 @@ pub fn build_initial_compaction_group_levels( total_file_size: 0, sub_level_id: 0, uncompressed_file_size: 0, + vnode_partition_count: 0, }); } Levels { @@ -803,6 +831,7 @@ pub fn build_initial_compaction_group_levels( group_id, parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _, member_table_ids: vec![], + vnode_partition_count: compaction_config.split_weight_by_vnode, } } @@ -941,6 +970,7 @@ pub fn new_sub_level( total_file_size, sub_level_id, uncompressed_file_size, + vnode_partition_count: 0, } } diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index a80d4e3dac568..ca7f36c53a6ae 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -1354,7 +1354,7 @@ pub(crate) mod tests { .last() .unwrap(); assert_eq!(1, output_level_info.table_infos.len()); - assert_eq!(252, output_level_info.table_infos[0].total_key_count); + assert_eq!(254, output_level_info.table_infos[0].total_key_count); } type KeyValue = (FullKey>, HummockValue>);