Skip to content

Commit

Permalink
feat(storage): emergency picker for write_limit (#12183)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Sep 18, 2023
1 parent cc7e506 commit 4dadb7c
Show file tree
Hide file tree
Showing 14 changed files with 198 additions and 63 deletions.
3 changes: 3 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ message CompactTask {
SHARED_BUFFER = 4;
TTL = 5;
TOMBSTONE = 6;
EMERGENCY = 7;
}

// Identifies whether the task is space_reclaim, if the compact_task_type increases, it will be refactored to enum
Expand Down Expand Up @@ -578,6 +579,7 @@ message RiseCtlUpdateCompactionConfigRequest {
uint32 level0_overlapping_sub_level_compact_level_count = 12;
uint64 max_space_reclaim_bytes = 13;
uint64 level0_max_compact_file_number = 14;
bool enable_emergency_picker = 15;
}
}
repeated uint64 compaction_group_ids = 1;
Expand Down Expand Up @@ -709,6 +711,7 @@ message CompactionConfig {
// for tier compaction pick overlapping level
uint32 level0_overlapping_sub_level_compact_level_count = 18;
uint32 tombstone_reclaim_ratio = 19;
bool enable_emergency_picker = 20;
}

message TableStats {
Expand Down
9 changes: 8 additions & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,11 +1202,12 @@ pub mod default {
const DEFAULT_MAX_SUB_COMPACTION: u32 = 4;
const DEFAULT_LEVEL_MULTIPLIER: u64 = 5;
const DEFAULT_MAX_SPACE_RECLAIM_BYTES: u64 = 512 * 1024 * 1024; // 512MB;
const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER: u64 = 1000;
const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER: u64 = 300;
const DEFAULT_MAX_COMPACTION_FILE_COUNT: u64 = 96;
const DEFAULT_MIN_SUB_LEVEL_COMPACT_LEVEL_COUNT: u32 = 3;
const DEFAULT_MIN_OVERLAPPING_SUB_LEVEL_COMPACT_LEVEL_COUNT: u32 = 6;
const DEFAULT_TOMBSTONE_RATIO_PERCENT: u32 = 40;
const DEFAULT_EMERGENCY_PICKER: bool = true;

use crate::catalog::hummock::CompactionFilterFlag;

Expand Down Expand Up @@ -1252,6 +1253,10 @@ pub mod default {
pub fn tombstone_reclaim_ratio() -> u32 {
DEFAULT_TOMBSTONE_RATIO_PERCENT
}

pub fn enable_emergency_picker() -> bool {
DEFAULT_EMERGENCY_PICKER
}
}

pub mod s3_objstore_config {
Expand Down Expand Up @@ -1377,6 +1382,8 @@ pub struct CompactionConfig {
pub level0_max_compact_file_number: u64,
#[serde(default = "default::compaction_config::tombstone_reclaim_ratio")]
pub tombstone_reclaim_ratio: u32,
#[serde(default = "default::compaction_config::enable_emergency_picker")]
pub enable_emergency_picker: bool,
}

#[cfg(test)]
Expand Down
3 changes: 2 additions & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ level0_tier_compact_file_number = 6
target_file_size_base = 33554432
compaction_filter_mask = 6
max_sub_compaction = 4
level0_stop_write_threshold_sub_level_number = 1000
level0_stop_write_threshold_sub_level_number = 300
level0_sub_level_compact_level_count = 3
level0_overlapping_sub_level_compact_level_count = 6
max_space_reclaim_bytes = 536870912
level0_max_compact_file_number = 96
tombstone_reclaim_ratio = 40
enable_emergency_picker = true

[batch]
enable_barrier_read = false
Expand Down
5 changes: 5 additions & 0 deletions src/ctl/src/cmd_impl/hummock/compaction_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub fn build_compaction_config_vec(
max_space_reclaim_bytes: Option<u64>,
level0_max_compact_file_number: Option<u64>,
level0_overlapping_sub_level_compact_level_count: Option<u32>,
enable_emergency_picker: Option<bool>,
) -> Vec<MutableConfig> {
let mut configs = vec![];
if let Some(c) = max_bytes_for_level_base {
Expand Down Expand Up @@ -101,6 +102,10 @@ pub fn build_compaction_config_vec(
if let Some(c) = level0_overlapping_sub_level_compact_level_count {
configs.push(MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c))
}
if let Some(c) = enable_emergency_picker {
configs.push(MutableConfig::EnableEmergencyPicker(c))
}

configs
}

Expand Down
4 changes: 4 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ enum HummockCommands {
level0_max_compact_file_number: Option<u64>,
#[clap(long)]
level0_overlapping_sub_level_compact_level_count: Option<u32>,
#[clap(long)]
enable_emergency_picker: Option<bool>,
},
/// Split given compaction group into two. Moves the given tables to the new group.
SplitCompactionGroup {
Expand Down Expand Up @@ -549,6 +551,7 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
max_space_reclaim_bytes,
level0_max_compact_file_number,
level0_overlapping_sub_level_compact_level_count,
enable_emergency_picker,
}) => {
cmd_impl::hummock::update_compaction_config(
context,
Expand All @@ -567,6 +570,7 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
max_space_reclaim_bytes,
level0_max_compact_file_number,
level0_overlapping_sub_level_compact_level_count,
enable_emergency_picker,
),
)
.await?
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/compaction/compaction_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl CompactionConfigBuilder {
level0_overlapping_sub_level_compact_level_count:
compaction_config::level0_overlapping_sub_level_compact_level_count(),
tombstone_reclaim_ratio: compaction_config::tombstone_reclaim_ratio(),
enable_emergency_picker: compaction_config::enable_emergency_picker(),
},
}
}
Expand Down
49 changes: 47 additions & 2 deletions src/meta/src/hummock/compaction/level_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{compact_task, CompactionConfig, LevelType};

use super::picker::{
CompactionTaskValidator, IntraCompactionPicker, SpaceReclaimCompactionPicker,
SpaceReclaimPickerState, TtlPickerState, TtlReclaimCompactionPicker,
CompactionTaskValidator, EmergencyCompactionPicker, IntraCompactionPicker,
SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState,
TtlReclaimCompactionPicker,
};
use super::{
create_compaction_task, LevelCompactionPicker, ManualCompactionOption, ManualCompactionPicker,
Expand Down Expand Up @@ -616,6 +617,50 @@ pub fn default_level_selector() -> Box<dyn LevelSelector> {
Box::<DynamicLevelSelector>::default()
}

#[derive(Default)]
pub struct EmergencySelector {}

impl LevelSelector for EmergencySelector {
fn pick_compaction(
&mut self,
task_id: HummockCompactionTaskId,
group: &CompactionGroup,
levels: &Levels,
level_handlers: &mut [LevelHandler],
selector_stats: &mut LocalSelectorStatistic,
_table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask> {
let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone());
let ctx = dynamic_level_core.calculate_level_base_size(levels);
let picker =
EmergencyCompactionPicker::new(ctx.base_level, group.compaction_config.clone());

let mut stats = LocalPickerStatistic::default();
if let Some(compaction_input) = picker.pick_compaction(levels, level_handlers, &mut stats) {
compaction_input.add_pending_task(task_id, level_handlers);

return Some(create_compaction_task(
group.compaction_config.as_ref(),
compaction_input,
ctx.base_level,
self.task_type(),
));
}

selector_stats.skip_picker.push((0, ctx.base_level, stats));

None
}

fn name(&self) -> &'static str {
"EmergencyCompaction"
}

fn task_type(&self) -> compact_task::TaskType {
compact_task::TaskType::Emergency
}
}

#[cfg(test)]
pub mod tests {
use std::ops::Range;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, KeyRange, LevelType};

pub use crate::hummock::compaction::level_selector::{
default_level_selector, DynamicLevelSelector, DynamicLevelSelectorCore, LevelSelector,
ManualCompactionSelector, SpaceReclaimCompactionSelector, TtlCompactionSelector,
default_level_selector, DynamicLevelSelector, DynamicLevelSelectorCore, EmergencySelector,
LevelSelector, ManualCompactionSelector, SpaceReclaimCompactionSelector, TtlCompactionSelector,
};
use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
use crate::hummock::compaction::picker::{CompactionInput, LocalPickerStatistic};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,6 @@ impl LevelCompactionPicker {
level_handlers: &[LevelHandler],
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
// TODO: remove this
let l0_size = l0.total_file_size - level_handlers[0].get_pending_file_size();
let base_level_size = target_level.total_file_size
- level_handlers[target_level.level_idx as usize].get_pending_file_size();
if l0_size < base_level_size {
stats.skip_by_write_amp_limit += 1;
return None;
}

// no running base_compaction
let strict_check = level_handlers[0]
.get_pending_tasks()
.iter()
.any(|task| task.target_level != 0);

let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
let min_compaction_bytes = self.config.sub_level_max_compaction_bytes;
let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
Expand Down Expand Up @@ -239,8 +224,7 @@ impl LevelCompactionPicker {
&result,
ValidationRuleType::ToBase,
stats,
) && strict_check
{
) {
continue;
}

Expand Down Expand Up @@ -588,7 +572,7 @@ pub mod tests {
// Pick with small max_compaction_bytes results partial sub levels included in input.
let config = Arc::new(
CompactionConfigBuilder::new()
.max_compaction_bytes(50000)
.max_compaction_bytes(100010)
.level0_sub_level_compact_level_count(1)
.build(),
);
Expand Down
46 changes: 15 additions & 31 deletions src/meta/src/hummock/compaction/picker/compaction_task_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,39 +41,41 @@ impl CompactionTaskValidator {
ValidationRuleType::Tier,
Box::new(TierCompactionTaskValidationRule {
config: config.clone(),
enable: true,
}),
);

validation_rules.insert(
ValidationRuleType::Intra,
Box::new(IntraCompactionTaskValidationRule {
config: config.clone(),
enable: true,
}),
);

validation_rules.insert(
ValidationRuleType::ToBase,
Box::new(BaseCompactionTaskValidationRule {
config,
enable: true,
}),
Box::new(BaseCompactionTaskValidationRule { config }),
);

CompactionTaskValidator { validation_rules }
}

pub fn unused() -> Self {
CompactionTaskValidator {
validation_rules: HashMap::default(),
}
}

pub fn valid_compact_task(
&self,
input: &CompactionInput,
picker_type: ValidationRuleType,
stats: &mut LocalPickerStatistic,
) -> bool {
self.validation_rules
.get(&picker_type)
.unwrap()
.validate(input, stats)
if let Some(validation_rule) = self.validation_rules.get(&picker_type) {
validation_rule.validate(input, stats)
} else {
true
}
}
}

Expand All @@ -83,15 +85,10 @@ pub trait CompactionTaskValidationRule {

struct TierCompactionTaskValidationRule {
config: Arc<CompactionConfig>,
enable: bool,
}

impl CompactionTaskValidationRule for TierCompactionTaskValidationRule {
fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
if !self.enable {
return true;
}

// so the design here wants to merge multiple overlapping-levels in one compaction
let max_compaction_bytes = std::cmp::min(
self.config.max_compaction_bytes,
Expand Down Expand Up @@ -128,19 +125,19 @@ impl CompactionTaskValidationRule for TierCompactionTaskValidationRule {

struct IntraCompactionTaskValidationRule {
config: Arc<CompactionConfig>,
enable: bool,
}

impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule {
fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
if !self.enable {
if input.total_file_count >= self.config.level0_max_compact_file_number {
return true;
}

let intra_sub_level_compact_level_count =
self.config.level0_sub_level_compact_level_count as usize;

if input.input_levels.len() < intra_sub_level_compact_level_count {
stats.skip_by_count_limit += 1;
return false;
}

Expand All @@ -163,34 +160,21 @@ impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule {
max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2
>= input.select_input_size;

if is_write_amp_large && input.total_file_count < self.config.level0_max_compact_file_number
{
if is_write_amp_large {
stats.skip_by_write_amp_limit += 1;
return false;
}

if input.input_levels.len() < intra_sub_level_compact_level_count
&& input.total_file_count < self.config.level0_max_compact_file_number
{
stats.skip_by_count_limit += 1;
return false;
}

true
}
}

struct BaseCompactionTaskValidationRule {
config: Arc<CompactionConfig>,
enable: bool,
}

impl CompactionTaskValidationRule for BaseCompactionTaskValidationRule {
fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
if !self.enable {
return true;
}

// The size of target level may be too large, we shall skip this compact task and wait
// the data in base level compact to lower level.
if input.target_input_size > self.config.max_compaction_bytes {
Expand Down
Loading

0 comments on commit 4dadb7c

Please sign in to comment.