Skip to content

Commit

Permalink
compact whole level
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed May 29, 2024
1 parent ce4130a commit f8df657
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,72 @@ impl LevelCompactionPicker {

return Some(result);
}

if l0
.sub_levels
.iter()
.filter(|level| level.level_type() == LevelType::Nonoverlapping)
.count()
>= self.config.level0_sub_level_compact_level_count as usize * 2
{
let mut select_level_inputs = vec![];
let mut info = overlap_strategy.create_overlap_info();
let max_compaction_bytes = std::cmp::max(
target_level.total_file_size,
self.config.max_compaction_bytes,
);

let mut total_file_size = 0;
let mut total_file_count = 0;
for level in &l0.sub_levels {
if level_handlers[0].is_level_pending_compact(level)
|| total_file_size > max_compaction_bytes
{
break;
}
for sst in &level.table_infos {
info.update(sst);
}
select_level_inputs.push(InputLevel {
level_idx: target_level.level_idx,
level_type: target_level.level_type,
table_infos: level.table_infos.clone(),
});
total_file_size += level.total_file_size;
total_file_count += level.table_infos.len();
}
let range = info.check_multiple_overlap(&target_level.table_infos);
let mut target_level_size = 0;
let mut pending_compact = false;
for sst in &target_level.table_infos[range.clone()] {
if level_handlers[target_level.level_idx as usize].is_pending_compact(&sst.sst_id) {
pending_compact = true;
break;
}

target_level_size += sst.file_size;
}
if !pending_compact && target_level_size < total_file_size {
select_level_inputs.reverse();
total_file_count += range.len();
select_level_inputs.push(InputLevel {
level_idx: target_level.level_idx,
level_type: target_level.level_type,
table_infos: target_level.table_infos[range].to_vec(),
});
let result = CompactionInput {
input_levels: select_level_inputs,
target_level: self.target_level,
select_input_size: total_file_size,
target_input_size: target_level_size,
total_file_count: total_file_count as u64,
vnode_partition_count,
..Default::default()
};
return Some(result);
}
}

None
}
}
Expand Down
213 changes: 3 additions & 210 deletions src/meta/src/hummock/compaction/selector/level_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;

use risingwave_common::catalog::TableOption;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
build_initial_compaction_group_levels, HummockLevelsExt,
};
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt;
use risingwave_hummock_sdk::HummockCompactionTaskId;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{compact_task, CompactionConfig, Level, LevelType};
use risingwave_pb::hummock::{compact_task, CompactionConfig, LevelType};

use super::{
create_compaction_task, CompactionSelector, LevelCompactionPicker, TierCompactionPicker,
Expand Down Expand Up @@ -420,197 +417,6 @@ impl DynamicLevelSelectorCore {
}
}

