Skip to content

Commit

Permalink
feat(compaction): add vnode watermark picker (#17372)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Jul 23, 2024
1 parent c5607a3 commit 285afdb
Show file tree
Hide file tree
Showing 23 changed files with 1,035 additions and 372 deletions.
1 change: 1 addition & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ message CompactTask {
TTL = 5;
TOMBSTONE = 6;
EMERGENCY = 7;
VNODE_WATERMARK = 8;
}

// Identifies whether the task is space_reclaim, if the compact_task_type increases, it will be refactored to enum
Expand Down
47 changes: 30 additions & 17 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use picker::{LevelCompactionPicker, TierCompactionPicker};
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
use risingwave_pb::hummock::compaction_config::CompactionMode;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, LevelType};
pub use selector::CompactionSelector;
pub use selector::{CompactionSelector, CompactionSelectorContext};

use self::selector::{EmergencySelector, LocalSelectorStatistic};
use super::check_cg_write_limit;
Expand Down Expand Up @@ -89,6 +91,7 @@ impl CompactStatus {
}
}

#[allow(clippy::too_many_arguments)]
pub fn get_compact_task(
&mut self,
levels: &Levels,
Expand All @@ -97,38 +100,44 @@ impl CompactStatus {
group: &CompactionGroup,
stats: &mut LocalSelectorStatistic,
selector: &mut Box<dyn CompactionSelector>,
table_id_to_options: HashMap<u32, TableOption>,
table_id_to_options: &HashMap<u32, TableOption>,
developer_config: Arc<CompactionDeveloperConfig>,
table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
state_table_info: &HummockVersionStateTableInfo,
) -> Option<CompactionTask> {
// When we compact the files, we must make the result of compaction meet the following
// conditions, for any user key, the epoch of it in the file existing in the lower
// layer must be larger.
if let Some(task) = selector.pick_compaction(
task_id,
let selector_context = CompactionSelectorContext {
group,
levels,
member_table_ids,
&mut self.level_handlers,
stats,
table_id_to_options.clone(),
developer_config.clone(),
) {
level_handlers: &mut self.level_handlers,
selector_stats: stats,
table_id_to_options,
developer_config: developer_config.clone(),
table_watermarks,
state_table_info,
};
// When we compact the files, we must make the result of compaction meet the following
// conditions, for any user key, the epoch of it in the file existing in the lower
// layer must be larger.
if let Some(task) = selector.pick_compaction(task_id, selector_context) {
return Some(task);
} else {
let compaction_group_config = &group.compaction_config;
if check_cg_write_limit(levels, compaction_group_config.as_ref()).is_write_stop()
&& compaction_group_config.enable_emergency_picker
{
return EmergencySelector::default().pick_compaction(
task_id,
let selector_context = CompactionSelectorContext {
group,
levels,
member_table_ids,
&mut self.level_handlers,
stats,
level_handlers: &mut self.level_handlers,
selector_stats: stats,
table_id_to_options,
developer_config,
);
table_watermarks,
state_table_info,
};
return EmergencySelector::default().pick_compaction(task_id, selector_context);
}
}

Expand Down Expand Up @@ -163,6 +172,10 @@ impl CompactStatus {
}

pub fn is_trivial_reclaim(task: &CompactTask) -> bool {
// Currently all VnodeWatermark tasks are trivial reclaim.
if task.task_type() == TaskType::VnodeWatermark {
return true;
}
let exist_table_ids = HashSet::<u32>::from_iter(task.existing_table_ids.clone());
task.input_ssts.iter().all(|level| {
level.table_infos.iter().all(|sst| {
Expand Down
75 changes: 46 additions & 29 deletions src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ impl CompactionPicker for ManualCompactionPicker {
pub mod tests {
use std::collections::{BTreeSet, HashMap};

use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_pb::hummock::compact_task;
pub use risingwave_pb::hummock::KeyRange;

Expand All @@ -341,7 +342,7 @@ pub mod tests {
use crate::hummock::compaction::selector::{CompactionSelector, ManualCompactionSelector};
use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
use crate::hummock::model::CompactionGroup;
use crate::hummock::test_utils::iterator_test_key_of_epoch;
use crate::hummock::test_utils::{compaction_selector_context, iterator_test_key_of_epoch};

fn clean_task_state(level_handler: &mut LevelHandler) {
for pending_task_id in &level_handler.pending_tasks_ids() {
Expand Down Expand Up @@ -1196,13 +1197,17 @@ pub mod tests {
let task = selector
.pick_compaction(
1,
&group_config,
&levels,
&BTreeSet::new(),
&mut levels_handler,
&mut local_stats,
HashMap::default(),
Arc::new(CompactionDeveloperConfig::default()),
compaction_selector_context(
&group_config,
&levels,
&BTreeSet::new(),
&mut levels_handler,
&mut local_stats,
&HashMap::default(),
Arc::new(CompactionDeveloperConfig::default()),
&Default::default(),
&HummockVersionStateTableInfo::empty(),
),
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
Expand Down Expand Up @@ -1234,13 +1239,17 @@ pub mod tests {
let task = selector
.pick_compaction(
2,
&group_config,
&levels,
&BTreeSet::new(),
&mut levels_handler,
&mut local_stats,
HashMap::default(),
Arc::new(CompactionDeveloperConfig::default()),
compaction_selector_context(
&group_config,
&levels,
&BTreeSet::new(),
&mut levels_handler,
&mut local_stats,
&HashMap::default(),
Arc::new(CompactionDeveloperConfig::default()),
&Default::default(),
&HummockVersionStateTableInfo::empty(),
),
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
Expand Down Expand Up @@ -1308,13 +1317,17 @@ pub mod tests {
let task = selector
.pick_compaction(
1,
&group_config,
&levels,
&BTreeSet::new(),
&mut levels_handler,
&mut local_stats,
HashMap::default(),
Arc::new(CompactionDeveloperConfig::default()),
compaction_selector_context(
&group_config,
&levels,
&BTreeSet::new(),
&mut levels_handler,
&mut local_stats,
&HashMap::default(),
Arc::new(CompactionDeveloperConfig::default()),
&Default::default(),
&HummockVersionStateTableInfo::empty(),
),
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
Expand Down Expand Up @@ -1348,13 +1361,17 @@ pub mod tests {
let task = selector
.pick_compaction(
1,
&group_config,
&levels,
&BTreeSet::new(),
&mut levels_handler,
&mut local_stats,
HashMap::default(),
Arc::new(CompactionDeveloperConfig::default()),
compaction_selector_context(
&group_config,
&levels,
&BTreeSet::new(),
&mut levels_handler,
&mut local_stats,
&HashMap::default(),
Arc::new(CompactionDeveloperConfig::default()),
&Default::default(),
&HummockVersionStateTableInfo::empty(),
),
)
.unwrap();
assert_compaction_task(&task, &levels_handler);
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/hummock/compaction/picker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod trivial_move_compaction_picker;
mod ttl_reclaim_compaction_picker;

mod compaction_task_validator;
mod vnode_watermark_picker;

pub use base_level_compaction_picker::LevelCompactionPicker;
pub use compaction_task_validator::{CompactionTaskValidator, ValidationRuleType};
Expand All @@ -40,6 +41,7 @@ pub use tombstone_reclaim_compaction_picker::{
};
pub use trivial_move_compaction_picker::TrivialMovePicker;
pub use ttl_reclaim_compaction_picker::{TtlPickerState, TtlReclaimCompactionPicker};
pub use vnode_watermark_picker::VnodeWatermarkCompactionPicker;

use crate::hummock::level_handler::LevelHandler;

Expand Down
Loading

0 comments on commit 285afdb

Please sign in to comment.