diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index d938bc419300b..781b4d2c13b1b 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -265,7 +265,7 @@ impl LevelCompactionPicker { // reduce log if log_counter % 100 == 0 { - tracing::warn!("skip task with level count: {}, file count: {}, select size: {}, target size: {}, target level size: {}", + tracing::info!("skip task with level count: {}, file count: {}, select size: {}, target size: {}, target level size: {}", result.input_levels.len(), result.total_file_count, result.select_input_size, diff --git a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs index 6bdd6f5ab8eab..ab154f4f839e9 100644 --- a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs @@ -368,7 +368,6 @@ impl WholeLevelCompactionPicker { stats, ) { - tracing::warn!("==========pick_whole_level============"); return Some(result); } } diff --git a/src/meta/src/hummock/compaction/selector/level_selector.rs b/src/meta/src/hummock/compaction/selector/level_selector.rs index 535e4bfa9b7e6..cb508abadbdb2 100644 --- a/src/meta/src/hummock/compaction/selector/level_selector.rs +++ b/src/meta/src/hummock/compaction/selector/level_selector.rs @@ -33,8 +33,8 @@ use super::{ }; use crate::hummock::compaction::overlap_strategy::OverlapStrategy; use crate::hummock::compaction::picker::{ - CompactionPicker, CompactionTaskValidator, IntraCompactionPicker, LocalPickerStatistic, - MinOverlappingPicker, + CompactionInput, CompactionPicker, CompactionTaskValidator, IntraCompactionPicker, + LocalPickerStatistic, MinOverlappingPicker, }; use crate::hummock::compaction::{ create_overlap_strategy, CompactionDeveloperConfig, CompactionTask, LocalSelectorStatistic, @@ -456,7 +456,16 @@ impl DynamicLevelSelector { vnode_partition_count: 0, }); new_level.table_infos.extend(vlevel.table_infos); - new_level.total_file_size += vlevel.total_file_size; + new_level.total_file_size = new_level + .table_infos + .iter() + .map(|sst| sst.file_size) + .sum::(); + new_level.table_infos.sort_by(|sst1, sst2| { + let a = sst1.key_range.as_ref().unwrap(); + let b = sst2.key_range.as_ref().unwrap(); + a.compare(b) + }); } if let Some(group) = virtual_group.remove(table_id) { let hybrid_group = @@ -474,10 +483,9 @@ impl DynamicLevelSelector { if l0.sub_levels[idx].sub_level_id == l0.sub_levels[idx - 1].sub_level_id { - let x = std::mem::take(&mut l0.sub_levels[idx].table_infos); + let x = + std::mem::take(&mut l0.sub_levels[idx].table_infos); l0.sub_levels[idx - 1].table_infos.extend(x); - l0.sub_levels[idx - 1].total_file_size += - l0.sub_levels[idx].total_file_size; l0.sub_levels[idx - 1].table_infos.sort_by( |sst1, sst2| { let a = sst1.key_range.as_ref().unwrap(); @@ -485,6 +493,12 @@ impl DynamicLevelSelector { a.compare(b) }, ); + l0.sub_levels[idx - 1].total_file_size = l0.sub_levels + [idx - 1] + .table_infos + .iter() + .map(|sst| sst.file_size) + .sum::(); l0.sub_levels.remove(idx); } else { idx += 1; @@ -518,6 +532,13 @@ impl DynamicLevelSelector { compaction_group.compaction_config.as_ref(), ) }); + if table_id == 0 { + for sst in &vlevel.table_infos { + group.member_table_ids.extend(sst.table_ids.clone()); + } + } else { + group.member_table_ids.push(table_id); + } group.l0.as_mut().unwrap().sub_levels.push(vlevel); } } @@ -525,19 +546,19 @@ impl DynamicLevelSelector { for level in &levels.levels { let mut virtual_level: HashMap = HashMap::default(); for sst in &level.table_infos { - let table_id = - if sst.table_ids.len() > 1 || hybrid_table_ids.contains(&sst.table_ids[0]) { - if sst.table_ids.len() > 1 { - for table_id in &sst.table_ids { - if virtual_group.contains_key(table_id) { - return None; - } - } + let table_id = if sst.table_ids.len() > 1 { + for table_id in &sst.table_ids { + if virtual_group.contains_key(table_id) { + return None; } - 0 - } else { - sst.table_ids[0] - }; + hybrid_table_ids.insert(*table_id); + } + 0 + } else if hybrid_table_ids.contains(&sst.table_ids[0]) { + 0 + } else { + sst.table_ids[0] + }; let new_level = virtual_level.entry(table_id).or_insert_with(|| Level { level_idx: level.level_idx, level_type: level.level_type, @@ -553,6 +574,13 @@ impl DynamicLevelSelector { } for (table_id, vlevel) in virtual_level { if let Some(group) = virtual_group.get_mut(&table_id) { + if table_id == 0 { + for sst in &vlevel.table_infos { + group.member_table_ids.extend(sst.table_ids.clone()); + } + } else { + group.member_table_ids.push(table_id); + } group.levels[(level.level_idx as usize) - 1] = vlevel; } } @@ -568,7 +596,11 @@ impl DynamicLevelSelector { compaction_group.compaction_config.clone(), )); let mut score_levels = vec![]; - for (table_id, group) in &virtual_group { + for (table_id, group) in &mut virtual_group { + group.member_table_ids.sort(); + group.member_table_ids.dedup(); + assert!(!group.member_table_ids.is_empty()); + selector_stats.record_virtual_group_info(group.member_table_ids.clone()); let ctx = dynamic_level_core.get_priority_levels(group, level_handlers); for picker_info in ctx.score_levels { score_levels.push((*table_id, picker_info, ctx.base_level)); @@ -593,7 +625,11 @@ impl DynamicLevelSelector { let mut stats = LocalPickerStatistic::default(); if let Some(ret) = picker.pick_compaction(&group, level_handlers, &mut stats) { ret.add_pending_task(task_id, level_handlers); - tracing::warn!("NEW COMPACT task-{} to target level-{}", task_id, picker_info.target_level); + selector_stats.record_virtual_group_task( + table_id, + ret.input_levels[0].level_idx as usize, + ret.target_level, + ); return Some(create_compaction_task( dynamic_level_core.get_config(), ret, @@ -611,6 +647,12 @@ impl DynamicLevelSelector { } } +pub fn is_trivial_move_task(task: &CompactionInput) -> bool { + task.input_levels.len() == 2 + && task.input_levels[1].level_idx as usize == task.target_level + && task.input_levels[1].table_infos.is_empty() +} + impl CompactionSelector for DynamicLevelSelector { fn pick_compaction( &mut self, @@ -622,19 +664,6 @@ impl CompactionSelector for DynamicLevelSelector { _table_id_to_options: HashMap, developer_config: Arc, ) -> Option { - if levels.member_table_ids.len() > 1 { - if let Some(ret) = self.pick_compaction_per_table( - task_id, - compaction_group, - levels, - level_handlers, - selector_stats, - developer_config.clone(), - ) { - return Some(ret); - } - } - let dynamic_level_core = DynamicLevelSelectorCore::new( compaction_group.compaction_config.clone(), developer_config.clone(), @@ -646,6 +675,39 @@ impl CompactionSelector for DynamicLevelSelector { let compaction_task_validator = Arc::new(CompactionTaskValidator::new( compaction_group.compaction_config.clone(), )); + if levels.member_table_ids.len() > 1 && !ctx.score_levels.is_empty() { + if ctx.score_levels[0].score > SCORE_BASE { + let mut picker = dynamic_level_core.create_compaction_picker( + &ctx.score_levels[0], + overlap_strategy.clone(), + compaction_task_validator.clone(), + ); + let mut stats = LocalPickerStatistic::default(); + if let Some(ret) = picker.pick_compaction(levels, level_handlers, &mut stats) + && (ctx.score_levels[0].picker_type == PickerType::Tier + || is_trivial_move_task(&ret)) + { + ret.add_pending_task(task_id, level_handlers); + return Some(create_compaction_task( + dynamic_level_core.get_config(), + ret, + ctx.base_level, + self.task_type(), + )); + } + } + + if let Some(ret) = self.pick_compaction_per_table( + task_id, + compaction_group, + levels, + level_handlers, + selector_stats, + developer_config.clone(), + ) { + return Some(ret); + } + } for picker_info in &ctx.score_levels { if picker_info.score <= SCORE_BASE { return None; diff --git a/src/meta/src/hummock/compaction/selector/mod.rs b/src/meta/src/hummock/compaction/selector/mod.rs index a342a661ecb7b..d7660640484fb 100644 --- a/src/meta/src/hummock/compaction/selector/mod.rs +++ b/src/meta/src/hummock/compaction/selector/mod.rs @@ -24,6 +24,7 @@ mod space_reclaim_selector; mod tombstone_compaction_selector; mod ttl_selector; +use std::cell::RefCell; use std::collections::HashMap; use std::sync::Arc; @@ -70,13 +71,91 @@ pub fn default_compaction_selector() -> Box { Box::::default() } +struct TableVirtualGroup { + table_id: u32, + select_level: usize, + target_level: usize, +} + +#[derive(Default)] +struct LocalVirtualGroupInfo { + groups: Vec>, + count: usize, +} +#[derive(Default)] +struct LocalVirtualGroupStatistic { + groups: Vec, +} + +impl LocalVirtualGroupStatistic { + pub fn may_log_info(&mut self, mut groups: Vec>) { + for group in groups.iter_mut() { + group.sort(); + } + groups.sort_by_key(|group| *group.first().unwrap()); + for group in &mut self.groups { + if group.groups == groups { + group.count += 1; + if group.count >= 100 { + tracing::info!("group state table {:?}", group.groups); + group.count = 0; + } + return; + } + } + self.groups.push(LocalVirtualGroupInfo { + groups, + count: 1, + }); + } +} + +std::thread_local!(static LOG_COUNTER: RefCell = RefCell::new(LocalVirtualGroupStatistic::default())); + #[derive(Default)] pub struct LocalSelectorStatistic { skip_picker: Vec<(usize, usize, LocalPickerStatistic)>, + table_virtual_group: Option, + virtual_group_score_statistic: Vec>, } impl LocalSelectorStatistic { + pub fn record_virtual_group_info(&mut self, table_ids: Vec) { + self.virtual_group_score_statistic.push(table_ids); + } + + pub fn record_virtual_group_task( + &mut self, + table_id: u32, + select_level: usize, + target_level: usize, + ) { + self.table_virtual_group = Some(TableVirtualGroup { + table_id, + select_level, + target_level, + }); + } + pub fn report_to_metrics(&self, group_id: u64, metrics: &MetaMetrics) { + if self.virtual_group_score_statistic.is_empty() { + let x = self.virtual_group_score_statistic.clone(); + LOG_COUNTER.with_borrow_mut(|info| { + info.may_log_info(x); + }); + } + if let Some(group) = self.table_virtual_group.as_ref() { + let level_label = format!("{}-to-{}", group.select_level, group.target_level); + metrics + .compact_frequency + .with_label_values(&[ + "Virtual", + &group_id.to_string(), + &group.table_id.to_string(), + level_label.as_str(), + ]) + .inc(); + } for (start_level, target_level, stats) in &self.skip_picker { let level_label = format!("cg{}-{}-to-{}", group_id, start_level, target_level); if stats.skip_by_write_amp_limit > 0 {