impl DynamicLevelSelector {
fn pick_compaction_per_table(
&mut self,
task_id: HummockCompactionTaskId,
compaction_group: &CompactionGroup,
levels: &Levels,
level_handlers: &mut [LevelHandler],
selector_stats: &mut LocalSelectorStatistic,
developer_config: Arc<CompactionDeveloperConfig>,
) -> Option<CompactionTask> {
let mut virtual_group: HashMap<u32, Levels> = HashMap::default();
let mut hybrid_table_ids: HashSet<u32> = HashSet::default();
for level in &levels.l0.as_ref().unwrap().sub_levels {
let mut virtual_level: HashMap<u32, Level> = HashMap::default();
for sst in &level.table_infos {
let table_id = if level.level_type() == LevelType::Overlapping
|| sst.table_ids.len() > 1
|| hybrid_table_ids.contains(&sst.table_ids[0])
{
// 0 represent hybrid sst.
if sst.table_ids.len() > 1 && level.level_type() != LevelType::Overlapping {
for table_id in &sst.table_ids {
if !hybrid_table_ids.contains(table_id) {
hybrid_table_ids.insert(*table_id);
if let Some(vlevel) = virtual_level.remove(table_id) {
let new_level =
virtual_level.entry(0).or_insert_with(|| Level {
level_idx: level.level_idx,
level_type: level.level_type,
table_infos: vec![],
total_file_size: 0,
sub_level_id: level.sub_level_id,
uncompressed_file_size: 0,
vnode_partition_count: 0,
});
new_level.table_infos.extend(vlevel.table_infos);
new_level.total_file_size += vlevel.total_file_size;
}
if let Some(group) = virtual_group.remove(table_id) {
let hybrid_group =
virtual_group.entry(0).or_insert_with(|| {
build_initial_compaction_group_levels(
levels.group_id,
compaction_group.compaction_config.as_ref(),
)
});
let l0 = hybrid_group.l0.as_mut().unwrap();
l0.sub_levels.extend(group.l0.unwrap().sub_levels);
l0.sub_levels.sort_by_key(|l| l.sub_level_id);
let mut idx = 1;
while idx < l0.sub_levels.len() {
if l0.sub_levels[idx].sub_level_id
== l0.sub_levels[idx - 1].sub_level_id
{
let x = std::mem::take(&mut l0.sub_levels[idx].table_infos);
l0.sub_levels[idx - 1].table_infos.extend(x);
l0.sub_levels[idx - 1].total_file_size +=
l0.sub_levels[idx].total_file_size;
l0.sub_levels[idx - 1].table_infos.sort_by(
|sst1, sst2| {
let a = sst1.key_range.as_ref().unwrap();
let b = sst2.key_range.as_ref().unwrap();
a.compare(b)
},
);
l0.sub_levels.remove(idx);
} else {
idx += 1;
}
}
}
}
}
}
0
} else {
sst.table_ids[0]
};
let new_level = virtual_level.entry(table_id).or_insert_with(|| Level {
level_idx: level.level_idx,
level_type: level.level_type,
table_infos: vec![],
total_file_size: 0,
sub_level_id: level.sub_level_id,
uncompressed_file_size: 0,
vnode_partition_count: 0,
});
new_level.table_infos.push(sst.clone());
new_level.total_file_size += sst.file_size;
new_level.uncompressed_file_size += sst.uncompressed_file_size;
}
for (table_id, vlevel) in virtual_level {
let group = virtual_group.entry(table_id).or_insert_with(|| {
build_initial_compaction_group_levels(
levels.group_id,
compaction_group.compaction_config.as_ref(),
)
});
group.l0.as_mut().unwrap().sub_levels.push(vlevel);
}
}

for level in &levels.levels {
let mut virtual_level: HashMap<u32, Level> = HashMap::default();
for sst in &level.table_infos {
let table_id =
if sst.table_ids.len() > 1 || hybrid_table_ids.contains(&sst.table_ids[0]) {
if sst.table_ids.len() > 1 {
for table_id in &sst.table_ids {
if virtual_group.contains_key(table_id) {
return None;
}
}
}
0
} else {
sst.table_ids[0]
};
let new_level = virtual_level.entry(table_id).or_insert_with(|| Level {
level_idx: level.level_idx,
level_type: level.level_type,
table_infos: vec![],
total_file_size: 0,
sub_level_id: 0,
uncompressed_file_size: 0,
vnode_partition_count: 0,
});
new_level.table_infos.push(sst.clone());
new_level.total_file_size += sst.file_size;
new_level.uncompressed_file_size += sst.uncompressed_file_size;
}
for (table_id, vlevel) in virtual_level {
if let Some(group) = virtual_group.get_mut(&table_id) {
group.levels[(level.level_idx as usize) - 1] = vlevel;
}
}
}
let dynamic_level_core = DynamicLevelSelectorCore::new(
compaction_group.compaction_config.clone(),
developer_config.clone(),
);
let overlap_strategy =
create_overlap_strategy(compaction_group.compaction_config.compaction_mode());
// TODO: Determine which rule to enable by write limit
let compaction_task_validator = Arc::new(CompactionTaskValidator::new(
compaction_group.compaction_config.clone(),
));
let mut score_levels = vec![];
for (table_id, group) in &virtual_group {
let ctx = dynamic_level_core.get_priority_levels(group, level_handlers);
for picker_info in ctx.score_levels {
score_levels.push((*table_id, picker_info, ctx.base_level));
}
}
score_levels.sort_by(|(_, a, _), (_, b, _)| {
b.score
.cmp(&a.score)
.then_with(|| a.target_level.cmp(&b.target_level))
});
for (table_id, picker_info, base_level) in score_levels {
if picker_info.score <= SCORE_BASE {
continue;
}
let group = virtual_group.get(&table_id).unwrap();
let mut picker = dynamic_level_core.create_compaction_picker(
&picker_info,
overlap_strategy.clone(),
compaction_task_validator.clone(),
);

let mut stats = LocalPickerStatistic::default();
if let Some(ret) = picker.pick_compaction(&group, level_handlers, &mut stats) {
ret.add_pending_task(task_id, level_handlers);
tracing::warn!("NEW COMPACT task-{} to target level-{}", task_id, picker_info.target_level);
return Some(create_compaction_task(
dynamic_level_core.get_config(),
ret,
base_level,
self.task_type(),
));
}
selector_stats.skip_picker.push((
picker_info.select_level,
picker_info.target_level,
stats,
));
}
None
}
}

impl CompactionSelector for DynamicLevelSelector {
fn pick_compaction(
&mut self,
Expand All @@ -622,19 +428,6 @@ impl CompactionSelector for DynamicLevelSelector {
_table_id_to_options: HashMap<u32, TableOption>,
developer_config: Arc<CompactionDeveloperConfig>,
) -> Option<CompactionTask> {
if levels.member_table_ids.len() > 1 {
if let Some(ret) = self.pick_compaction_per_table(
task_id,
compaction_group,
levels,
level_handlers,
selector_stats,
developer_config.clone(),
) {
return Some(ret);
}
}

let dynamic_level_core = DynamicLevelSelectorCore::new(
compaction_group.compaction_config.clone(),
developer_config.clone(),
Expand Down

0 comments on commit f8df657

Please sign in to comment.