Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(storage): add enable_trivial_move and enable_check_task_overlap #14801

Merged
merged 8 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,12 @@ pub struct MetaDeveloperConfig {
/// in the meta node.
#[serde(default = "default::developer::meta_cached_traces_memory_limit_bytes")]
pub cached_traces_memory_limit_bytes: usize,

/// Compaction picker config
#[serde(default = "default::developer::enable_trivial_move")]
pub enable_trivial_move: bool,
#[serde(default = "default::developer::enable_check_task_level_overlap")]
pub enable_check_task_level_overlap: bool,
}

/// The section `[server]` in `risingwave.toml`.
Expand Down Expand Up @@ -1406,6 +1412,14 @@ pub mod default {
pub fn stream_hash_agg_max_dirty_groups_heap_size() -> usize {
64 << 20 // 64MB
}

pub fn enable_trivial_move() -> bool {
true
}

pub fn enable_check_task_level_overlap() -> bool {
false
}
}

pub use crate::system_param::default as system;
Expand Down
2 changes: 2 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ enable_emergency_picker = true
[meta.developer]
meta_cached_traces_num = 256
meta_cached_traces_memory_limit_bytes = 134217728
meta_enable_trivial_move = true
meta_enable_check_task_level_overlap = false

[batch]
enable_barrier_read = false
Expand Down
5 changes: 5 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.meta
.developer
.cached_traces_memory_limit_bytes,
enable_trivial_move: config.meta.developer.enable_trivial_move,
enable_check_task_level_overlap: config
.meta
.developer
.enable_check_task_level_overlap,
},
config.system.into_init_system_params(),
)
Expand Down
38 changes: 29 additions & 9 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::hummock::compaction::picker::CompactionInput;
use crate::hummock::level_handler::LevelHandler;
use crate::hummock::model::CompactionGroup;

#[derive(Clone)]
pub struct CompactStatus {
pub compaction_group_id: CompactionGroupId,
pub level_handlers: Vec<LevelHandler>,
Expand All @@ -60,15 +61,6 @@ impl PartialEq for CompactStatus {
}
}

impl Clone for CompactStatus {
fn clone(&self) -> Self {
Self {
compaction_group_id: self.compaction_group_id,
level_handlers: self.level_handlers.clone(),
}
}
}

pub struct CompactionTask {
pub input: CompactionInput,
pub base_level: usize,
Expand Down Expand Up @@ -104,6 +96,7 @@ impl CompactStatus {
stats: &mut LocalSelectorStatistic,
selector: &mut Box<dyn CompactionSelector>,
table_id_to_options: HashMap<u32, TableOption>,
developer_config: Arc<CompactionDeveloperConfig>,
) -> Option<CompactionTask> {
// When we compact the files, we must make the result of compaction meet the following
// conditions, for any user key, the epoch of it in the file existing in the lower
Expand All @@ -115,6 +108,7 @@ impl CompactStatus {
&mut self.level_handlers,
stats,
table_id_to_options,
developer_config,
)
}

Expand Down Expand Up @@ -216,3 +210,29 @@ pub fn get_compression_algorithm(
compaction_config.compression_algorithm[idx].clone()
}
}

pub struct CompactionDeveloperConfig {
/// l0 picker whether to select trivial move task
pub enable_trivial_move: bool,

/// l0 multi level picker whether to check the overlap accuracy between sub levels
pub enable_check_task_level_overlap: bool,
}

impl CompactionDeveloperConfig {
pub fn new_from_meta_opts(opts: &MetaOpts) {
Self {
enable_trivial_move: opts.enable_trivial_move,
enable_check_task_level_overlap: opts.enable_check_task_level_overlap,
}
}
}

impl Default for CompactionDeveloperConfig {
fn default() -> Self {
Self {
enable_trivial_move: true,
enable_check_task_level_overlap: true,
}
}
}
4 changes: 2 additions & 2 deletions src/meta/src/hummock/compaction/overlap_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::KeyComparator;
use risingwave_pb::hummock::{KeyRange, SstableInfo};

