partition virtual lsm tree
Signed-off-by: Little-Wallace <[email protected]>
Little-Wallace committed May 29, 2024
1 parent 30f251f commit ce4130a
Showing 4 changed files with 228 additions and 7 deletions.
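The diff below partitions a compaction group's LSM tree into per-table "virtual" groups before a compaction task is picked: an SST that serves exactly one state table is scored under that table's id, while overlapping sub-levels and SSTs spanning several tables fall into a shared hybrid bucket keyed by table id 0. A minimal sketch of that grouping rule, using a hypothetical helper that is not part of the commit:

use std::collections::HashSet;

// Sketch only: the commit inlines this logic in DynamicLevelSelector::pick_compaction_per_table.
fn virtual_group_key(table_ids: &[u32], overlapping_level: bool, hybrid_table_ids: &HashSet<u32>) -> u32 {
    // Assumes every SST carries at least one table id, as in the diff.
    if overlapping_level || table_ids.len() > 1 || hybrid_table_ids.contains(&table_ids[0]) {
        0 // hybrid bucket shared by multi-table data
    } else {
        table_ids[0]
    }
}

Each virtual group is then scored by the existing dynamic level machinery, and the highest-scoring group produces the next compaction task.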
@@ -159,7 +159,7 @@ impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule {
max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2
>= input.select_input_size;

if is_write_amp_large || max_level_size > self.config.sub_level_max_compaction_bytes {
if is_write_amp_large {
stats.skip_by_write_amp_limit += 1;
return false;
}
@@ -118,7 +118,9 @@ impl IntraCompactionPicker {
}

for (idx, level) in l0.sub_levels.iter().enumerate() {
if level.level_type() != LevelType::Nonoverlapping {
if level.level_type() != LevelType::Nonoverlapping
|| level.total_file_size > self.config.sub_level_max_compaction_bytes
{
continue;
}

@@ -245,6 +245,11 @@ impl NonOverlapSubLevelPicker {
sstable_infos: vec![vec![]; levels.len()],
..Default::default()
};
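// Record the single table served by the seed SST; 0 marks an SST whose data spans multiple tables.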
let first_table_id = if sst.table_ids.len() == 1 {
sst.table_ids[0]
} else {
0
};

let mut pick_levels_range = Vec::default();
let mut max_select_level_count = 0;
@@ -307,6 +312,12 @@
if level_handler.is_pending_compact(&other.sst_id) {
break 'expand_new_level;
}
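// With per-table partitioning, a single-table seed must not pull in SSTs that span multiple tables or belong to a different table.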
if first_table_id != 0
&& (other.table_ids.len() > 1
|| (other.table_ids.len() == 1 && other.table_ids[0] != first_table_id))
{
break 'expand_new_level;
}
basic_overlap_info.update(other);

add_files_size += other.get_file_size();
218 changes: 213 additions & 5 deletions src/meta/src/hummock/compaction/selector/level_selector.rs
@@ -16,14 +16,17 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use risingwave_common::catalog::TableOption;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
build_initial_compaction_group_levels, HummockLevelsExt,
};
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_hummock_sdk::HummockCompactionTaskId;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{compact_task, CompactionConfig, LevelType};
use risingwave_pb::hummock::{compact_task, CompactionConfig, Level, LevelType};

use super::{
create_compaction_task, CompactionSelector, LevelCompactionPicker, TierCompactionPicker,
@@ -41,7 +44,7 @@ use crate::hummock::model::CompactionGroup;

pub const SCORE_BASE: u64 = 100;

#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub enum PickerType {
Tier,
Intra,
@@ -417,6 +420,197 @@ impl DynamicLevelSelectorCore {
}
}

impl DynamicLevelSelector {
fn pick_compaction_per_table(
&mut self,
task_id: HummockCompactionTaskId,
compaction_group: &CompactionGroup,
levels: &Levels,
level_handlers: &mut [LevelHandler],
selector_stats: &mut LocalSelectorStatistic,
developer_config: Arc<CompactionDeveloperConfig>,
) -> Option<CompactionTask> {
let mut virtual_group: HashMap<u32, Levels> = HashMap::default();
let mut hybrid_table_ids: HashSet<u32> = HashSet::default();
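// First pass over L0: split every sub-level into per-table virtual levels. Multi-table SSTs,
// overlapping sub-levels, and any table already seen in a hybrid SST are folded into the shared
// hybrid level keyed by 0.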
for level in &levels.l0.as_ref().unwrap().sub_levels {
let mut virtual_level: HashMap<u32, Level> = HashMap::default();
for sst in &level.table_infos {
let table_id = if level.level_type() == LevelType::Overlapping
|| sst.table_ids.len() > 1
|| hybrid_table_ids.contains(&sst.table_ids[0])
{
// 0 represents a hybrid SST (one whose data spans multiple tables).
if sst.table_ids.len() > 1 && level.level_type() != LevelType::Overlapping {
for table_id in &sst.table_ids {
if !hybrid_table_ids.contains(table_id) {
hybrid_table_ids.insert(*table_id);
if let Some(vlevel) = virtual_level.remove(table_id) {
let new_level =
virtual_level.entry(0).or_insert_with(|| Level {
level_idx: level.level_idx,
level_type: level.level_type,
table_infos: vec![],
total_file_size: 0,
sub_level_id: level.sub_level_id,
uncompressed_file_size: 0,
vnode_partition_count: 0,
});
new_level.table_infos.extend(vlevel.table_infos);
new_level.total_file_size += vlevel.total_file_size;
}
if let Some(group) = virtual_group.remove(table_id) {
let hybrid_group =
virtual_group.entry(0).or_insert_with(|| {
build_initial_compaction_group_levels(
levels.group_id,
compaction_group.compaction_config.as_ref(),
)
});
let l0 = hybrid_group.l0.as_mut().unwrap();
l0.sub_levels.extend(group.l0.unwrap().sub_levels);
l0.sub_levels.sort_by_key(|l| l.sub_level_id);
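// Merging may leave two sub-levels with the same sub_level_id; collapse such
// neighbors and keep their SSTs sorted by key range.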
let mut idx = 1;
while idx < l0.sub_levels.len() {
if l0.sub_levels[idx].sub_level_id
== l0.sub_levels[idx - 1].sub_level_id
{
let x = std::mem::take(&mut l0.sub_levels[idx].table_infos);
l0.sub_levels[idx - 1].table_infos.extend(x);
l0.sub_levels[idx - 1].total_file_size +=
l0.sub_levels[idx].total_file_size;
l0.sub_levels[idx - 1].table_infos.sort_by(
|sst1, sst2| {
let a = sst1.key_range.as_ref().unwrap();
let b = sst2.key_range.as_ref().unwrap();
a.compare(b)
},
);
l0.sub_levels.remove(idx);
} else {
idx += 1;
}
}
}
}
}
}
0
} else {
sst.table_ids[0]
};
let new_level = virtual_level.entry(table_id).or_insert_with(|| Level {
level_idx: level.level_idx,
level_type: level.level_type,
table_infos: vec![],
total_file_size: 0,
sub_level_id: level.sub_level_id,
uncompressed_file_size: 0,
vnode_partition_count: 0,
});
new_level.table_infos.push(sst.clone());
new_level.total_file_size += sst.file_size;
new_level.uncompressed_file_size += sst.uncompressed_file_size;
}
for (table_id, vlevel) in virtual_level {
let group = virtual_group.entry(table_id).or_insert_with(|| {
build_initial_compaction_group_levels(
levels.group_id,
compaction_group.compaction_config.as_ref(),
)
});
group.l0.as_mut().unwrap().sub_levels.push(vlevel);
}
}
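// Second pass over the bottom levels: route single-table SSTs to their virtual group, and give up
// (return None) if a multi-table SST touches a table that already has its own group.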

for level in &levels.levels {
let mut virtual_level: HashMap<u32, Level> = HashMap::default();
for sst in &level.table_infos {
let table_id =
if sst.table_ids.len() > 1 || hybrid_table_ids.contains(&sst.table_ids[0]) {
if sst.table_ids.len() > 1 {
for table_id in &sst.table_ids {
if virtual_group.contains_key(table_id) {
return None;
}
}
}
0
} else {
sst.table_ids[0]
};
let new_level = virtual_level.entry(table_id).or_insert_with(|| Level {
level_idx: level.level_idx,
level_type: level.level_type,
table_infos: vec![],
total_file_size: 0,
sub_level_id: 0,
uncompressed_file_size: 0,
vnode_partition_count: 0,
});
new_level.table_infos.push(sst.clone());
new_level.total_file_size += sst.file_size;
new_level.uncompressed_file_size += sst.uncompressed_file_size;
}
for (table_id, vlevel) in virtual_level {
if let Some(group) = virtual_group.get_mut(&table_id) {
group.levels[(level.level_idx as usize) - 1] = vlevel;
}
}
}
let dynamic_level_core = DynamicLevelSelectorCore::new(
compaction_group.compaction_config.clone(),
developer_config.clone(),
);
let overlap_strategy =
create_overlap_strategy(compaction_group.compaction_config.compaction_mode());
// TODO: Determine which rule to enable by write limit
let compaction_task_validator = Arc::new(CompactionTaskValidator::new(
compaction_group.compaction_config.clone(),
));
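// Score every virtual group independently, then try pickers from the highest-scoring (group, level) pair downwards.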
let mut score_levels = vec![];
for (table_id, group) in &virtual_group {
let ctx = dynamic_level_core.get_priority_levels(group, level_handlers);
for picker_info in ctx.score_levels {
score_levels.push((*table_id, picker_info, ctx.base_level));
}
}
score_levels.sort_by(|(_, a, _), (_, b, _)| {
b.score
.cmp(&a.score)
.then_with(|| a.target_level.cmp(&b.target_level))
});
for (table_id, picker_info, base_level) in score_levels {
if picker_info.score <= SCORE_BASE {
continue;
}
let group = virtual_group.get(&table_id).unwrap();
let mut picker = dynamic_level_core.create_compaction_picker(
&picker_info,
overlap_strategy.clone(),
compaction_task_validator.clone(),
);

let mut stats = LocalPickerStatistic::default();
if let Some(ret) = picker.pick_compaction(&group, level_handlers, &mut stats) {
ret.add_pending_task(task_id, level_handlers);
tracing::warn!("NEW COMPACT task-{} to target level-{}", task_id, picker_info.target_level);
return Some(create_compaction_task(
dynamic_level_core.get_config(),
ret,
base_level,
self.task_type(),
));
}
selector_stats.skip_picker.push((
picker_info.select_level,
picker_info.target_level,
stats,
));
}
None
}
}

impl CompactionSelector for DynamicLevelSelector {
fn pick_compaction(
&mut self,
@@ -428,9 +622,22 @@ impl CompactionSelector for DynamicLevelSelector {
_table_id_to_options: HashMap<u32, TableOption>,
developer_config: Arc<CompactionDeveloperConfig>,
) -> Option<CompactionTask> {
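// When the group hosts more than one state table, try the per-table virtual partitioning first;
// otherwise fall back to the plain dynamic level selection below.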
if levels.member_table_ids.len() > 1 {
if let Some(ret) = self.pick_compaction_per_table(
task_id,
compaction_group,
levels,
level_handlers,
selector_stats,
developer_config.clone(),
) {
return Some(ret);
}
}

let dynamic_level_core = DynamicLevelSelectorCore::new(
compaction_group.compaction_config.clone(),
developer_config,
developer_config.clone(),
);
let overlap_strategy =
create_overlap_strategy(compaction_group.compaction_config.compaction_mode());
@@ -459,6 +666,7 @@ impl CompactionSelector for DynamicLevelSelector {
self.task_type(),
));
}

selector_stats.skip_picker.push((
picker_info.select_level,
picker_info.target_level,
