diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 197a565090a37..49eeaa5778ab7 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -32,7 +32,8 @@ use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{CompactTask, CompactionConfig, LevelType}; pub use selector::CompactionSelector; -use self::selector::LocalSelectorStatistic; +use self::selector::{EmergencySelector, LocalSelectorStatistic}; +use super::check_cg_write_limit; use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy}; use crate::hummock::compaction::picker::CompactionInput; use crate::hummock::level_handler::LevelHandler; @@ -101,15 +102,34 @@ impl CompactStatus { // When we compact the files, we must make the result of compaction meet the following // conditions, for any user key, the epoch of it in the file existing in the lower // layer must be larger. - selector.pick_compaction( + if let Some(task) = selector.pick_compaction( task_id, group, levels, &mut self.level_handlers, stats, - table_id_to_options, - developer_config, - ) + table_id_to_options.clone(), + developer_config.clone(), + ) { + return Some(task); + } else { + let compaction_group_config = &group.compaction_config; + if check_cg_write_limit(levels, compaction_group_config.as_ref()).is_write_stop() + && compaction_group_config.enable_emergency_picker + { + return EmergencySelector::default().pick_compaction( + task_id, + group, + levels, + &mut self.level_handlers, + stats, + table_id_to_options, + developer_config, + ); + } + } + + None } pub fn is_trivial_move_task(task: &CompactTask) -> bool { diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index 065e9745f7dbb..39dd44565865d 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -20,13 +20,16 @@ use futures::future::Shared; use itertools::Itertools; use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId}; use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType}; +use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::subscribe_compaction_event_request::{ self, Event as RequestEvent, PullTask, }; use risingwave_pb::hummock::subscribe_compaction_event_response::{ Event as ResponseEvent, PullTaskAck, }; -use risingwave_pb::hummock::{CompactStatus as PbCompactStatus, CompactTaskAssignment}; +use risingwave_pb::hummock::{ + CompactStatus as PbCompactStatus, CompactTaskAssignment, CompactionConfig, +}; use thiserror_ext::AsReport; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::oneshot::Receiver as OneShotReceiver; @@ -257,3 +260,41 @@ impl HummockManager { } } } + +pub fn check_cg_write_limit( + levels: &Levels, + compaction_config: &CompactionConfig, +) -> WriteLimitType { + let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize; + let l0_sub_level_number = levels.l0.as_ref().unwrap().sub_levels.len(); + if threshold < l0_sub_level_number { + return WriteLimitType::WriteStop(l0_sub_level_number, threshold); + } + + WriteLimitType::Unlimited +} + +pub enum WriteLimitType { + Unlimited, + + // (l0_level_count, threshold) + WriteStop(usize, usize), +} + +impl WriteLimitType { + pub fn as_str(&self) -> String { + match self { + Self::Unlimited => "Unlimited".to_string(), + Self::WriteStop(l0_level_count, threshold) => { + format!( + "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels", + l0_level_count, threshold + ) + } + } + } + + pub fn is_write_stop(&self) -> bool { + matches!(self, Self::WriteStop(_, _)) + } +} diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 87e776d432894..693dcfa15676e 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -114,6 +114,7 @@ mod utils; mod worker; use compaction::*; +pub use compaction::{check_cg_write_limit, WriteLimitType}; pub(crate) use utils::*; type Snapshot = ArcSwap; @@ -2812,6 +2813,7 @@ impl HummockManager { *self.group_to_table_vnode_partition.write() = group_to_table_vnode_partition; } + /// dedicated event runtime for CPU/IO bound event pub fn compaction_event_loop( hummock_manager: Arc, mut compactor_streams_change_rx: UnboundedReceiver<( @@ -3019,39 +3021,14 @@ impl HummockManager { /// This method will return all compaction group id in a random order and task type. If there are any group block by `write_limit`, it will return a single array with `TaskType::Emergency`. /// If these groups get different task-type, it will return all group id with `TaskType::Dynamic` if the first group get `TaskType::Dynamic`, otherwise it will return the single group with other task type. - #[named] pub async fn auto_pick_compaction_groups_and_type( &self, ) -> (Vec, compact_task::TaskType) { - let versioning_guard = read_lock!(self, versioning).await; - let versioning = versioning_guard.deref(); - let mut compaction_group_ids = - get_compaction_group_ids(&versioning.current_version).collect_vec(); + let mut compaction_group_ids = self.compaction_group_ids().await; compaction_group_ids.shuffle(&mut thread_rng()); let mut normal_groups = vec![]; for cg_id in compaction_group_ids { - if versioning.write_limit.contains_key(&cg_id) { - let enable_emergency_picker = match self - .compaction_group_manager - .read() - .await - .try_get_compaction_group_config(cg_id) - { - Some(config) => config.compaction_config.enable_emergency_picker, - None => { - unreachable!("compaction-group {} not exist", cg_id) - } - }; - - if enable_emergency_picker { - if normal_groups.is_empty() { - return (vec![cg_id], TaskType::Emergency); - } else { - break; - } - } - } if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) { if pick_type == TaskType::Dynamic { normal_groups.push(cg_id); @@ -3436,10 +3413,6 @@ fn init_selectors() -> HashMap::default(), ); - compaction_selectors.insert( - compact_task::TaskType::Emergency, - Box::::default(), - ); compaction_selectors } @@ -3448,7 +3421,6 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::HummockVersion; use tokio::sync::mpsc::error::SendError; -use super::compaction::selector::EmergencySelector; use super::compaction::CompactionSelector; use crate::controller::SqlMetaStore; use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 18e6950585c72..25b1fe622ef2c 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -35,6 +35,7 @@ use risingwave_pb::hummock::{ }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; +use super::check_cg_write_limit; use crate::hummock::error::Result; use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender}; @@ -321,20 +322,14 @@ pub(super) fn calc_new_write_limits( } Some(levels) => levels, }; - // Add write limit conditions here. - let threshold = config - .compaction_config - .level0_stop_write_threshold_sub_level_number as usize; - let l0_sub_level_number = levels.l0.as_ref().unwrap().sub_levels.len(); - if threshold < l0_sub_level_number { + + let write_limit_type = check_cg_write_limit(levels, config.compaction_config.as_ref()); + if write_limit_type.is_write_stop() { new_write_limits.insert( *id, WriteLimit { table_ids: levels.member_table_ids.clone(), - reason: format!( - "too many L0 sub levels: {} > {}", - l0_sub_level_number, threshold - ), + reason: write_limit_type.as_str(), }, ); continue; @@ -519,7 +514,7 @@ mod tests { ); assert_eq!( new_write_limits.get(&1).as_ref().unwrap().reason, - "too many L0 sub levels: 11 > 10" + "WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels" ); assert_eq!(new_write_limits.len(), 2); @@ -540,7 +535,7 @@ mod tests { ); assert_eq!( new_write_limits.get(&1).as_ref().unwrap().reason, - "too many L0 sub levels: 11 > 5" + "WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels" ); }