Skip to content

Commit

Permalink
fix(storage): refactor emergency picker (#15954)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored and hzxa21 committed Apr 22, 2024
1 parent c55d825 commit 2f83d85
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 49 deletions.
30 changes: 25 additions & 5 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, LevelType};
pub use selector::CompactionSelector;

use self::selector::LocalSelectorStatistic;
use self::selector::{EmergencySelector, LocalSelectorStatistic};
use super::check_cg_write_limit;
use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
use crate::hummock::compaction::picker::CompactionInput;
use crate::hummock::level_handler::LevelHandler;
Expand Down Expand Up @@ -101,15 +102,34 @@ impl CompactStatus {
// When we compact the files, we must make the result of compaction meet the following
// conditions, for any user key, the epoch of it in the file existing in the lower
// layer must be larger.
selector.pick_compaction(
if let Some(task) = selector.pick_compaction(
task_id,
group,
levels,
&mut self.level_handlers,
stats,
table_id_to_options,
developer_config,
)
table_id_to_options.clone(),
developer_config.clone(),
) {
return Some(task);
} else {
let compaction_group_config = &group.compaction_config;
if check_cg_write_limit(levels, compaction_group_config.as_ref()).is_write_stop()
&& compaction_group_config.enable_emergency_picker
{
return EmergencySelector::default().pick_compaction(
task_id,
group,
levels,
&mut self.level_handlers,
stats,
table_id_to_options,
developer_config,
);
}
}

None
}

pub fn is_trivial_move_task(task: &CompactTask) -> bool {
Expand Down
43 changes: 42 additions & 1 deletion src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ use futures::future::Shared;
use itertools::Itertools;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
self, Event as RequestEvent, PullTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::{
Event as ResponseEvent, PullTaskAck,
};
use risingwave_pb::hummock::{CompactStatus as PbCompactStatus, CompactTaskAssignment};
use risingwave_pb::hummock::{
CompactStatus as PbCompactStatus, CompactTaskAssignment, CompactionConfig,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::oneshot::Receiver as OneShotReceiver;
Expand Down Expand Up @@ -257,3 +260,41 @@ impl HummockManager {
}
}
}

/// Classifies the write-limit state of a compaction group from its L0 shape.
///
/// Returns [`WriteLimitType::WriteStop`] carrying `(l0_sub_level_count, threshold)` when the
/// number of L0 sub-levels in `levels` is strictly greater than
/// `compaction_config.level0_stop_write_threshold_sub_level_number`; otherwise returns
/// [`WriteLimitType::Unlimited`].
///
/// # Panics
///
/// Panics if `levels.l0` is `None`.
pub fn check_cg_write_limit(
    levels: &Levels,
    compaction_config: &CompactionConfig,
) -> WriteLimitType {
    let sub_level_count = levels.l0.as_ref().unwrap().sub_levels.len();
    let stop_threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;

    // Writes are stopped only once the sub-level count goes past the threshold, not at it.
    if sub_level_count > stop_threshold {
        WriteLimitType::WriteStop(sub_level_count, stop_threshold)
    } else {
        WriteLimitType::Unlimited
    }
}

/// Outcome of a write-limit check on a compaction group.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WriteLimitType {
    /// No write limit applies.
    Unlimited,

    /// Writes must stop. The fields are `(l0_level_count, threshold)`: the observed number of
    /// L0 sub-levels and the configured threshold it was compared against.
    WriteStop(usize, usize),
}

impl WriteLimitType {
    /// Builds a human-readable description of this write-limit state.
    ///
    /// Despite the `as_` prefix this allocates and returns an owned `String`.
    pub fn as_str(&self) -> String {
        match self {
            Self::WriteStop(l0_level_count, threshold) => format!(
                "WriteStop(l0_level_count: {l0_level_count}, threshold: {threshold}) too many L0 sub levels"
            ),
            Self::Unlimited => String::from("Unlimited"),
        }
    }

    /// Returns `true` when this value is the `WriteStop` variant.
    pub fn is_write_stop(&self) -> bool {
        match self {
            Self::WriteStop(..) => true,
            Self::Unlimited => false,
        }
    }
}
34 changes: 3 additions & 31 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ mod utils;
mod worker;