pub trait OverlapInfo {
pub trait OverlapInfo: Debug {
fn check_overlap(&self, a: &SstableInfo) -> bool;
fn check_multiple_overlap(&self, others: &[SstableInfo]) -> Range<usize>;
fn check_multiple_include(&self, others: &[SstableInfo]) -> Range<usize>;
Expand Down Expand Up @@ -67,7 +67,7 @@ pub trait OverlapStrategy: Send + Sync {
fn create_overlap_info(&self) -> Box<dyn OverlapInfo>;
}

#[derive(Default)]
#[derive(Default, Debug)]
pub struct RangeOverlapInfo {
target_range: Option<KeyRange>,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ use super::{
CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
ValidationRuleType,
};
use crate::hummock::compaction::create_overlap_strategy;
use crate::hummock::compaction::picker::TrivialMovePicker;
use crate::hummock::compaction::{create_overlap_strategy, CompactionDeveloperConfig};
use crate::hummock::level_handler::LevelHandler;

pub struct LevelCompactionPicker {
target_level: usize,
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
developer_config: Arc<CompactionDeveloperConfig>,
}

impl CompactionPicker for LevelCompactionPicker {
Expand Down Expand Up @@ -87,23 +88,30 @@ impl CompactionPicker for LevelCompactionPicker {

impl LevelCompactionPicker {
#[cfg(test)]
pub fn new(target_level: usize, config: Arc<CompactionConfig>) -> LevelCompactionPicker {
pub fn new(
target_level: usize,
config: Arc<CompactionConfig>,
developer_config: Arc<CompactionDeveloperConfig>,
) -> LevelCompactionPicker {
LevelCompactionPicker {
target_level,
compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
config,
developer_config,
}
}

pub fn new_with_validator(
target_level: usize,
config: Arc<CompactionConfig>,
compaction_task_validator: Arc<CompactionTaskValidator>,
developer_config: Arc<CompactionDeveloperConfig>,
) -> LevelCompactionPicker {
LevelCompactionPicker {
target_level,
config,
compaction_task_validator,
developer_config,
}
}

Expand All @@ -115,8 +123,12 @@ impl LevelCompactionPicker {
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
let trivial_move_picker =
TrivialMovePicker::new(0, self.target_level, overlap_strategy.clone());
let trivial_move_picker = TrivialMovePicker::new(
0,
self.target_level,
overlap_strategy.clone(),
self.developer_config.enable_trivial_move,
);

trivial_move_picker.pick_trivial_move_task(
&l0.sub_levels[0].table_infos,
Expand Down Expand Up @@ -148,6 +160,7 @@ impl LevelCompactionPicker {
// The maximum number of sub_level compact level per task
self.config.level0_max_compact_file_number,
overlap_strategy.clone(),
self.developer_config.enable_check_task_level_overlap,
);

let mut max_vnode_partition_idx = 0;
Expand Down Expand Up @@ -264,7 +277,9 @@ pub mod tests {
use super::*;
use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
use crate::hummock::compaction::selector::tests::*;
use crate::hummock::compaction::{CompactionMode, TierCompactionPicker};
use crate::hummock::compaction::{
CompactionDeveloperConfig, CompactionMode, TierCompactionPicker,
};

fn create_compaction_picker_for_test() -> LevelCompactionPicker {
let config = Arc::new(
Expand All @@ -273,7 +288,7 @@ pub mod tests {
.level0_sub_level_compact_level_count(1)
.build(),
);
LevelCompactionPicker::new(1, config)
LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()))
}

#[test]
Expand Down Expand Up @@ -373,7 +388,8 @@ pub mod tests {
.level0_sub_level_compact_level_count(1)
.build(),
);
let mut picker = LevelCompactionPicker::new(1, config);
let mut picker =
LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));

let levels = vec![Level {
level_idx: 1,
Expand Down Expand Up @@ -497,7 +513,11 @@ pub mod tests {
.max_bytes_for_level_multiplier(1)
.level0_sub_level_compact_level_count(2)
.build();
let mut picker = LevelCompactionPicker::new(1, Arc::new(config));
let mut picker = LevelCompactionPicker::new(
1,
Arc::new(config),
Arc::new(CompactionDeveloperConfig::default()),
);

let mut levels = Levels {
levels: vec![Level {
Expand Down Expand Up @@ -571,7 +591,8 @@ pub mod tests {
);
// Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION.
// So all sub-levels are included to make write amplification < MAX_WRITE_AMPLIFICATION.
let mut picker = LevelCompactionPicker::new(1, config);
let mut picker =
LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
Expand All @@ -597,7 +618,8 @@ pub mod tests {
.level0_sub_level_compact_level_count(1)
.build(),
);
let mut picker = LevelCompactionPicker::new(1, config);
let mut picker =
LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
Expand Down Expand Up @@ -663,7 +685,11 @@ pub mod tests {

// Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION.
// But stopped by pending sub-level when trying to include more sub-levels.
let mut picker = LevelCompactionPicker::new(1, config.clone());
let mut picker = LevelCompactionPicker::new(
1,
config.clone(),
Arc::new(CompactionDeveloperConfig::default()),
);
let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
assert!(ret.is_none());

Expand All @@ -673,7 +699,8 @@ pub mod tests {
}

// No more pending sub-level so we can get a task now.
let mut picker = LevelCompactionPicker::new(1, config);
let mut picker =
LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
Expand Down Expand Up @@ -706,7 +733,8 @@ pub mod tests {
.build(),
);

let mut picker = LevelCompactionPicker::new(1, config);
let mut picker =
LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,25 @@ use super::{
CompactionInput, CompactionPicker, CompactionTaskValidator, LevelCompactionPicker,
LocalPickerStatistic, TierCompactionPicker,
};
use crate::hummock::compaction::CompactionDeveloperConfig;
use crate::hummock::level_handler::LevelHandler;

pub struct EmergencyCompactionPicker {
target_level: usize,
config: Arc<CompactionConfig>,
developer_config: Arc<CompactionDeveloperConfig>,
}

impl EmergencyCompactionPicker {
pub fn new(target_level: usize, config: Arc<CompactionConfig>) -> Self {
pub fn new(
target_level: usize,
config: Arc<CompactionConfig>,
developer_config: Arc<CompactionDeveloperConfig>,
) -> Self {
Self {
target_level,
config,
developer_config,
}
}

Expand All @@ -48,6 +55,7 @@ impl EmergencyCompactionPicker {
self.target_level,
self.config.clone(),
unused_validator.clone(),
self.developer_config.clone(),
);

if let Some(ret) =
Expand Down
Loading
Loading