Skip to content

Commit

Permalink
add some metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed May 30, 2024
1 parent ce4130a commit aa9b120
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ impl LevelCompactionPicker {

// reduce log
if log_counter % 100 == 0 {
tracing::warn!("skip task with level count: {}, file count: {}, select size: {}, target size: {}, target level size: {}",
tracing::info!("skip task with level count: {}, file count: {}, select size: {}, target size: {}, target level size: {}",
result.input_levels.len(),
result.total_file_count,
result.select_input_size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,6 @@ impl WholeLevelCompactionPicker {
stats,
)
{
tracing::warn!("==========pick_whole_level============");
return Some(result);
}
}
Expand Down
128 changes: 95 additions & 33 deletions src/meta/src/hummock/compaction/selector/level_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use super::{
};
use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
use crate::hummock::compaction::picker::{
CompactionPicker, CompactionTaskValidator, IntraCompactionPicker, LocalPickerStatistic,
MinOverlappingPicker,
CompactionInput, CompactionPicker, CompactionTaskValidator, IntraCompactionPicker,
LocalPickerStatistic, MinOverlappingPicker,
};
use crate::hummock::compaction::{
create_overlap_strategy, CompactionDeveloperConfig, CompactionTask, LocalSelectorStatistic,
Expand Down Expand Up @@ -456,7 +456,16 @@ impl DynamicLevelSelector {
vnode_partition_count: 0,
});
new_level.table_infos.extend(vlevel.table_infos);
new_level.total_file_size += vlevel.total_file_size;
new_level.total_file_size = new_level
.table_infos
.iter()
.map(|sst| sst.file_size)
.sum::<u64>();
new_level.table_infos.sort_by(|sst1, sst2| {
let a = sst1.key_range.as_ref().unwrap();
let b = sst2.key_range.as_ref().unwrap();
a.compare(b)
});
}
if let Some(group) = virtual_group.remove(table_id) {
let hybrid_group =
Expand All @@ -474,17 +483,22 @@ impl DynamicLevelSelector {
if l0.sub_levels[idx].sub_level_id
== l0.sub_levels[idx - 1].sub_level_id
{
let x = std::mem::take(&mut l0.sub_levels[idx].table_infos);
let x =
std::mem::take(&mut l0.sub_levels[idx].table_infos);
l0.sub_levels[idx - 1].table_infos.extend(x);
l0.sub_levels[idx - 1].total_file_size +=
l0.sub_levels[idx].total_file_size;
l0.sub_levels[idx - 1].table_infos.sort_by(
|sst1, sst2| {
let a = sst1.key_range.as_ref().unwrap();
let b = sst2.key_range.as_ref().unwrap();
a.compare(b)
},
);
l0.sub_levels[idx - 1].total_file_size = l0.sub_levels
[idx - 1]
.table_infos
.iter()
.map(|sst| sst.file_size)
.sum::<u64>();
l0.sub_levels.remove(idx);
} else {
idx += 1;
Expand Down Expand Up @@ -518,26 +532,33 @@ impl DynamicLevelSelector {
compaction_group.compaction_config.as_ref(),
)
});
if table_id == 0 {
for sst in &vlevel.table_infos {
group.member_table_ids.extend(sst.table_ids.clone());
}
} else {
group.member_table_ids.push(table_id);
}
group.l0.as_mut().unwrap().sub_levels.push(vlevel);
}
}

for level in &levels.levels {
let mut virtual_level: HashMap<u32, Level> = HashMap::default();
for sst in &level.table_infos {
let table_id =
if sst.table_ids.len() > 1 || hybrid_table_ids.contains(&sst.table_ids[0]) {
if sst.table_ids.len() > 1 {
for table_id in &sst.table_ids {
if virtual_group.contains_key(table_id) {
return None;
}
}
let table_id = if sst.table_ids.len() > 1 {
for table_id in &sst.table_ids {
if virtual_group.contains_key(table_id) {
return None;
}
0
} else {
sst.table_ids[0]
};
hybrid_table_ids.insert(*table_id);
}
0
} else if hybrid_table_ids.contains(&sst.table_ids[0]) {
0
} else {
sst.table_ids[0]
};
let new_level = virtual_level.entry(table_id).or_insert_with(|| Level {
level_idx: level.level_idx,
level_type: level.level_type,
Expand All @@ -553,6 +574,13 @@ impl DynamicLevelSelector {
}
for (table_id, vlevel) in virtual_level {
if let Some(group) = virtual_group.get_mut(&table_id) {
if table_id == 0 {
for sst in &vlevel.table_infos {
group.member_table_ids.extend(sst.table_ids.clone());
}
} else {
group.member_table_ids.push(table_id);
}
group.levels[(level.level_idx as usize) - 1] = vlevel;
}
}
Expand All @@ -568,7 +596,11 @@ impl DynamicLevelSelector {
compaction_group.compaction_config.clone(),
));
let mut score_levels = vec![];
for (table_id, group) in &virtual_group {
for (table_id, group) in &mut virtual_group {
group.member_table_ids.sort();
group.member_table_ids.dedup();
assert!(!group.member_table_ids.is_empty());
selector_stats.record_virtual_group_info(group.member_table_ids.clone());
let ctx = dynamic_level_core.get_priority_levels(group, level_handlers);
for picker_info in ctx.score_levels {
score_levels.push((*table_id, picker_info, ctx.base_level));
Expand All @@ -593,7 +625,11 @@ impl DynamicLevelSelector {
let mut stats = LocalPickerStatistic::default();
if let Some(ret) = picker.pick_compaction(&group, level_handlers, &mut stats) {
ret.add_pending_task(task_id, level_handlers);
tracing::warn!("NEW COMPACT task-{} to target level-{}", task_id, picker_info.target_level);
selector_stats.record_virtual_group_task(
table_id,
ret.input_levels[0].level_idx as usize,
ret.target_level,
);
return Some(create_compaction_task(
dynamic_level_core.get_config(),
ret,
Expand All @@ -611,6 +647,12 @@ impl DynamicLevelSelector {
}
}

pub fn is_trivial_move_task(task: &CompactionInput) -> bool {
task.input_levels.len() == 2
&& task.input_levels[1].level_idx as usize == task.target_level
&& task.input_levels[1].table_infos.is_empty()
}

impl CompactionSelector for DynamicLevelSelector {
fn pick_compaction(
&mut self,
Expand All @@ -622,19 +664,6 @@ impl CompactionSelector for DynamicLevelSelector {
_table_id_to_options: HashMap<u32, TableOption>,
developer_config: Arc<CompactionDeveloperConfig>,
) -> Option<CompactionTask> {
if levels.member_table_ids.len() > 1 {
if let Some(ret) = self.pick_compaction_per_table(
task_id,
compaction_group,
levels,
level_handlers,
selector_stats,
developer_config.clone(),
) {
return Some(ret);
}
}

let dynamic_level_core = DynamicLevelSelectorCore::new(
compaction_group.compaction_config.clone(),
developer_config.clone(),
Expand All @@ -646,6 +675,39 @@ impl CompactionSelector for DynamicLevelSelector {
let compaction_task_validator = Arc::new(CompactionTaskValidator::new(
compaction_group.compaction_config.clone(),
));
if levels.member_table_ids.len() > 1 && !ctx.score_levels.is_empty() {
if ctx.score_levels[0].score > SCORE_BASE {
let mut picker = dynamic_level_core.create_compaction_picker(
&ctx.score_levels[0],
overlap_strategy.clone(),
compaction_task_validator.clone(),
);
let mut stats = LocalPickerStatistic::default();
if let Some(ret) = picker.pick_compaction(levels, level_handlers, &mut stats)
&& (ctx.score_levels[0].picker_type == PickerType::Tier
|| is_trivial_move_task(&ret))
{
ret.add_pending_task(task_id, level_handlers);
return Some(create_compaction_task(
dynamic_level_core.get_config(),
ret,
ctx.base_level,
self.task_type(),
));
}
}

if let Some(ret) = self.pick_compaction_per_table(
task_id,
compaction_group,
levels,
level_handlers,
selector_stats,
developer_config.clone(),
) {
return Some(ret);
}
}
for picker_info in &ctx.score_levels {
if picker_info.score <= SCORE_BASE {
return None;
Expand Down
79 changes: 79 additions & 0 deletions src/meta/src/hummock/compaction/selector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod space_reclaim_selector;
mod tombstone_compaction_selector;
mod ttl_selector;

use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -70,13 +71,91 @@ pub fn default_compaction_selector() -> Box<dyn CompactionSelector> {
Box::<DynamicLevelSelector>::default()
}

struct TableVirtualGroup {
table_id: u32,
select_level: usize,
target_level: usize,
}

#[derive(Default)]
struct LocalVirtualGroupInfo {
groups: Vec<Vec<u32>>,
count: usize,
}
#[derive(Default)]
struct LocalVirtualGroupStatistic {
groups: Vec<LocalVirtualGroupInfo>,
}

impl LocalVirtualGroupStatistic {
pub fn may_log_info(&mut self, mut groups: Vec<Vec<u32>>) {
for group in groups.iter_mut() {
group.sort();
}
groups.sort_by_key(|group| *group.first().unwrap());
for group in &mut self.groups {
if group.groups == groups {
group.count += 1;
if group.count >= 100 {
tracing::info!("group state table {:?}", group.groups);
group.count = 0;
}
return;
}
}
self.groups.push(LocalVirtualGroupInfo {
groups,
count: 1,
});
}
}

std::thread_local!(static LOG_COUNTER: RefCell<LocalVirtualGroupStatistic> = RefCell::new(LocalVirtualGroupStatistic::default()));

#[derive(Default)]
pub struct LocalSelectorStatistic {
skip_picker: Vec<(usize, usize, LocalPickerStatistic)>,
table_virtual_group: Option<TableVirtualGroup>,
virtual_group_score_statistic: Vec<Vec<u32>>,
}

impl LocalSelectorStatistic {
pub fn record_virtual_group_info(&mut self, table_ids: Vec<u32>) {
self.virtual_group_score_statistic.push(table_ids);
}

pub fn record_virtual_group_task(
&mut self,
table_id: u32,
select_level: usize,
target_level: usize,
) {
self.table_virtual_group = Some(TableVirtualGroup {
table_id,
select_level,
target_level,
});
}

pub fn report_to_metrics(&self, group_id: u64, metrics: &MetaMetrics) {
if self.virtual_group_score_statistic.is_empty() {
let x = self.virtual_group_score_statistic.clone();
LOG_COUNTER.with_borrow_mut(|info| {
info.may_log_info(x);
});
}
if let Some(group) = self.table_virtual_group.as_ref() {
let level_label = format!("{}-to-{}", group.select_level, group.target_level);
metrics
.compact_frequency
.with_label_values(&[
"Virtual",
&group_id.to_string(),
&group.table_id.to_string(),
level_label.as_str(),
])
.inc();
}
for (start_level, target_level, stats) in &self.skip_picker {
let level_label = format!("cg{}-{}-to-{}", group_id, start_level, target_level);
if stats.skip_by_write_amp_limit > 0 {
Expand Down

0 comments on commit aa9b120

Please sign in to comment.