use compaction::*;
pub use compaction::{check_cg_write_limit, WriteLimitType};
pub(crate) use utils::*;

type Snapshot = ArcSwap<HummockSnapshot>;
Expand Down Expand Up @@ -2812,6 +2813,7 @@ impl HummockManager {
*self.group_to_table_vnode_partition.write() = group_to_table_vnode_partition;
}

/// dedicated event runtime for CPU/IO bound event
pub fn compaction_event_loop(
hummock_manager: Arc<Self>,
mut compactor_streams_change_rx: UnboundedReceiver<(
Expand Down Expand Up @@ -3019,39 +3021,14 @@ impl HummockManager {

/// This method returns all compaction group ids in a random order, together with a task type. If any group is blocked by `write_limit`, it returns a single-element array with `TaskType::Emergency`.
/// If the groups get different task types, it returns all group ids with `TaskType::Dynamic` if the first group gets `TaskType::Dynamic`; otherwise it returns the single group with the other task type.
#[named]
pub async fn auto_pick_compaction_groups_and_type(
&self,
) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
let versioning_guard = read_lock!(self, versioning).await;
let versioning = versioning_guard.deref();
let mut compaction_group_ids =
get_compaction_group_ids(&versioning.current_version).collect_vec();
let mut compaction_group_ids = self.compaction_group_ids().await;
compaction_group_ids.shuffle(&mut thread_rng());

let mut normal_groups = vec![];
for cg_id in compaction_group_ids {
if versioning.write_limit.contains_key(&cg_id) {
let enable_emergency_picker = match self
.compaction_group_manager
.read()
.await
.try_get_compaction_group_config(cg_id)
{
Some(config) => config.compaction_config.enable_emergency_picker,
None => {
unreachable!("compaction-group {} not exist", cg_id)
}
};

if enable_emergency_picker {
if normal_groups.is_empty() {
return (vec![cg_id], TaskType::Emergency);
} else {
break;
}
}
}
if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
if pick_type == TaskType::Dynamic {
normal_groups.push(cg_id);
Expand Down Expand Up @@ -3436,10 +3413,6 @@ fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelecto
compact_task::TaskType::Tombstone,
Box::<TombstoneCompactionSelector>::default(),
);
compaction_selectors.insert(
compact_task::TaskType::Emergency,
Box::<EmergencySelector>::default(),
);
compaction_selectors
}

Expand All @@ -3448,7 +3421,6 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::HummockVersion;
use tokio::sync::mpsc::error::SendError;

use super::compaction::selector::EmergencySelector;
use super::compaction::CompactionSelector;
use crate::controller::SqlMetaStore;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
Expand Down
19 changes: 7 additions & 12 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use risingwave_pb::hummock::{
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};

use super::check_cg_write_limit;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender};
Expand Down Expand Up @@ -321,20 +322,14 @@ pub(super) fn calc_new_write_limits(
}
Some(levels) => levels,
};
// Add write limit conditions here.
let threshold = config
.compaction_config
.level0_stop_write_threshold_sub_level_number as usize;
let l0_sub_level_number = levels.l0.as_ref().unwrap().sub_levels.len();
if threshold < l0_sub_level_number {

let write_limit_type = check_cg_write_limit(levels, config.compaction_config.as_ref());
if write_limit_type.is_write_stop() {
new_write_limits.insert(
*id,
WriteLimit {
table_ids: levels.member_table_ids.clone(),
reason: format!(
"too many L0 sub levels: {} > {}",
l0_sub_level_number, threshold
),
reason: write_limit_type.as_str(),
},
);
continue;
Expand Down Expand Up @@ -519,7 +514,7 @@ mod tests {
);
assert_eq!(
new_write_limits.get(&1).as_ref().unwrap().reason,
"too many L0 sub levels: 11 > 10"
"WriteStop(l0_level_count: 11, threshold: 10) too many L0 sub levels"
);
assert_eq!(new_write_limits.len(), 2);

Expand All @@ -540,7 +535,7 @@ mod tests {
);
assert_eq!(
new_write_limits.get(&1).as_ref().unwrap().reason,
"too many L0 sub levels: 11 > 5"
"WriteStop(l0_level_count: 11, threshold: 5) too many L0 sub levels"
);
}

Expand Down

0 comments on commit 2f83d85

Please sign in to comment.