Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): emergency picker for write_limit #12183

Merged
merged 16 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ message CompactTask {
SHARED_BUFFER = 4;
TTL = 5;
TOMBSTONE = 6;
EMERGENCY = 7;
}

// Identifies whether the task is space_reclaim, if the compact_task_type increases, it will be refactored to enum
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1203,7 +1203,7 @@ pub mod default {
const DEFAULT_MAX_SUB_COMPACTION: u32 = 4;
const DEFAULT_LEVEL_MULTIPLIER: u64 = 5;
const DEFAULT_MAX_SPACE_RECLAIM_BYTES: u64 = 512 * 1024 * 1024; // 512MB;
const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER: u64 = 1000;
const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER: u64 = 300;
const DEFAULT_MAX_COMPACTION_FILE_COUNT: u64 = 96;
const DEFAULT_MIN_SUB_LEVEL_COMPACT_LEVEL_COUNT: u32 = 3;
const DEFAULT_MIN_OVERLAPPING_SUB_LEVEL_COMPACT_LEVEL_COUNT: u32 = 6;
Expand Down
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ level0_tier_compact_file_number = 6
target_file_size_base = 33554432
compaction_filter_mask = 6
max_sub_compaction = 4
level0_stop_write_threshold_sub_level_number = 1000
level0_stop_write_threshold_sub_level_number = 300
level0_sub_level_compact_level_count = 3
level0_overlapping_sub_level_compact_level_count = 6
max_space_reclaim_bytes = 536870912
Expand Down
49 changes: 47 additions & 2 deletions src/meta/src/hummock/compaction/level_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{compact_task, CompactionConfig, LevelType};

use super::picker::{
CompactionTaskValidator, IntraCompactionPicker, SpaceReclaimCompactionPicker,
SpaceReclaimPickerState, TtlPickerState, TtlReclaimCompactionPicker,
CompactionTaskValidator, EmergencyCompactionPicker, IntraCompactionPicker,
SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState,
TtlReclaimCompactionPicker,
};
use super::{
create_compaction_task, LevelCompactionPicker, ManualCompactionOption, ManualCompactionPicker,
Expand Down Expand Up @@ -616,6 +617,50 @@ pub fn default_level_selector() -> Box<dyn LevelSelector> {
Box::<DynamicLevelSelector>::default()
}

#[derive(Default)]
pub struct EmergencySelector {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move to an independent file, this file is enough large....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I'm going to separate the implementation of the different selectors in a separate pr and put it in a independent file, like the picker

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


impl LevelSelector for EmergencySelector {
fn pick_compaction(
&mut self,
task_id: HummockCompactionTaskId,
group: &CompactionGroup,
levels: &Levels,
level_handlers: &mut [LevelHandler],
selector_stats: &mut LocalSelectorStatistic,
_table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask> {
let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone());
let ctx = dynamic_level_core.calculate_level_base_size(levels);
let picker =
EmergencyCompactionPicker::new(ctx.base_level, group.compaction_config.clone());

let mut stats = LocalPickerStatistic::default();
if let Some(compaction_input) = picker.pick_compaction(levels, level_handlers, &mut stats) {
compaction_input.add_pending_task(task_id, level_handlers);

return Some(create_compaction_task(
group.compaction_config.as_ref(),
compaction_input,
ctx.base_level,
self.task_type(),
));
}

selector_stats.skip_picker.push((0, ctx.base_level, stats));

None
}

fn name(&self) -> &'static str {
"EmergencyCompaction"
}

fn task_type(&self) -> compact_task::TaskType {
compact_task::TaskType::Emergency
}
}

#[cfg(test)]
pub mod tests {
use std::ops::Range;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, KeyRange, LevelType};

pub use crate::hummock::compaction::level_selector::{
default_level_selector, DynamicLevelSelector, DynamicLevelSelectorCore, LevelSelector,
ManualCompactionSelector, SpaceReclaimCompactionSelector, TtlCompactionSelector,
default_level_selector, DynamicLevelSelector, DynamicLevelSelectorCore, EmergencySelector,
LevelSelector, ManualCompactionSelector, SpaceReclaimCompactionSelector, TtlCompactionSelector,
};
use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
use crate::hummock::compaction::picker::{CompactionInput, LocalPickerStatistic};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,6 @@ impl LevelCompactionPicker {
level_handlers: &[LevelHandler],
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
// TODO: remove this
let l0_size = l0.total_file_size - level_handlers[0].get_pending_file_size();
Li0k marked this conversation as resolved.
Show resolved Hide resolved
let base_level_size = target_level.total_file_size
- level_handlers[target_level.level_idx as usize].get_pending_file_size();
if l0_size < base_level_size {
stats.skip_by_write_amp_limit += 1;
return None;
}

// no running base_compaction
let strict_check = level_handlers[0]
.get_pending_tasks()
.iter()
.any(|task| task.target_level != 0);

let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
let min_compaction_bytes = self.config.sub_level_max_compaction_bytes;
let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
Expand Down Expand Up @@ -239,8 +224,7 @@ impl LevelCompactionPicker {
&result,
ValidationRuleType::ToBase,
stats,
) && strict_check
{
) {
continue;
}

Expand Down
46 changes: 15 additions & 31 deletions src/meta/src/hummock/compaction/picker/compaction_task_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,39 +41,41 @@ impl CompactionTaskValidator {
ValidationRuleType::Tier,
Box::new(TierCompactionTaskValidationRule {
config: config.clone(),
enable: true,
}),
);

validation_rules.insert(
ValidationRuleType::Intra,
Box::new(IntraCompactionTaskValidationRule {
config: config.clone(),
enable: true,
}),
);

validation_rules.insert(
ValidationRuleType::ToBase,
Box::new(BaseCompactionTaskValidationRule {
config,
enable: true,
}),
Box::new(BaseCompactionTaskValidationRule { config }),
);

CompactionTaskValidator { validation_rules }
}

pub fn unused() -> Self {
CompactionTaskValidator {
validation_rules: HashMap::default(),
}
}

pub fn valid_compact_task(
&self,
input: &CompactionInput,
picker_type: ValidationRuleType,
stats: &mut LocalPickerStatistic,
) -> bool {
self.validation_rules
.get(&picker_type)
.unwrap()
.validate(input, stats)
if let Some(validation_rule) = self.validation_rules.get(&picker_type) {
validation_rule.validate(input, stats)
} else {
true
}
}
}

Expand All @@ -83,15 +85,10 @@ pub trait CompactionTaskValidationRule {

struct TierCompactionTaskValidationRule {
config: Arc<CompactionConfig>,
enable: bool,
}

impl CompactionTaskValidationRule for TierCompactionTaskValidationRule {
fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
if !self.enable {
return true;
}

// so the design here wants to merge multiple overlapping-levels in one compaction
let max_compaction_bytes = std::cmp::min(
self.config.max_compaction_bytes,
Expand Down Expand Up @@ -128,19 +125,19 @@ impl CompactionTaskValidationRule for TierCompactionTaskValidationRule {

struct IntraCompactionTaskValidationRule {
config: Arc<CompactionConfig>,
enable: bool,
}

impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule {
fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
if !self.enable {
if input.total_file_count >= self.config.level0_max_compact_file_number {
return true;
}

let intra_sub_level_compact_level_count =
self.config.level0_sub_level_compact_level_count as usize;

if input.input_levels.len() < intra_sub_level_compact_level_count {
stats.skip_by_count_limit += 1;
return false;
}

Expand All @@ -163,34 +160,21 @@ impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule {
max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2
>= input.select_input_size;

if is_write_amp_large && input.total_file_count < self.config.level0_max_compact_file_number
{
if is_write_amp_large {
stats.skip_by_write_amp_limit += 1;
return false;
}

if input.input_levels.len() < intra_sub_level_compact_level_count
&& input.total_file_count < self.config.level0_max_compact_file_number
{
stats.skip_by_count_limit += 1;
return false;
}

true
}
}

struct BaseCompactionTaskValidationRule {
config: Arc<CompactionConfig>,
enable: bool,
}

impl CompactionTaskValidationRule for BaseCompactionTaskValidationRule {
fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
if !self.enable {
return true;
}

// The size of target level may be too large, we shall skip this compact task and wait
// the data in base level compact to lower level.
if input.target_input_size > self.config.max_compaction_bytes {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::CompactionConfig;

use super::{
CompactionInput, CompactionPicker, CompactionTaskValidator, LevelCompactionPicker,
LocalPickerStatistic, TierCompactionPicker,
};
use crate::hummock::level_handler::LevelHandler;

pub struct EmergencyCompactionPicker {
target_level: usize,
config: Arc<CompactionConfig>,
}

impl EmergencyCompactionPicker {
pub fn new(target_level: usize, config: Arc<CompactionConfig>) -> Self {
Self {
target_level,
config,
}
}

pub fn pick_compaction(
&self,
levels: &Levels,
level_handlers: &[LevelHandler],
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
let unused_validator = Arc::new(CompactionTaskValidator::unused());

let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
self.target_level,
self.config.clone(),
unused_validator.clone(),
);

if let Some(ret) =
base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
{
return Some(ret);
}

let mut tier_compaction_picker =
TierCompactionPicker::new_with_validator(self.config.clone(), unused_validator);

tier_compaction_picker.pick_compaction(levels, level_handlers, stats)
}
}
2 changes: 2 additions & 0 deletions src/meta/src/hummock/compaction/picker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

mod base_level_compaction_picker;
mod emergency_compaction_picker;
mod intra_compaction_picker;
mod manual_compaction_picker;
mod min_overlap_compaction_picker;
Expand All @@ -26,6 +27,7 @@ mod compaction_task_validator;

pub use base_level_compaction_picker::LevelCompactionPicker;
pub use compaction_task_validator::{CompactionTaskValidator, ValidationRuleType};
pub use emergency_compaction_picker::EmergencyCompactionPicker;
pub use intra_compaction_picker::IntraCompactionPicker;
pub use manual_compaction_picker::ManualCompactionPicker;
pub use min_overlap_compaction_picker::MinOverlappingPicker;
Expand Down
Loading
Loading