diff --git a/proto/hummock.proto b/proto/hummock.proto index db99cbe5f850..a631bcf40dda 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -295,6 +295,7 @@ message CompactTask { SHARED_BUFFER = 4; TTL = 5; TOMBSTONE = 6; + EMERGENCY = 7; } // Identifies whether the task is space_reclaim, if the compact_task_type increases, it will be refactored to enum @@ -578,6 +579,7 @@ message RiseCtlUpdateCompactionConfigRequest { uint32 level0_overlapping_sub_level_compact_level_count = 12; uint64 max_space_reclaim_bytes = 13; uint64 level0_max_compact_file_number = 14; + bool enable_emergency_picker = 15; } } repeated uint64 compaction_group_ids = 1; @@ -709,6 +711,7 @@ message CompactionConfig { // for tier compaction pick overlapping level uint32 level0_overlapping_sub_level_compact_level_count = 18; uint32 tombstone_reclaim_ratio = 19; + bool enable_emergency_picker = 20; } message TableStats { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 2924b5dcdbf6..d34d794b4062 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1202,11 +1202,12 @@ pub mod default { const DEFAULT_MAX_SUB_COMPACTION: u32 = 4; const DEFAULT_LEVEL_MULTIPLIER: u64 = 5; const DEFAULT_MAX_SPACE_RECLAIM_BYTES: u64 = 512 * 1024 * 1024; // 512MB; - const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER: u64 = 1000; + const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER: u64 = 300; const DEFAULT_MAX_COMPACTION_FILE_COUNT: u64 = 96; const DEFAULT_MIN_SUB_LEVEL_COMPACT_LEVEL_COUNT: u32 = 3; const DEFAULT_MIN_OVERLAPPING_SUB_LEVEL_COMPACT_LEVEL_COUNT: u32 = 6; const DEFAULT_TOMBSTONE_RATIO_PERCENT: u32 = 40; + const DEFAULT_EMERGENCY_PICKER: bool = true; use crate::catalog::hummock::CompactionFilterFlag; @@ -1252,6 +1253,10 @@ pub mod default { pub fn tombstone_reclaim_ratio() -> u32 { DEFAULT_TOMBSTONE_RATIO_PERCENT } + + pub fn enable_emergency_picker() -> bool { + DEFAULT_EMERGENCY_PICKER + } } pub mod s3_objstore_config { @@ -1377,6 +1382,8 @@ pub struct CompactionConfig { pub level0_max_compact_file_number: u64, #[serde(default = "default::compaction_config::tombstone_reclaim_ratio")] pub tombstone_reclaim_ratio: u32, + #[serde(default = "default::compaction_config::enable_emergency_picker")] + pub enable_emergency_picker: bool, } #[cfg(test)] diff --git a/src/config/example.toml b/src/config/example.toml index c2456a59e45b..6e735dee19e0 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -49,12 +49,13 @@ level0_tier_compact_file_number = 6 target_file_size_base = 33554432 compaction_filter_mask = 6 max_sub_compaction = 4 -level0_stop_write_threshold_sub_level_number = 1000 +level0_stop_write_threshold_sub_level_number = 300 level0_sub_level_compact_level_count = 3 level0_overlapping_sub_level_compact_level_count = 6 max_space_reclaim_bytes = 536870912 level0_max_compact_file_number = 96 tombstone_reclaim_ratio = 40 +enable_emergency_picker = true [batch] enable_barrier_read = false diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index 75a9884aece7..6f16279939e8 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -60,6 +60,7 @@ pub fn build_compaction_config_vec( max_space_reclaim_bytes: Option, level0_max_compact_file_number: Option, level0_overlapping_sub_level_compact_level_count: Option, + enable_emergency_picker: Option, ) -> Vec { let mut configs = vec![]; if let Some(c) = max_bytes_for_level_base { @@ -101,6 +102,10 @@ pub fn build_compaction_config_vec( if let Some(c) = level0_overlapping_sub_level_compact_level_count { configs.push(MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c)) } + if let Some(c) = enable_emergency_picker { + configs.push(MutableConfig::EnableEmergencyPicker(c)) + } + configs } diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 45e61f80b5fa..854b9e9260eb 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -220,6 +220,8 @@ enum HummockCommands { level0_max_compact_file_number: Option, #[clap(long)] level0_overlapping_sub_level_compact_level_count: Option, + #[clap(long)] + enable_emergency_picker: Option, }, /// Split given compaction group into two. Moves the given tables to the new group. SplitCompactionGroup { @@ -549,6 +551,7 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { max_space_reclaim_bytes, level0_max_compact_file_number, level0_overlapping_sub_level_compact_level_count, + enable_emergency_picker, }) => { cmd_impl::hummock::update_compaction_config( context, @@ -567,6 +570,7 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { max_space_reclaim_bytes, level0_max_compact_file_number, level0_overlapping_sub_level_compact_level_count, + enable_emergency_picker, ), ) .await? diff --git a/src/meta/src/hummock/compaction/compaction_config.rs b/src/meta/src/hummock/compaction/compaction_config.rs index 13d568b3eaa3..4dfd0edc62a1 100644 --- a/src/meta/src/hummock/compaction/compaction_config.rs +++ b/src/meta/src/hummock/compaction/compaction_config.rs @@ -65,6 +65,7 @@ impl CompactionConfigBuilder { level0_overlapping_sub_level_compact_level_count: compaction_config::level0_overlapping_sub_level_compact_level_count(), tombstone_reclaim_ratio: compaction_config::tombstone_reclaim_ratio(), + enable_emergency_picker: compaction_config::enable_emergency_picker(), }, } } diff --git a/src/meta/src/hummock/compaction/level_selector.rs b/src/meta/src/hummock/compaction/level_selector.rs index 893dffd79d6b..05975f64da1b 100644 --- a/src/meta/src/hummock/compaction/level_selector.rs +++ b/src/meta/src/hummock/compaction/level_selector.rs @@ -26,8 +26,9 @@ use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{compact_task, CompactionConfig, LevelType}; use super::picker::{ - CompactionTaskValidator, IntraCompactionPicker, SpaceReclaimCompactionPicker, - SpaceReclaimPickerState, TtlPickerState, TtlReclaimCompactionPicker, + CompactionTaskValidator, EmergencyCompactionPicker, IntraCompactionPicker, + SpaceReclaimCompactionPicker, SpaceReclaimPickerState, TtlPickerState, + TtlReclaimCompactionPicker, }; use super::{ create_compaction_task, LevelCompactionPicker, ManualCompactionOption, ManualCompactionPicker, @@ -616,6 +617,50 @@ pub fn default_level_selector() -> Box { Box::::default() } +#[derive(Default)] +pub struct EmergencySelector {} + +impl LevelSelector for EmergencySelector { + fn pick_compaction( + &mut self, + task_id: HummockCompactionTaskId, + group: &CompactionGroup, + levels: &Levels, + level_handlers: &mut [LevelHandler], + selector_stats: &mut LocalSelectorStatistic, + _table_id_to_options: HashMap, + ) -> Option { + let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone()); + let ctx = dynamic_level_core.calculate_level_base_size(levels); + let picker = + EmergencyCompactionPicker::new(ctx.base_level, group.compaction_config.clone()); + + let mut stats = LocalPickerStatistic::default(); + if let Some(compaction_input) = picker.pick_compaction(levels, level_handlers, &mut stats) { + compaction_input.add_pending_task(task_id, level_handlers); + + return Some(create_compaction_task( + group.compaction_config.as_ref(), + compaction_input, + ctx.base_level, + self.task_type(), + )); + } + + selector_stats.skip_picker.push((0, ctx.base_level, stats)); + + None + } + + fn name(&self) -> &'static str { + "EmergencyCompaction" + } + + fn task_type(&self) -> compact_task::TaskType { + compact_task::TaskType::Emergency + } +} + #[cfg(test)] pub mod tests { use std::ops::Range; diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index a30e4b042211..23585da8999a 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -37,8 +37,8 @@ use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{CompactTask, CompactionConfig, KeyRange, LevelType}; pub use crate::hummock::compaction::level_selector::{ - default_level_selector, DynamicLevelSelector, DynamicLevelSelectorCore, LevelSelector, - ManualCompactionSelector, SpaceReclaimCompactionSelector, TtlCompactionSelector, + default_level_selector, DynamicLevelSelector, DynamicLevelSelectorCore, EmergencySelector, + LevelSelector, ManualCompactionSelector, SpaceReclaimCompactionSelector, TtlCompactionSelector, }; use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy}; use crate::hummock::compaction::picker::{CompactionInput, LocalPickerStatistic}; diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index c224fbfe6ce5..d9bd8d6020a2 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -131,21 +131,6 @@ impl LevelCompactionPicker { level_handlers: &[LevelHandler], stats: &mut LocalPickerStatistic, ) -> Option { - // TODO: remove this - let l0_size = l0.total_file_size - level_handlers[0].get_pending_file_size(); - let base_level_size = target_level.total_file_size - - level_handlers[target_level.level_idx as usize].get_pending_file_size(); - if l0_size < base_level_size { - stats.skip_by_write_amp_limit += 1; - return None; - } - - // no running base_compaction - let strict_check = level_handlers[0] - .get_pending_tasks() - .iter() - .any(|task| task.target_level != 0); - let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); let min_compaction_bytes = self.config.sub_level_max_compaction_bytes; let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new( @@ -239,8 +224,7 @@ impl LevelCompactionPicker { &result, ValidationRuleType::ToBase, stats, - ) && strict_check - { + ) { continue; } @@ -588,7 +572,7 @@ pub mod tests { // Pick with small max_compaction_bytes results partial sub levels included in input. let config = Arc::new( CompactionConfigBuilder::new() - .max_compaction_bytes(50000) + .max_compaction_bytes(100010) .level0_sub_level_compact_level_count(1) .build(), ); diff --git a/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs index 4de77467205f..eafe074a8828 100644 --- a/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs +++ b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs @@ -41,7 +41,6 @@ impl CompactionTaskValidator { ValidationRuleType::Tier, Box::new(TierCompactionTaskValidationRule { config: config.clone(), - enable: true, }), ); @@ -49,31 +48,34 @@ impl CompactionTaskValidator { ValidationRuleType::Intra, Box::new(IntraCompactionTaskValidationRule { config: config.clone(), - enable: true, }), ); validation_rules.insert( ValidationRuleType::ToBase, - Box::new(BaseCompactionTaskValidationRule { - config, - enable: true, - }), + Box::new(BaseCompactionTaskValidationRule { config }), ); CompactionTaskValidator { validation_rules } } + pub fn unused() -> Self { + CompactionTaskValidator { + validation_rules: HashMap::default(), + } + } + pub fn valid_compact_task( &self, input: &CompactionInput, picker_type: ValidationRuleType, stats: &mut LocalPickerStatistic, ) -> bool { - self.validation_rules - .get(&picker_type) - .unwrap() - .validate(input, stats) + if let Some(validation_rule) = self.validation_rules.get(&picker_type) { + validation_rule.validate(input, stats) + } else { + true + } } } @@ -83,15 +85,10 @@ pub trait CompactionTaskValidationRule { struct TierCompactionTaskValidationRule { config: Arc, - enable: bool, } impl CompactionTaskValidationRule for TierCompactionTaskValidationRule { fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool { - if !self.enable { - return true; - } - // so the design here wants to merge multiple overlapping-levels in one compaction let max_compaction_bytes = std::cmp::min( self.config.max_compaction_bytes, @@ -128,12 +125,11 @@ impl CompactionTaskValidationRule for TierCompactionTaskValidationRule { struct IntraCompactionTaskValidationRule { config: Arc, - enable: bool, } impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule { fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool { - if !self.enable { + if input.total_file_count >= self.config.level0_max_compact_file_number { return true; } @@ -141,6 +137,7 @@ impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule { self.config.level0_sub_level_compact_level_count as usize; if input.input_levels.len() < intra_sub_level_compact_level_count { + stats.skip_by_count_limit += 1; return false; } @@ -163,34 +160,21 @@ impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule { max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2 >= input.select_input_size; - if is_write_amp_large && input.total_file_count < self.config.level0_max_compact_file_number - { + if is_write_amp_large { stats.skip_by_write_amp_limit += 1; return false; } - if input.input_levels.len() < intra_sub_level_compact_level_count - && input.total_file_count < self.config.level0_max_compact_file_number - { - stats.skip_by_count_limit += 1; - return false; - } - true } } struct BaseCompactionTaskValidationRule { config: Arc, - enable: bool, } impl CompactionTaskValidationRule for BaseCompactionTaskValidationRule { fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool { - if !self.enable { - return true; - } - // The size of target level may be too large, we shall skip this compact task and wait // the data in base level compact to lower level. if input.target_input_size > self.config.max_compaction_bytes { diff --git a/src/meta/src/hummock/compaction/picker/emergency_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/emergency_compaction_picker.rs new file mode 100644 index 000000000000..9866f7b64426 --- /dev/null +++ b/src/meta/src/hummock/compaction/picker/emergency_compaction_picker.rs @@ -0,0 +1,64 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use risingwave_pb::hummock::hummock_version::Levels; +use risingwave_pb::hummock::CompactionConfig; + +use super::{ + CompactionInput, CompactionPicker, CompactionTaskValidator, LevelCompactionPicker, + LocalPickerStatistic, TierCompactionPicker, +}; +use crate::hummock::level_handler::LevelHandler; + +pub struct EmergencyCompactionPicker { + target_level: usize, + config: Arc, +} + +impl EmergencyCompactionPicker { + pub fn new(target_level: usize, config: Arc) -> Self { + Self { + target_level, + config, + } + } + + pub fn pick_compaction( + &self, + levels: &Levels, + level_handlers: &[LevelHandler], + stats: &mut LocalPickerStatistic, + ) -> Option { + let unused_validator = Arc::new(CompactionTaskValidator::unused()); + + let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator( + self.target_level, + self.config.clone(), + unused_validator.clone(), + ); + + if let Some(ret) = + base_level_compaction_picker.pick_compaction(levels, level_handlers, stats) + { + return Some(ret); + } + + let mut tier_compaction_picker = + TierCompactionPicker::new_with_validator(self.config.clone(), unused_validator); + + tier_compaction_picker.pick_compaction(levels, level_handlers, stats) + } +} diff --git a/src/meta/src/hummock/compaction/picker/mod.rs b/src/meta/src/hummock/compaction/picker/mod.rs index cf3a4555e18e..ac1a8f825aa3 100644 --- a/src/meta/src/hummock/compaction/picker/mod.rs +++ b/src/meta/src/hummock/compaction/picker/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod base_level_compaction_picker; +mod emergency_compaction_picker; mod intra_compaction_picker; mod manual_compaction_picker; mod min_overlap_compaction_picker; @@ -26,6 +27,7 @@ mod compaction_task_validator; pub use base_level_compaction_picker::LevelCompactionPicker; pub use compaction_task_validator::{CompactionTaskValidator, ValidationRuleType}; +pub use emergency_compaction_picker::EmergencyCompactionPicker; pub use intra_compaction_picker::IntraCompactionPicker; pub use manual_compaction_picker::ManualCompactionPicker; pub use min_overlap_compaction_picker::MinOverlappingPicker; @@ -51,7 +53,7 @@ pub struct LocalPickerStatistic { pub skip_by_overlapping: u64, } -#[derive(Default)] +#[derive(Default, Debug)] pub struct CompactionInput { pub input_levels: Vec, pub target_level: usize, diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 8fa1aea32115..3e83937897b3 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -911,6 +911,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi MutableConfig::Level0MaxCompactFileNumber(c) => { target.level0_max_compact_file_number = *c; } + MutableConfig::EnableEmergencyPicker(c) => { + target.enable_emergency_picker = *c; + } } } } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 99f0c41d696d..a1e55506bc22 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -66,7 +66,8 @@ use tonic::Streaming; use tracing::warn; use crate::hummock::compaction::{ - CompactStatus, LocalSelectorStatistic, ManualCompactionOption, TombstoneCompactionSelector, + CompactStatus, EmergencySelector, LocalSelectorStatistic, ManualCompactionOption, + TombstoneCompactionSelector, }; use crate::hummock::error::{Error, Result}; use crate::hummock::metrics_utils::{ @@ -821,6 +822,7 @@ impl HummockManager { return Ok(None); } }; + let (current_version, watermark) = { let versioning_guard = read_lock!(self, versioning).await; let max_committed_epoch = versioning_guard.current_version.max_committed_epoch; @@ -829,7 +831,6 @@ impl HummockManager { .values() .map(|v| v.minimal_pinned_snapshot) .fold(max_committed_epoch, std::cmp::min); - (versioning_guard.current_version.clone(), watermark) }; if current_version.levels.get(&compaction_group_id).is_none() { @@ -837,7 +838,8 @@ impl HummockManager { return Ok(None); } - let can_trivial_move = matches!(selector.task_type(), compact_task::TaskType::Dynamic); + let can_trivial_move = matches!(selector.task_type(), compact_task::TaskType::Dynamic) + || matches!(selector.task_type(), compact_task::TaskType::Emergency); let mut stats = LocalSelectorStatistic::default(); let member_table_ids = ¤t_version @@ -1277,8 +1279,6 @@ impl HummockManager { let label = if is_trivial_reclaim { "trivial-space-reclaim" } else if is_trivial_move { - // TODO: only support can_trivial_move in DynamicLevelCompcation, will check - // task_type next PR "trivial-move" } else { self.compactor_manager @@ -1312,7 +1312,8 @@ impl HummockManager { ); if !deterministic_mode - && matches!(compact_task.task_type(), compact_task::TaskType::Dynamic) + && (matches!(compact_task.task_type(), compact_task::TaskType::Dynamic) + || matches!(compact_task.task_type(), compact_task::TaskType::Emergency)) { // only try send Dynamic compaction self.try_send_compaction_request( @@ -2469,6 +2470,7 @@ impl HummockManager { } } + #[named] pub fn compaction_event_loop( hummock_manager: Arc, mut compactor_streams_change_rx: UnboundedReceiver<( @@ -2558,7 +2560,32 @@ impl HummockManager { assert_ne!(0, pull_task_count); if let Some(compactor) = hummock_manager.compactor_manager.get_compactor(context_id) { if let Some((group, task_type)) = hummock_manager.auto_pick_compaction_group_and_type().await { - let selector: &mut Box = compaction_selectors.get_mut(&task_type).unwrap(); + let selector: &mut Box = { + let versioning_guard = read_lock!(hummock_manager, versioning).await; + let versioning = versioning_guard.deref(); + + if versioning.write_limit.contains_key(&group) { + let enable_emergency_picker = match hummock_manager + .compaction_group_manager + .read() + .await + .try_get_compaction_group_config(group) + { + Some(config) =>{ config.compaction_config.enable_emergency_picker }, + None => { unreachable!("compaction-group {} not exist", group) } + }; + + if enable_emergency_picker { + compaction_selectors.get_mut(&TaskType::Emergency).unwrap() + } else { + compaction_selectors.get_mut(&task_type).unwrap() + } + } else { + compaction_selectors.get_mut(&task_type).unwrap() + } + + }; + for _ in 0..pull_task_count { let compact_task = hummock_manager @@ -2881,6 +2908,11 @@ fn init_selectors() -> HashMap> { compact_task::TaskType::Tombstone, Box::::default(), ); + + compaction_selectors.insert( + compact_task::TaskType::Emergency, + Box::::default(), + ); compaction_selectors }