feat(compaction): compact partitioned level for EmergencyCompactionPicker #14994

Merged: 3 commits on Feb 5, 2024
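As a reading aid, here is a minimal, self-contained sketch of the selection rule this PR adds to `EmergencyCompactionPicker`, distilled from the diff below. All names in it (`SubLevelKind`, `FirstChoice`, `first_choice`) are illustrative stand-ins, not RisingWave types, and in the real code each chosen picker can still fall through to tier compaction when it fails to produce a task.

// Hypothetical sketch of the new picker-selection rule; types and names are
// simplified stand-ins, not the RisingWave API.
#[derive(PartialEq)]
enum SubLevelKind {
    Overlapping,
    NonOverlappingUnpartitioned,
    NonOverlappingPartitioned,
}

#[derive(Debug)]
enum FirstChoice {
    BaseLevelCompaction,
    WholeLevelPartitioning,
    TierCompaction,
}

// Mirrors the conditions in `EmergencyCompactionPicker::pick_compaction` below.
fn first_choice(sub_levels: &[SubLevelKind], split_weight_by_vnode: u32) -> FirstChoice {
    let count = |kind: SubLevelKind| sub_levels.iter().filter(|k| **k == kind).count();
    let overlapping = count(SubLevelKind::Overlapping);
    let unpartitioned = count(SubLevelKind::NonOverlappingUnpartitioned);
    let partitioned = count(SubLevelKind::NonOverlappingPartitioned);

    if (split_weight_by_vnode == 0 && unpartitioned > overlapping)
        || (split_weight_by_vnode > 0
            && partitioned > unpartitioned
            && partitioned > overlapping)
    {
        FirstChoice::BaseLevelCompaction
    } else if split_weight_by_vnode > 0
        && unpartitioned > partitioned
        && unpartitioned > overlapping
    {
        FirstChoice::WholeLevelPartitioning
    } else {
        FirstChoice::TierCompaction
    }
}

fn main() {
    // Example: partitioning enabled, most non-overlapping sub-levels still unpartitioned.
    let l0 = [
        SubLevelKind::Overlapping,
        SubLevelKind::NonOverlappingUnpartitioned,
        SubLevelKind::NonOverlappingUnpartitioned,
        SubLevelKind::NonOverlappingPartitioned,
    ];
    println!("{:?}", first_choice(&l0, 4)); // WholeLevelPartitioning
}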
@@ -15,12 +15,13 @@
use std::sync::Arc;

use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::CompactionConfig;
use risingwave_pb::hummock::{CompactionConfig, LevelType};

use super::{
CompactionInput, CompactionPicker, CompactionTaskValidator, LevelCompactionPicker,
LocalPickerStatistic, TierCompactionPicker,
};
use crate::hummock::compaction::picker::intra_compaction_picker::WholeLevelCompactionPicker;
use crate::hummock::compaction::CompactionDeveloperConfig;
use crate::hummock::level_handler::LevelHandler;

@@ -50,20 +51,65 @@ impl EmergencyCompactionPicker {
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
let unused_validator = Arc::new(CompactionTaskValidator::unused());

let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
self.target_level,
self.config.clone(),
unused_validator.clone(),
self.developer_config.clone(),
);

if let Some(ret) =
base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
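// Classify the l0 sub-levels: overlapping, non-overlapping but not yet partitioned by vnode, and non-overlapping already partitioned.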
let l0 = levels.l0.as_ref().unwrap();
let overlapping_count = l0
.sub_levels
.iter()
.filter(|level| level.level_type == LevelType::Overlapping as i32)
.count();
let no_overlap_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping as i32
&& level.vnode_partition_count == 0
})
.count();
let partitioned_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping as i32
&& level.vnode_partition_count > 0
})
.count();
// We trigger `EmergencyCompactionPicker` only when some unexpected condition causes the number of l0 sub-levels to grow and the original
// strategy cannot compact that data to a lower level. But if most of these sub-levels are overlapping, it is dangerous to compact the
// small amount of data in the non-overlapping sub-levels to the base level: the large write amplification would consume a lot of compactor resources.
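// For example (hypothetical counts): with `split_weight_by_vnode == 0` and 5 unpartitioned non-overlapping sub-levels vs. 2 overlapping ones,
// base-level compaction is attempted first; with partitioning enabled and 6 partitioned vs. 3 unpartitioned vs. 2 overlapping sub-levels, the
// partitioned data is likewise compacted to the base level. Other distributions fall through to the branches below.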
if (self.config.split_weight_by_vnode == 0 && no_overlap_count > overlapping_count)
|| (self.config.split_weight_by_vnode > 0
&& partitioned_count > no_overlap_count
&& partitioned_count > overlapping_count)
{
return Some(ret);
let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
self.target_level,
self.config.clone(),
unused_validator.clone(),
self.developer_config.clone(),
);

if let Some(ret) =
base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
{
return Some(ret);
}
}
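// When vnode partitioning is enabled but most non-overlapping sub-levels are still unpartitioned, try to rewrite those sub-levels into a
// vnode-partitioned sub-level via `WholeLevelCompactionPicker` before falling back to tier compaction.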
if self.config.split_weight_by_vnode > 0
&& no_overlap_count > partitioned_count
&& no_overlap_count > overlapping_count
{
let intral_level_compaction_picker =
WholeLevelCompactionPicker::new(self.config.clone(), unused_validator.clone());

if let Some(ret) = intral_level_compaction_picker.pick_whole_level(
levels.l0.as_ref().unwrap(),
&level_handlers[0],
self.config.split_weight_by_vnode,
stats,
) {
return Some(ret);
}
}
let mut tier_compaction_picker =
TierCompactionPicker::new_with_validator(self.config.clone(), unused_validator);

175 changes: 103 additions & 72 deletions src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs
@@ -108,78 +108,11 @@ impl IntraCompactionPicker {
partition_count: u32,
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
if partition_count == 0 {
return None;
}
for (idx, level) in l0.sub_levels.iter().enumerate() {
if level.level_type() != LevelType::Nonoverlapping
|| level.vnode_partition_count == partition_count
{
continue;
}

let max_compaction_bytes = std::cmp::max(
self.config.max_bytes_for_level_base,
self.config.sub_level_max_compaction_bytes
* (self.config.level0_sub_level_compact_level_count as u64),
);

let mut select_input_size = 0;

let mut select_level_inputs = vec![];
let mut total_file_count = 0;
let mut wait_enough = false;
for next_level in l0.sub_levels.iter().skip(idx) {
if select_input_size > max_compaction_bytes
|| total_file_count > self.config.level0_max_compact_file_number
|| (next_level.vnode_partition_count == partition_count
&& select_level_inputs.len() > 1)
{
wait_enough = true;
break;
}

if level_handler.is_level_pending_compact(next_level) {
break;
}

select_input_size += next_level.total_file_size;
total_file_count += next_level.table_infos.len() as u64;

select_level_inputs.push(InputLevel {
level_idx: 0,
level_type: next_level.level_type,
table_infos: next_level.table_infos.clone(),
});
}
if !select_level_inputs.is_empty() {
let vnode_partition_count =
if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
partition_count
} else {
0
};
let result = CompactionInput {
input_levels: select_level_inputs,
target_sub_level_id: level.sub_level_id,
select_input_size,
total_file_count,
vnode_partition_count,
..Default::default()
};
if wait_enough
|| self.compaction_task_validator.valid_compact_task(
&result,
ValidationRuleType::Intra,
stats,
)
{
return Some(result);
}
}
}

None
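// Delegate to `WholeLevelCompactionPicker`, which now owns the whole-level partitioning logic and is also reused by `EmergencyCompactionPicker`.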
let picker = WholeLevelCompactionPicker::new(
self.config.clone(),
self.compaction_task_validator.clone(),
);
picker.pick_whole_level(l0, level_handler, partition_count, stats)
}

fn pick_l0_intra(
@@ -378,6 +311,104 @@ impl IntraCompactionPicker {
}
}

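/// Picks consecutive l0 sub-levels so they can be compacted into a vnode-partitioned sub-level.
/// Extracted from `IntraCompactionPicker` so that `EmergencyCompactionPicker` can reuse it.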
pub struct WholeLevelCompactionPicker {
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
}

impl WholeLevelCompactionPicker {
pub fn new(
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
) -> Self {
Self {
config,
compaction_task_validator,
}
}

pub fn pick_whole_level(
&self,
l0: &OverlappingLevel,
level_handler: &LevelHandler,
partition_count: u32,
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
if partition_count == 0 {
return None;
}
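// Start from each non-overlapping sub-level that does not yet use `partition_count`, and greedily collect it together with the
// following sub-levels until the size or file-count limits are hit, an already-partitioned sub-level is reached, or a sub-level
// is already pending compaction.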
for (idx, level) in l0.sub_levels.iter().enumerate() {
if level.level_type() != LevelType::Nonoverlapping
|| level.vnode_partition_count == partition_count
{
continue;
}

let max_compaction_bytes = std::cmp::max(
self.config.max_bytes_for_level_base,
self.config.sub_level_max_compaction_bytes
* (self.config.level0_sub_level_compact_level_count as u64),
);

let mut select_input_size = 0;

let mut select_level_inputs = vec![];
let mut total_file_count = 0;
let mut wait_enough = false;
for next_level in l0.sub_levels.iter().skip(idx) {
if select_input_size > max_compaction_bytes
|| total_file_count > self.config.level0_max_compact_file_number
|| (next_level.vnode_partition_count == partition_count
&& select_level_inputs.len() > 1)
{
wait_enough = true;
break;
}

if level_handler.is_level_pending_compact(next_level) {
break;
}

select_input_size += next_level.total_file_size;
total_file_count += next_level.table_infos.len() as u64;

select_level_inputs.push(InputLevel {
level_idx: 0,
level_type: next_level.level_type,
table_infos: next_level.table_infos.clone(),
});
}
if !select_level_inputs.is_empty() {
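// Only keep the vnode-partitioned layout when the selected input is large enough; small results fall back to an
// unpartitioned sub-level (vnode_partition_count == 0).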
let vnode_partition_count =
if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
partition_count
} else {
0
};
let result = CompactionInput {
input_levels: select_level_inputs,
target_sub_level_id: level.sub_level_id,
select_input_size,
total_file_count,
vnode_partition_count,
..Default::default()
};
if wait_enough
|| self.compaction_task_validator.valid_compact_task(
&result,
ValidationRuleType::Intra,
stats,
)
{
return Some(result);
}
}
}

None
}
}

#[cfg(test)]
pub mod tests {
use risingwave_pb::hummock::Level;