Skip to content

Commit

Permalink
feat(compaction): compact partitioned level for EmergencyCompactionPi…
Browse files Browse the repository at this point in the history
…cker (#14994)

Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace authored Feb 5, 2024
1 parent b3b3558 commit 5528bbf
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
use std::sync::Arc;

use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::CompactionConfig;
use risingwave_pb::hummock::{CompactionConfig, LevelType};

use super::{
CompactionInput, CompactionPicker, CompactionTaskValidator, LevelCompactionPicker,
LocalPickerStatistic, TierCompactionPicker,
};
use crate::hummock::compaction::picker::intra_compaction_picker::WholeLevelCompactionPicker;
use crate::hummock::compaction::CompactionDeveloperConfig;
use crate::hummock::level_handler::LevelHandler;

Expand Down Expand Up @@ -50,20 +51,65 @@ impl EmergencyCompactionPicker {
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
let unused_validator = Arc::new(CompactionTaskValidator::unused());

let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
self.target_level,
self.config.clone(),
unused_validator.clone(),
self.developer_config.clone(),
);

if let Some(ret) =
base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
let l0 = levels.l0.as_ref().unwrap();
let overlapping_count = l0
.sub_levels
.iter()
.filter(|level| level.level_type == LevelType::Overlapping as i32)
.count();
let no_overlap_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping as i32
&& level.vnode_partition_count == 0
})
.count();
let partitioned_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping as i32
&& level.vnode_partition_count > 0
})
.count();
// We trigger `EmergencyCompactionPicker` only when some unexpected condition cause the number of l0 levels increase and the origin strategy
// can not compact those data to lower level. But if most of these levels are overlapping level, it is dangerous to compact small data of non-overlapping sub level
// to base level, it will cost a lot of compactor resource because of large write-amplification.
if (self.config.split_weight_by_vnode == 0 && no_overlap_count > overlapping_count)
|| (self.config.split_weight_by_vnode > 0
&& partitioned_count > no_overlap_count
&& partitioned_count > overlapping_count)
{
return Some(ret);
let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
self.target_level,
self.config.clone(),
unused_validator.clone(),
self.developer_config.clone(),
);

if let Some(ret) =
base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
{
return Some(ret);
}
}
if self.config.split_weight_by_vnode > 0
&& no_overlap_count > partitioned_count
&& no_overlap_count > overlapping_count
{
let intral_level_compaction_picker =
WholeLevelCompactionPicker::new(self.config.clone(), unused_validator.clone());

if let Some(ret) = intral_level_compaction_picker.pick_whole_level(
levels.l0.as_ref().unwrap(),
&level_handlers[0],
self.config.split_weight_by_vnode,
stats,
) {
return Some(ret);
}
}
let mut tier_compaction_picker =
TierCompactionPicker::new_with_validator(self.config.clone(), unused_validator);

Expand Down
175 changes: 103 additions & 72 deletions src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,78 +108,11 @@ impl IntraCompactionPicker {
partition_count: u32,
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
if partition_count == 0 {
return None;
}
for (idx, level) in l0.sub_levels.iter().enumerate() {
if level.level_type() != LevelType::Nonoverlapping
|| level.vnode_partition_count == partition_count
{
continue;
}

let max_compaction_bytes = std::cmp::max(
self.config.max_bytes_for_level_base,
self.config.sub_level_max_compaction_bytes
* (self.config.level0_sub_level_compact_level_count as u64),
);

let mut select_input_size = 0;

let mut select_level_inputs = vec![];
let mut total_file_count = 0;
let mut wait_enough = false;
for next_level in l0.sub_levels.iter().skip(idx) {
if select_input_size > max_compaction_bytes
|| total_file_count > self.config.level0_max_compact_file_number
|| (next_level.vnode_partition_count == partition_count
&& select_level_inputs.len() > 1)
{
wait_enough = true;
break;
}

if level_handler.is_level_pending_compact(next_level) {
break;
}

select_input_size += next_level.total_file_size;
total_file_count += next_level.table_infos.len() as u64;

select_level_inputs.push(InputLevel {
level_idx: 0,
level_type: next_level.level_type,
table_infos: next_level.table_infos.clone(),
});
}
if !select_level_inputs.is_empty() {
let vnode_partition_count =
if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
partition_count
} else {
0
};
let result = CompactionInput {
input_levels: select_level_inputs,
target_sub_level_id: level.sub_level_id,
select_input_size,
total_file_count,
vnode_partition_count,
..Default::default()
};
if wait_enough
|| self.compaction_task_validator.valid_compact_task(
&result,
ValidationRuleType::Intra,
stats,
)
{
return Some(result);
}
}
}

None
let picker = WholeLevelCompactionPicker::new(
self.config.clone(),
self.compaction_task_validator.clone(),
);
picker.pick_whole_level(l0, level_handler, partition_count, stats)
}

fn pick_l0_intra(
Expand Down Expand Up @@ -378,6 +311,104 @@ impl IntraCompactionPicker {
}
}

pub struct WholeLevelCompactionPicker {
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
}

impl WholeLevelCompactionPicker {
pub fn new(
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
) -> Self {
Self {
config,
compaction_task_validator,
}
}

pub fn pick_whole_level(
&self,
l0: &OverlappingLevel,
level_handler: &LevelHandler,
partition_count: u32,
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
if partition_count == 0 {
return None;
}
for (idx, level) in l0.sub_levels.iter().enumerate() {
if level.level_type() != LevelType::Nonoverlapping
|| level.vnode_partition_count == partition_count
{
continue;
}

let max_compaction_bytes = std::cmp::max(
self.config.max_bytes_for_level_base,
self.config.sub_level_max_compaction_bytes
* (self.config.level0_sub_level_compact_level_count as u64),
);

let mut select_input_size = 0;

let mut select_level_inputs = vec![];
let mut total_file_count = 0;
let mut wait_enough = false;
for next_level in l0.sub_levels.iter().skip(idx) {
if select_input_size > max_compaction_bytes
|| total_file_count > self.config.level0_max_compact_file_number
|| (next_level.vnode_partition_count == partition_count
&& select_level_inputs.len() > 1)
{
wait_enough = true;
break;
}

if level_handler.is_level_pending_compact(next_level) {
break;
}

select_input_size += next_level.total_file_size;
total_file_count += next_level.table_infos.len() as u64;

select_level_inputs.push(InputLevel {
level_idx: 0,
level_type: next_level.level_type,
table_infos: next_level.table_infos.clone(),
});
}
if !select_level_inputs.is_empty() {
let vnode_partition_count =
if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
partition_count
} else {
0
};
let result = CompactionInput {
input_levels: select_level_inputs,
target_sub_level_id: level.sub_level_id,
select_input_size,
total_file_count,
vnode_partition_count,
..Default::default()
};
if wait_enough
|| self.compaction_task_validator.valid_compact_task(
&result,
ValidationRuleType::Intra,
stats,
)
{
return Some(result);
}
}
}

None
}
}

#[cfg(test)]
pub mod tests {
use risingwave_pb::hummock::Level;
Expand Down

0 comments on commit 5528bbf

Please sign in to comment.