
feat(compaction): compact tier level at first when overlapping level too many #14482

Merged (9 commits) on Feb 4, 2024
@@ -15,12 +15,13 @@
use std::sync::Arc;

use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::CompactionConfig;
use risingwave_pb::hummock::{CompactionConfig, LevelType};

use super::{
CompactionInput, CompactionPicker, CompactionTaskValidator, LevelCompactionPicker,
LocalPickerStatistic, TierCompactionPicker,
};
use crate::hummock::compaction::picker::intra_compaction_picker::WholeLevelCompactionPicker;
use crate::hummock::compaction::CompactionDeveloperConfig;
use crate::hummock::level_handler::LevelHandler;

@@ -50,20 +51,65 @@ impl EmergencyCompactionPicker {
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
let unused_validator = Arc::new(CompactionTaskValidator::unused());

let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
self.target_level,
self.config.clone(),
unused_validator.clone(),
self.developer_config.clone(),
);

if let Some(ret) =
base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
let l0 = levels.l0.as_ref().unwrap();
let overlapping_count = l0
.sub_levels
.iter()
.filter(|level| level.level_type == LevelType::Overlapping as i32)
.count();
let no_overlap_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping as i32
&& level.vnode_partition_count == 0
})
.count();
let partitioned_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping as i32
&& level.vnode_partition_count > 0
})
.count();
// We trigger `EmergencyCompactionPicker` only when some unexpected condition causes the number of l0 sub-levels to increase and the original
// strategy cannot compact that data to a lower level. But if most of these sub-levels are overlapping, it is dangerous to compact the small amount
// of data in the non-overlapping sub-levels to the base level, because the large write amplification would consume a lot of compactor resources.

Review comment (Contributor): This is a good solution; please add some comments explaining the strategy.

if (self.config.split_weight_by_vnode == 0 && no_overlap_count > overlapping_count)
|| (self.config.split_weight_by_vnode > 0
&& partitioned_count > no_overlap_count
&& partitioned_count > overlapping_count)
{
return Some(ret);
let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
self.target_level,
self.config.clone(),
unused_validator.clone(),
self.developer_config.clone(),
);

if let Some(ret) =
base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
{
return Some(ret);
}
}
Review comment (@Li0k, Jan 30, 2024): I keep wondering whether it is right to unconditionally compact l0 to the base level, since this will lead to base-level accumulation. Should we continue to follow the selection rules of the level selector? What's your opinion? (see #14034)

if self.config.split_weight_by_vnode > 0
&& no_overlap_count > partitioned_count
&& no_overlap_count > overlapping_count
{
let intra_level_compaction_picker =
WholeLevelCompactionPicker::new(self.config.clone(), unused_validator.clone());

if let Some(ret) = intra_level_compaction_picker.pick_whole_level(
levels.l0.as_ref().unwrap(),
&level_handlers[0],
self.config.split_weight_by_vnode,
stats,
) {
return Some(ret);
}
}
let mut tier_compaction_picker =
TierCompactionPicker::new_with_validator(self.config.clone(), unused_validator);

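To make the dispatch rule above easier to follow, here is a minimal, self-contained sketch (not the PR's actual code) of how the composition of the L0 sub-levels decides which picker the emergency path tries first. The names `SubLevelKind`, `PickerKind`, and `pick_order` are hypothetical and exist only for this illustration.

```rust
// Hypothetical, simplified model of an L0 sub-level: either overlapping, or
// non-overlapping with or without vnode partitioning.
#[derive(Clone, Copy)]
enum SubLevelKind {
    Overlapping,
    Nonoverlapping { partitioned: bool },
}

// Which picker the emergency path would try first.
#[derive(Debug, PartialEq)]
enum PickerKind {
    BaseLevelFirst,  // compact L0 into the base level first
    WholeLevelFirst, // re-partition the non-overlapping sub-levels by vnode first
    TierFirst,       // merge the overlapping sub-levels first
}

fn pick_order(sub_levels: &[SubLevelKind], split_weight_by_vnode: u32) -> PickerKind {
    let overlapping = sub_levels
        .iter()
        .filter(|k| matches!(k, SubLevelKind::Overlapping))
        .count();
    let non_overlap = sub_levels
        .iter()
        .filter(|k| matches!(k, SubLevelKind::Nonoverlapping { partitioned: false }))
        .count();
    let partitioned = sub_levels
        .iter()
        .filter(|k| matches!(k, SubLevelKind::Nonoverlapping { partitioned: true }))
        .count();

    if (split_weight_by_vnode == 0 && non_overlap > overlapping)
        || (split_weight_by_vnode > 0
            && partitioned > non_overlap
            && partitioned > overlapping)
    {
        // Mostly non-overlapping (or already partitioned) data: pushing it to the
        // base level is comparatively cheap.
        PickerKind::BaseLevelFirst
    } else if split_weight_by_vnode > 0
        && non_overlap > partitioned
        && non_overlap > overlapping
    {
        // Mostly un-partitioned non-overlapping data: split it by vnode within L0 first.
        PickerKind::WholeLevelFirst
    } else {
        // Mostly overlapping sub-levels: merging them (tier compaction) avoids the
        // write amplification of compacting many small files into the base level.
        PickerKind::TierFirst
    }
}

fn main() {
    // A backlog dominated by overlapping sub-levels falls through to tier compaction.
    let l0 = vec![SubLevelKind::Overlapping; 8];
    assert_eq!(pick_order(&l0, 0), PickerKind::TierFirst);
}
```

Before this PR the emergency path always tried the base-level picker first; the sub-level counts above are what let it fall through to tier compaction when overlapping sub-levels dominate.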
175 changes: 103 additions & 72 deletions src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs
@@ -108,78 +108,11 @@ impl IntraCompactionPicker {
partition_count: u32,
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
if partition_count == 0 {
return None;
}
for (idx, level) in l0.sub_levels.iter().enumerate() {
if level.level_type() != LevelType::Nonoverlapping
|| level.vnode_partition_count == partition_count
{
continue;
}

let max_compaction_bytes = std::cmp::max(
self.config.max_bytes_for_level_base,
self.config.sub_level_max_compaction_bytes
* (self.config.level0_sub_level_compact_level_count as u64),
);

let mut select_input_size = 0;

let mut select_level_inputs = vec![];
let mut total_file_count = 0;
let mut wait_enough = false;
for next_level in l0.sub_levels.iter().skip(idx) {
if select_input_size > max_compaction_bytes
|| total_file_count > self.config.level0_max_compact_file_number
|| (next_level.vnode_partition_count == partition_count
&& select_level_inputs.len() > 1)
{
wait_enough = true;
break;
}

if level_handler.is_level_pending_compact(next_level) {
break;
}

select_input_size += next_level.total_file_size;
total_file_count += next_level.table_infos.len() as u64;

select_level_inputs.push(InputLevel {
level_idx: 0,
level_type: next_level.level_type,
table_infos: next_level.table_infos.clone(),
});
}
if !select_level_inputs.is_empty() {
let vnode_partition_count =
if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
partition_count
} else {
0
};
let result = CompactionInput {
input_levels: select_level_inputs,
target_sub_level_id: level.sub_level_id,
select_input_size,
total_file_count,
vnode_partition_count,
..Default::default()
};
if wait_enough
|| self.compaction_task_validator.valid_compact_task(
&result,
ValidationRuleType::Intra,
stats,
)
{
return Some(result);
}
}
}

None
let picker = WholeLevelCompactionPicker::new(
self.config.clone(),
self.compaction_task_validator.clone(),
);
picker.pick_whole_level(l0, level_handler, partition_count, stats)
}

fn pick_l0_intra(
@@ -378,6 +311,104 @@ impl IntraCompactionPicker {
}
}

pub struct WholeLevelCompactionPicker {
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
}

impl WholeLevelCompactionPicker {
pub fn new(
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
) -> Self {
Self {
config,
compaction_task_validator,
}
}

pub fn pick_whole_level(
&self,
l0: &OverlappingLevel,
level_handler: &LevelHandler,
partition_count: u32,
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
if partition_count == 0 {
return None;
}
for (idx, level) in l0.sub_levels.iter().enumerate() {
if level.level_type() != LevelType::Nonoverlapping
|| level.vnode_partition_count == partition_count
{
continue;
}

let max_compaction_bytes = std::cmp::max(
self.config.max_bytes_for_level_base,
self.config.sub_level_max_compaction_bytes
* (self.config.level0_sub_level_compact_level_count as u64),
);

let mut select_input_size = 0;

let mut select_level_inputs = vec![];
let mut total_file_count = 0;
let mut wait_enough = false;
for next_level in l0.sub_levels.iter().skip(idx) {
if select_input_size > max_compaction_bytes
|| total_file_count > self.config.level0_max_compact_file_number
|| (next_level.vnode_partition_count == partition_count
&& select_level_inputs.len() > 1)
{
wait_enough = true;
break;
}

if level_handler.is_level_pending_compact(next_level) {
break;
}

select_input_size += next_level.total_file_size;
total_file_count += next_level.table_infos.len() as u64;

select_level_inputs.push(InputLevel {
level_idx: 0,
level_type: next_level.level_type,
table_infos: next_level.table_infos.clone(),
});
}
if !select_level_inputs.is_empty() {
let vnode_partition_count =
if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
partition_count
} else {
0
};
let result = CompactionInput {
input_levels: select_level_inputs,
target_sub_level_id: level.sub_level_id,
select_input_size,
total_file_count,
vnode_partition_count,
..Default::default()
};
if wait_enough
|| self.compaction_task_validator.valid_compact_task(
&result,
ValidationRuleType::Intra,
stats,
)
{
return Some(result);
}
}
}

None
}
}

#[cfg(test)]
pub mod tests {
use risingwave_pb::hummock::Level;
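As a small worked example of the partition decision at the end of `pick_whole_level` (the `vnode_partition_count` assignment above), the sketch below isolates that rule: the selected data is only split into vnode partitions when it is larger than half of `sub_level_max_compaction_bytes`, otherwise the output stays unpartitioned. The free function and its parameter names are assumptions made for this illustration only.

```rust
// Hypothetical standalone version of the vnode-partition decision in `pick_whole_level`.
fn output_partition_count(
    select_input_size: u64,
    sub_level_max_compaction_bytes: u64,
    partition_count: u32,
) -> u32 {
    if select_input_size > sub_level_max_compaction_bytes / 2 {
        // Enough data was selected: emit `partition_count` vnode partitions.
        partition_count
    } else {
        // Too little data: keep the output unpartitioned, since splitting a small
        // amount of data would only create tiny files.
        0
    }
}

fn main() {
    // With sub_level_max_compaction_bytes = 256 MiB, a 200 MiB selection gets partitioned...
    assert_eq!(output_partition_count(200 << 20, 256 << 20, 4), 4);
    // ...while a 32 MiB selection does not.
    assert_eq!(output_partition_count(32 << 20, 256 << 20, 4), 0);
}
```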