From 285afdb6cd1be9a00dde9d486ce2fab3cdfcb4b2 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Tue, 23 Jul 2024 17:28:02 +0800 Subject: [PATCH] feat(compaction): add vnode watermark picker (#17372) --- proto/hummock.proto | 1 + src/meta/src/hummock/compaction/mod.rs | 47 ++-- .../picker/manual_compaction_picker.rs | 75 ++++--- src/meta/src/hummock/compaction/picker/mod.rs | 2 + .../picker/space_reclaim_compaction_picker.rs | 146 +++++++----- .../picker/ttl_reclaim_compaction_picker.rs | 130 ++++++----- .../picker/vnode_watermark_picker.rs | 211 ++++++++++++++++++ .../compaction/selector/emergency_selector.rs | 30 +-- .../compaction/selector/level_selector.rs | 96 ++++---- .../compaction/selector/manual_selector.rs | 29 +-- .../src/hummock/compaction/selector/mod.rs | 24 +- .../selector/space_reclaim_selector.rs | 28 +-- .../selector/tombstone_compaction_selector.rs | 26 +-- .../compaction/selector/ttl_selector.rs | 26 +-- .../selector/vnode_watermark_selector.rs | 87 ++++++++ src/meta/src/hummock/manager/compaction.rs | 12 +- src/meta/src/hummock/manager/timer_task.rs | 7 + src/meta/src/hummock/test_utils.rs | 36 ++- .../compaction_group/hummock_version_ext.rs | 153 +++++++++---- .../src/hummock/iterator/skip_watermark.rs | 43 +--- src/tests/simulation/src/ctl_ext.rs | 45 ++++ .../tests/integration_tests/compaction/mod.rs | 152 +++++++++++++ .../tests/integration_tests/main.rs | 1 + 23 files changed, 1035 insertions(+), 372 deletions(-) create mode 100644 src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs create mode 100644 src/meta/src/hummock/compaction/selector/vnode_watermark_selector.rs create mode 100644 src/tests/simulation/tests/integration_tests/compaction/mod.rs diff --git a/proto/hummock.proto b/proto/hummock.proto index 412f552fec5c8..5d66a2b7bb79b 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -384,6 +384,7 @@ message CompactTask { TTL = 5; TOMBSTONE = 6; EMERGENCY = 7; + VNODE_WATERMARK = 8; } // Identifies whether the task is space_reclaim, if the compact_task_type increases, it will be refactored to enum diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index b2a3860117024..f91f31b178b2b 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -26,11 +26,13 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use picker::{LevelCompactionPicker, TierCompactionPicker}; +use risingwave_hummock_sdk::table_watermark::TableWatermarks; +use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId}; use risingwave_pb::hummock::compaction_config::CompactionMode; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{CompactTask, CompactionConfig, LevelType}; -pub use selector::CompactionSelector; +pub use selector::{CompactionSelector, CompactionSelectorContext}; use self::selector::{EmergencySelector, LocalSelectorStatistic}; use super::check_cg_write_limit; @@ -89,6 +91,7 @@ impl CompactStatus { } } + #[allow(clippy::too_many_arguments)] pub fn get_compact_task( &mut self, levels: &Levels, @@ -97,38 +100,44 @@ impl CompactStatus { group: &CompactionGroup, stats: &mut LocalSelectorStatistic, selector: &mut Box, - table_id_to_options: HashMap, + table_id_to_options: &HashMap, developer_config: Arc, + table_watermarks: &HashMap>, + state_table_info: &HummockVersionStateTableInfo, ) -> Option { - // When we compact the files, we must make the result of compaction meet the following - // conditions, for any user key, the epoch of it in the file existing in the lower - // layer must be larger. - if let Some(task) = selector.pick_compaction( - task_id, + let selector_context = CompactionSelectorContext { group, levels, member_table_ids, - &mut self.level_handlers, - stats, - table_id_to_options.clone(), - developer_config.clone(), - ) { + level_handlers: &mut self.level_handlers, + selector_stats: stats, + table_id_to_options, + developer_config: developer_config.clone(), + table_watermarks, + state_table_info, + }; + // When we compact the files, we must make the result of compaction meet the following + // conditions, for any user key, the epoch of it in the file existing in the lower + // layer must be larger. + if let Some(task) = selector.pick_compaction(task_id, selector_context) { return Some(task); } else { let compaction_group_config = &group.compaction_config; if check_cg_write_limit(levels, compaction_group_config.as_ref()).is_write_stop() && compaction_group_config.enable_emergency_picker { - return EmergencySelector::default().pick_compaction( - task_id, + let selector_context = CompactionSelectorContext { group, levels, member_table_ids, - &mut self.level_handlers, - stats, + level_handlers: &mut self.level_handlers, + selector_stats: stats, table_id_to_options, developer_config, - ); + table_watermarks, + state_table_info, + }; + return EmergencySelector::default().pick_compaction(task_id, selector_context); } } @@ -163,6 +172,10 @@ impl CompactStatus { } pub fn is_trivial_reclaim(task: &CompactTask) -> bool { + // Currently all VnodeWatermark tasks are trivial reclaim. + if task.task_type() == TaskType::VnodeWatermark { + return true; + } let exist_table_ids = HashSet::::from_iter(task.existing_table_ids.clone()); task.input_ssts.iter().all(|level| { level.table_infos.iter().all(|sst| { diff --git a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs index 2d1b2c95b20aa..7919918ee73fd 100644 --- a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs @@ -328,6 +328,7 @@ impl CompactionPicker for ManualCompactionPicker { pub mod tests { use std::collections::{BTreeSet, HashMap}; + use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; use risingwave_pb::hummock::compact_task; pub use risingwave_pb::hummock::KeyRange; @@ -341,7 +342,7 @@ pub mod tests { use crate::hummock::compaction::selector::{CompactionSelector, ManualCompactionSelector}; use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic}; use crate::hummock::model::CompactionGroup; - use crate::hummock::test_utils::iterator_test_key_of_epoch; + use crate::hummock::test_utils::{compaction_selector_context, iterator_test_key_of_epoch}; fn clean_task_state(level_handler: &mut LevelHandler) { for pending_task_id in &level_handler.pending_tasks_ids() { @@ -1196,13 +1197,17 @@ pub mod tests { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -1234,13 +1239,17 @@ pub mod tests { let task = selector .pick_compaction( 2, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -1308,13 +1317,17 @@ pub mod tests { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -1348,13 +1361,17 @@ pub mod tests { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&task, &levels_handler); diff --git a/src/meta/src/hummock/compaction/picker/mod.rs b/src/meta/src/hummock/compaction/picker/mod.rs index 6d464b9a33bcd..05289cf10403e 100644 --- a/src/meta/src/hummock/compaction/picker/mod.rs +++ b/src/meta/src/hummock/compaction/picker/mod.rs @@ -24,6 +24,7 @@ mod trivial_move_compaction_picker; mod ttl_reclaim_compaction_picker; mod compaction_task_validator; +mod vnode_watermark_picker; pub use base_level_compaction_picker::LevelCompactionPicker; pub use compaction_task_validator::{CompactionTaskValidator, ValidationRuleType}; @@ -40,6 +41,7 @@ pub use tombstone_reclaim_compaction_picker::{ }; pub use trivial_move_compaction_picker::TrivialMovePicker; pub use ttl_reclaim_compaction_picker::{TtlPickerState, TtlReclaimCompactionPicker}; +pub use vnode_watermark_picker::VnodeWatermarkCompactionPicker; use crate::hummock::level_handler::LevelHandler; diff --git a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs index b94f7587dd04e..6d3b1717bf8b4 100644 --- a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs @@ -173,6 +173,7 @@ mod test { use itertools::Itertools; use risingwave_common::catalog::TableId; + use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; use risingwave_pb::hummock::compact_task; pub use risingwave_pb::hummock::{Level, LevelType}; @@ -187,6 +188,7 @@ mod test { }; use crate::hummock::compaction::CompactionDeveloperConfig; use crate::hummock::model::CompactionGroup; + use crate::hummock::test_utils::compaction_selector_context; #[test] fn test_space_reclaim_compaction_selector() { @@ -252,13 +254,17 @@ mod test { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &member_table_ids, - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &member_table_ids, + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -269,13 +275,17 @@ mod test { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &member_table_ids, - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &member_table_ids, + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_eq!(task.input.input_levels.len(), 2); @@ -310,13 +320,17 @@ mod test { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &member_table_ids, - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &member_table_ids, + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -337,13 +351,17 @@ mod test { assert!(selector .pick_compaction( 1, - &group_config, - &levels, - &member_table_ids, - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &member_table_ids, + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .is_none()) } @@ -365,13 +383,17 @@ mod test { // pick space reclaim let task = selector.pick_compaction( 1, - &group_config, - &levels, - &member_table_ids, - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &member_table_ids, + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ); assert!(task.is_none()); } @@ -389,13 +411,17 @@ mod test { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &member_table_ids, - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &member_table_ids, + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -428,13 +454,17 @@ mod test { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &member_table_ids, - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &member_table_ids, + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); @@ -485,13 +515,17 @@ mod test { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &member_table_ids, - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &member_table_ids, + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); diff --git a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs index bc1fc2ce304be..9efbfcd02076a 100644 --- a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs @@ -64,7 +64,7 @@ pub struct TtlReclaimCompactionPicker { } impl TtlReclaimCompactionPicker { - pub fn new(table_id_to_options: HashMap) -> Self { + pub fn new(table_id_to_options: &HashMap) -> Self { let table_id_to_ttl: HashMap = table_id_to_options .iter() .filter(|id_to_option| { @@ -205,6 +205,7 @@ mod test { use std::sync::Arc; use itertools::Itertools; + use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; use risingwave_pb::hummock::compact_task; pub use risingwave_pb::hummock::{Level, LevelType}; @@ -217,6 +218,7 @@ mod test { use crate::hummock::compaction::selector::{CompactionSelector, TtlCompactionSelector}; use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic}; use crate::hummock::model::CompactionGroup; + use crate::hummock::test_utils::compaction_selector_context; #[test] fn test_ttl_reclaim_compaction_selector() { @@ -376,13 +378,17 @@ mod test { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handler, - &mut local_stats, - table_id_to_options, - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handler, + &mut local_stats, + &table_id_to_options, + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -427,13 +433,17 @@ mod test { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handler, - &mut local_stats, - table_id_to_options.clone(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handler, + &mut local_stats, + &table_id_to_options, + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -461,13 +471,17 @@ mod test { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handler, - &mut local_stats, - table_id_to_options.clone(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handler, + &mut local_stats, + &table_id_to_options, + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -518,13 +532,17 @@ mod test { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handler, - &mut local_stats, - table_id_to_options, - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handler, + &mut local_stats, + &table_id_to_options, + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&task, &levels_handler); @@ -560,13 +578,17 @@ mod test { // // pick ttl reclaim let task = selector.pick_compaction( 1, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handler, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handler, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ); // empty table_options does not select any files @@ -623,13 +645,17 @@ mod test { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handler, - &mut local_stats, - table_id_to_options.clone(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handler, + &mut local_stats, + &table_id_to_options, + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); @@ -716,13 +742,17 @@ mod test { let task = selector .pick_compaction( 1, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handler, - &mut local_stats, - table_id_to_options.clone(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handler, + &mut local_stats, + &table_id_to_options, + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); diff --git a/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs b/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs new file mode 100644 index 0000000000000..f0344d63b6632 --- /dev/null +++ b/src/meta/src/hummock/compaction/picker/vnode_watermark_picker.rs @@ -0,0 +1,211 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use risingwave_common::catalog::TableId; +use risingwave_hummock_sdk::key::{FullKey, TableKey}; +use risingwave_hummock_sdk::table_watermark::ReadTableWatermark; +use risingwave_pb::hummock::hummock_version::Levels; +use risingwave_pb::hummock::{InputLevel, SstableInfo}; + +use crate::hummock::compaction::picker::CompactionInput; +use crate::hummock::level_handler::LevelHandler; +pub struct VnodeWatermarkCompactionPicker {} + +impl VnodeWatermarkCompactionPicker { + pub fn new() -> Self { + Self {} + } + + /// The current implementation only picks trivial reclaim task for the bottommost level. + /// Must modify [`crate::hummock::compaction::CompactStatus::is_trivial_reclaim`], if nontrivial reclaim is supported in the future. + pub fn pick_compaction( + &mut self, + levels: &Levels, + level_handlers: &[LevelHandler], + table_watermarks: &BTreeMap, + ) -> Option { + let level = levels.levels.last()?; + let mut select_input_ssts = vec![]; + for sst_info in &level.table_infos { + if !level_handlers[level.level_idx as usize].is_pending_compact(&sst_info.sst_id) + && should_delete_sst_by_watermark(sst_info, table_watermarks) + { + select_input_ssts.push(sst_info.clone()); + } + } + if select_input_ssts.is_empty() { + return None; + } + Some(CompactionInput { + select_input_size: select_input_ssts.iter().map(|sst| sst.file_size).sum(), + total_file_count: select_input_ssts.len() as u64, + input_levels: vec![ + InputLevel { + level_idx: level.level_idx, + level_type: level.level_type, + table_infos: select_input_ssts, + }, + InputLevel { + level_idx: level.level_idx, + level_type: level.level_type, + table_infos: vec![], + }, + ], + target_level: level.level_idx as usize, + target_sub_level_id: level.sub_level_id, + ..Default::default() + }) + } +} + +fn should_delete_sst_by_watermark( + sst_info: &SstableInfo, + table_watermarks: &BTreeMap, +) -> bool { + // Both table id and vnode must be identical for both the left and right keys in a SST. + // As more data is written to the bottommost level, they will eventually become identical. + let left_key = FullKey::decode(&sst_info.key_range.as_ref().unwrap().left); + let right_key = FullKey::decode(&sst_info.key_range.as_ref().unwrap().right); + if left_key.user_key.table_id != right_key.user_key.table_id { + return false; + } + if left_key.user_key.table_key.vnode_part() != right_key.user_key.table_key.vnode_part() { + return false; + } + let Some(watermarks) = table_watermarks.get(&left_key.user_key.table_id) else { + return false; + }; + should_delete_key_by_watermark(&left_key.user_key.table_key, watermarks) + && should_delete_key_by_watermark(&right_key.user_key.table_key, watermarks) +} + +fn should_delete_key_by_watermark( + table_key: &TableKey<&[u8]>, + watermark: &ReadTableWatermark, +) -> bool { + let (vnode, key) = table_key.split_vnode(); + let Some(w) = watermark.vnode_watermarks.get(&vnode) else { + return false; + }; + watermark.direction.filter_by_watermark(key, w) +} + +#[cfg(test)] +mod tests { + use bytes::{BufMut, Bytes, BytesMut}; + use risingwave_common::hash::VirtualNode; + use risingwave_hummock_sdk::key::{FullKey, TableKey}; + use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection}; + use risingwave_pb::hummock::{KeyRange, SstableInfo}; + + use crate::hummock::compaction::picker::vnode_watermark_picker::should_delete_sst_by_watermark; + + #[test] + fn test_should_delete_sst_by_watermark() { + let table_watermarks = maplit::btreemap! { + 1.into() => ReadTableWatermark { + direction: WatermarkDirection::Ascending, + vnode_watermarks: maplit::btreemap! { + VirtualNode::from_index(16) => "some_watermark_key_8".into(), + VirtualNode::from_index(17) => "some_watermark_key_8".into(), + }, + }, + }; + let table_key = |vnode_part: usize, key_part: &str| { + let mut builder = BytesMut::new(); + builder.put_slice(&VirtualNode::from_index(vnode_part).to_be_bytes()); + builder.put_slice(&Bytes::copy_from_slice(key_part.as_bytes())); + TableKey(builder.freeze()) + }; + + let sst_info = SstableInfo { + object_id: 1, + sst_id: 1, + key_range: Some(KeyRange { + left: FullKey::new(2.into(), table_key(16, "some_watermark_key_1"), 0).encode(), + right: FullKey::new(2.into(), table_key(16, "some_watermark_key_2"), 0).encode(), + right_exclusive: true, + }), + table_ids: vec![2], + ..Default::default() + }; + assert!( + !should_delete_sst_by_watermark(&sst_info, &table_watermarks), + "should fail because no matching watermark found" + ); + + let sst_info = SstableInfo { + object_id: 1, + sst_id: 1, + key_range: Some(KeyRange { + left: FullKey::new(1.into(), table_key(13, "some_watermark_key_1"), 0).encode(), + right: FullKey::new(1.into(), table_key(14, "some_watermark_key_2"), 0).encode(), + right_exclusive: true, + }), + table_ids: vec![1], + ..Default::default() + }; + assert!( + !should_delete_sst_by_watermark(&sst_info, &table_watermarks), + "should fail because no matching vnode found" + ); + + let sst_info = SstableInfo { + object_id: 1, + sst_id: 1, + key_range: Some(KeyRange { + left: FullKey::new(1.into(), table_key(16, "some_watermark_key_1"), 0).encode(), + right: FullKey::new(1.into(), table_key(17, "some_watermark_key_2"), 0).encode(), + right_exclusive: true, + }), + table_ids: vec![1], + ..Default::default() + }; + assert!( + !should_delete_sst_by_watermark(&sst_info, &table_watermarks), + "should fail because different vnodes found" + ); + + let sst_info = SstableInfo { + object_id: 1, + sst_id: 1, + key_range: Some(KeyRange { + left: FullKey::new(1.into(), table_key(16, "some_watermark_key_1"), 0).encode(), + right: FullKey::new(1.into(), table_key(16, "some_watermark_key_9"), 0).encode(), + right_exclusive: true, + }), + table_ids: vec![1], + ..Default::default() + }; + assert!( + !should_delete_sst_by_watermark(&sst_info, &table_watermarks), + "should fail because right key is greater than watermark" + ); + + let sst_info = SstableInfo { + object_id: 1, + sst_id: 1, + key_range: Some(KeyRange { + left: FullKey::new(1.into(), table_key(16, "some_watermark_key_1"), 0).encode(), + right: FullKey::new(1.into(), table_key(16, "some_watermark_key_2"), 0).encode(), + right_exclusive: true, + }), + table_ids: vec![1], + ..Default::default() + }; + assert!(should_delete_sst_by_watermark(&sst_info, &table_watermarks)); + } +} diff --git a/src/meta/src/hummock/compaction/selector/emergency_selector.rs b/src/meta/src/hummock/compaction/selector/emergency_selector.rs index 685ab1487d51c..b9df77c430573 100644 --- a/src/meta/src/hummock/compaction/selector/emergency_selector.rs +++ b/src/meta/src/hummock/compaction/selector/emergency_selector.rs @@ -12,21 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; -use std::sync::Arc; - -use risingwave_common::catalog::TableOption; use risingwave_hummock_sdk::HummockCompactionTaskId; use risingwave_pb::hummock::compact_task; -use risingwave_pb::hummock::hummock_version::Levels; -use super::{CompactionSelector, DynamicLevelSelectorCore, LocalSelectorStatistic}; +use super::{CompactionSelector, DynamicLevelSelectorCore}; use crate::hummock::compaction::picker::{EmergencyCompactionPicker, LocalPickerStatistic}; -use crate::hummock::compaction::{ - create_compaction_task, CompactionDeveloperConfig, CompactionTask, -}; -use crate::hummock::level_handler::LevelHandler; -use crate::hummock::model::CompactionGroup; +use crate::hummock::compaction::selector::CompactionSelectorContext; +use crate::hummock::compaction::{create_compaction_task, CompactionTask}; #[derive(Default)] pub struct EmergencySelector {} @@ -35,14 +27,16 @@ impl CompactionSelector for EmergencySelector { fn pick_compaction( &mut self, task_id: HummockCompactionTaskId, - group: &CompactionGroup, - levels: &Levels, - _member_table_ids: &std::collections::BTreeSet, - level_handlers: &mut [LevelHandler], - selector_stats: &mut LocalSelectorStatistic, - _table_id_to_options: HashMap, - developer_config: Arc, + context: CompactionSelectorContext<'_>, ) -> Option { + let CompactionSelectorContext { + group, + levels, + level_handlers, + selector_stats, + developer_config, + .. + } = context; let dynamic_level_core = DynamicLevelSelectorCore::new( group.compaction_config.clone(), developer_config.clone(), diff --git a/src/meta/src/hummock/compaction/selector/level_selector.rs b/src/meta/src/hummock/compaction/selector/level_selector.rs index 38d1e35e22502..b48455b49d4c8 100644 --- a/src/meta/src/hummock/compaction/selector/level_selector.rs +++ b/src/meta/src/hummock/compaction/selector/level_selector.rs @@ -16,10 +16,8 @@ // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -use std::collections::HashMap; use std::sync::Arc; -use risingwave_common::catalog::TableOption; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt; use risingwave_hummock_sdk::HummockCompactionTaskId; use risingwave_pb::hummock::hummock_version::Levels; @@ -33,11 +31,11 @@ use crate::hummock::compaction::picker::{ CompactionPicker, CompactionTaskValidator, IntraCompactionPicker, LocalPickerStatistic, MinOverlappingPicker, }; +use crate::hummock::compaction::selector::CompactionSelectorContext; use crate::hummock::compaction::{ - create_overlap_strategy, CompactionDeveloperConfig, CompactionTask, LocalSelectorStatistic, + create_overlap_strategy, CompactionDeveloperConfig, CompactionTask, }; use crate::hummock::level_handler::LevelHandler; -use crate::hummock::model::CompactionGroup; pub const SCORE_BASE: u64 = 100; @@ -420,14 +418,16 @@ impl CompactionSelector for DynamicLevelSelector { fn pick_compaction( &mut self, task_id: HummockCompactionTaskId, - compaction_group: &CompactionGroup, - levels: &Levels, - _member_table_ids: &std::collections::BTreeSet, - level_handlers: &mut [LevelHandler], - selector_stats: &mut LocalSelectorStatistic, - _table_id_to_options: HashMap, - developer_config: Arc, + context: CompactionSelectorContext<'_>, ) -> Option { + let CompactionSelectorContext { + group: compaction_group, + levels, + level_handlers, + selector_stats, + developer_config, + .. + } = context; let dynamic_level_core = DynamicLevelSelectorCore::new( compaction_group.compaction_config.clone(), developer_config, @@ -484,6 +484,7 @@ pub mod tests { use itertools::Itertools; use risingwave_common::constants::hummock::CompactionFilterFlag; + use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; use risingwave_pb::hummock::compaction_config::CompactionMode; use risingwave_pb::hummock::hummock_version::Levels; @@ -498,6 +499,7 @@ pub mod tests { use crate::hummock::compaction::CompactionDeveloperConfig; use crate::hummock::level_handler::LevelHandler; use crate::hummock::model::CompactionGroup; + use crate::hummock::test_utils::compaction_selector_context; #[test] fn test_dynamic_level() { @@ -610,13 +612,17 @@ pub mod tests { let compaction = selector .pick_compaction( 1, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handlers, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handlers, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&compaction, &levels_handlers); @@ -638,13 +644,17 @@ pub mod tests { let compaction = selector .pick_compaction( 1, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handlers, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handlers, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&compaction, &levels_handlers); @@ -658,13 +668,17 @@ pub mod tests { let compaction = selector .pick_compaction( 2, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handlers, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handlers, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ) .unwrap(); assert_compaction_task(&compaction, &levels_handlers); @@ -695,13 +709,17 @@ pub mod tests { // to score. let compaction = selector.pick_compaction( 2, - &group_config, - &levels, - &BTreeSet::new(), - &mut levels_handlers, - &mut local_stats, - HashMap::default(), - Arc::new(CompactionDeveloperConfig::default()), + compaction_selector_context( + &group_config, + &levels, + &BTreeSet::new(), + &mut levels_handlers, + &mut local_stats, + &HashMap::default(), + Arc::new(CompactionDeveloperConfig::default()), + &Default::default(), + &HummockVersionStateTableInfo::empty(), + ), ); assert!(compaction.is_none()); } diff --git a/src/meta/src/hummock/compaction/selector/manual_selector.rs b/src/meta/src/hummock/compaction/selector/manual_selector.rs index 62c94c8f888df..8c814a69b03dd 100644 --- a/src/meta/src/hummock/compaction/selector/manual_selector.rs +++ b/src/meta/src/hummock/compaction/selector/manual_selector.rs @@ -17,24 +17,18 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -use std::collections::{HashMap, HashSet}; -use std::sync::Arc; +use std::collections::HashSet; -use risingwave_common::catalog::TableOption; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockSstableId}; -use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{compact_task, KeyRange}; -use super::{CompactionSelector, DynamicLevelSelectorCore, LocalSelectorStatistic}; +use super::{CompactionSelector, DynamicLevelSelectorCore}; use crate::hummock::compaction::picker::{ CompactionPicker, LocalPickerStatistic, ManualCompactionPicker, }; -use crate::hummock::compaction::{ - create_compaction_task, create_overlap_strategy, CompactionDeveloperConfig, CompactionTask, -}; -use crate::hummock::level_handler::LevelHandler; -use crate::hummock::model::CompactionGroup; +use crate::hummock::compaction::selector::CompactionSelectorContext; +use crate::hummock::compaction::{create_compaction_task, create_overlap_strategy, CompactionTask}; #[derive(Clone, Debug, PartialEq)] pub struct ManualCompactionOption { @@ -77,14 +71,15 @@ impl CompactionSelector for ManualCompactionSelector { fn pick_compaction( &mut self, task_id: HummockCompactionTaskId, - group: &CompactionGroup, - levels: &Levels, - _member_table_ids: &std::collections::BTreeSet, - level_handlers: &mut [LevelHandler], - _selector_stats: &mut LocalSelectorStatistic, - _table_id_to_options: HashMap, - developer_config: Arc, + context: CompactionSelectorContext<'_>, ) -> Option { + let CompactionSelectorContext { + group, + levels, + level_handlers, + developer_config, + .. + } = context; let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config); let overlap_strategy = create_overlap_strategy(group.compaction_config.compaction_mode()); diff --git a/src/meta/src/hummock/compaction/selector/mod.rs b/src/meta/src/hummock/compaction/selector/mod.rs index aca2457da62dc..b685cf381267e 100644 --- a/src/meta/src/hummock/compaction/selector/mod.rs +++ b/src/meta/src/hummock/compaction/selector/mod.rs @@ -23,6 +23,7 @@ mod manual_selector; mod space_reclaim_selector; mod tombstone_compaction_selector; mod ttl_selector; +mod vnode_watermark_selector; use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; @@ -31,12 +32,15 @@ pub use emergency_selector::EmergencySelector; pub use level_selector::{DynamicLevelSelector, DynamicLevelSelectorCore}; pub use manual_selector::{ManualCompactionOption, ManualCompactionSelector}; use risingwave_common::catalog::{TableId, TableOption}; +use risingwave_hummock_sdk::table_watermark::TableWatermarks; +use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; use risingwave_hummock_sdk::HummockCompactionTaskId; use risingwave_pb::hummock::compact_task; use risingwave_pb::hummock::hummock_version::Levels; pub use space_reclaim_selector::SpaceReclaimCompactionSelector; pub use tombstone_compaction_selector::TombstoneCompactionSelector; pub use ttl_selector::TtlCompactionSelector; +pub use vnode_watermark_selector::VnodeWatermarkCompactionSelector; use super::picker::LocalPickerStatistic; use super::{ @@ -47,17 +51,23 @@ use crate::hummock::level_handler::LevelHandler; use crate::hummock::model::CompactionGroup; use crate::rpc::metrics::MetaMetrics; +pub struct CompactionSelectorContext<'a> { + pub group: &'a CompactionGroup, + pub levels: &'a Levels, + pub member_table_ids: &'a BTreeSet, + pub level_handlers: &'a mut [LevelHandler], + pub selector_stats: &'a mut LocalSelectorStatistic, + pub table_id_to_options: &'a HashMap, + pub developer_config: Arc, + pub table_watermarks: &'a HashMap>, + pub state_table_info: &'a HummockVersionStateTableInfo, +} + pub trait CompactionSelector: Sync + Send { fn pick_compaction( &mut self, task_id: HummockCompactionTaskId, - group: &CompactionGroup, - levels: &Levels, - member_table_ids: &BTreeSet, - level_handlers: &mut [LevelHandler], - selector_stats: &mut LocalSelectorStatistic, - table_id_to_options: HashMap, - developer_config: Arc, + context: CompactionSelectorContext<'_>, ) -> Option; fn report_statistic_metrics(&self, _metrics: &MetaMetrics) {} diff --git a/src/meta/src/hummock/compaction/selector/space_reclaim_selector.rs b/src/meta/src/hummock/compaction/selector/space_reclaim_selector.rs index b284a6a538b3e..7f0eb0137c96b 100644 --- a/src/meta/src/hummock/compaction/selector/space_reclaim_selector.rs +++ b/src/meta/src/hummock/compaction/selector/space_reclaim_selector.rs @@ -17,21 +17,15 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -use std::collections::{BTreeSet, HashMap}; -use std::sync::Arc; +use std::collections::HashMap; -use risingwave_common::catalog::{TableId, TableOption}; use risingwave_hummock_sdk::HummockCompactionTaskId; use risingwave_pb::hummock::compact_task; -use risingwave_pb::hummock::hummock_version::Levels; use super::{CompactionSelector, DynamicLevelSelectorCore}; use crate::hummock::compaction::picker::{SpaceReclaimCompactionPicker, SpaceReclaimPickerState}; -use crate::hummock::compaction::{ - create_compaction_task, CompactionDeveloperConfig, CompactionTask, LocalSelectorStatistic, -}; -use crate::hummock::level_handler::LevelHandler; -use crate::hummock::model::CompactionGroup; +use crate::hummock::compaction::selector::CompactionSelectorContext; +use crate::hummock::compaction::{create_compaction_task, CompactionTask}; #[derive(Default)] pub struct SpaceReclaimCompactionSelector { @@ -42,14 +36,16 @@ impl CompactionSelector for SpaceReclaimCompactionSelector { fn pick_compaction( &mut self, task_id: HummockCompactionTaskId, - group: &CompactionGroup, - levels: &Levels, - member_table_ids: &BTreeSet, - level_handlers: &mut [LevelHandler], - _selector_stats: &mut LocalSelectorStatistic, - _table_id_to_options: HashMap, - developer_config: Arc, + context: CompactionSelectorContext<'_>, ) -> Option { + let CompactionSelectorContext { + group, + levels, + member_table_ids, + level_handlers, + developer_config, + .. + } = context; let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config); let mut picker = SpaceReclaimCompactionPicker::new( diff --git a/src/meta/src/hummock/compaction/selector/tombstone_compaction_selector.rs b/src/meta/src/hummock/compaction/selector/tombstone_compaction_selector.rs index b05802513733d..860bcd47c0a8d 100644 --- a/src/meta/src/hummock/compaction/selector/tombstone_compaction_selector.rs +++ b/src/meta/src/hummock/compaction/selector/tombstone_compaction_selector.rs @@ -13,23 +13,16 @@ // limitations under the License. use std::collections::HashMap; -use std::sync::Arc; -use risingwave_common::catalog::TableOption; use risingwave_hummock_sdk::HummockCompactionTaskId; use risingwave_pb::hummock::compact_task; -use risingwave_pb::hummock::hummock_version::Levels; use super::{CompactionSelector, DynamicLevelSelectorCore}; use crate::hummock::compaction::picker::{ TombstoneReclaimCompactionPicker, TombstoneReclaimPickerState, }; -use crate::hummock::compaction::{ - create_compaction_task, create_overlap_strategy, CompactionDeveloperConfig, CompactionTask, - LocalSelectorStatistic, -}; -use crate::hummock::level_handler::LevelHandler; -use crate::hummock::model::CompactionGroup; +use crate::hummock::compaction::selector::CompactionSelectorContext; +use crate::hummock::compaction::{create_compaction_task, create_overlap_strategy, CompactionTask}; #[derive(Default)] pub struct TombstoneCompactionSelector { @@ -40,14 +33,15 @@ impl CompactionSelector for TombstoneCompactionSelector { fn pick_compaction( &mut self, task_id: HummockCompactionTaskId, - group: &CompactionGroup, - levels: &Levels, - _member_table_ids: &std::collections::BTreeSet, - level_handlers: &mut [LevelHandler], - _selector_stats: &mut LocalSelectorStatistic, - _table_id_to_options: HashMap, - developer_config: Arc, + context: CompactionSelectorContext<'_>, ) -> Option { + let CompactionSelectorContext { + group, + levels, + level_handlers, + developer_config, + .. + } = context; if group.compaction_config.tombstone_reclaim_ratio == 0 { // it might cause full-compaction when tombstone_reclaim_ratio == 0 return None; diff --git a/src/meta/src/hummock/compaction/selector/ttl_selector.rs b/src/meta/src/hummock/compaction/selector/ttl_selector.rs index 0e9497b06b17d..c20ce3304b777 100644 --- a/src/meta/src/hummock/compaction/selector/ttl_selector.rs +++ b/src/meta/src/hummock/compaction/selector/ttl_selector.rs @@ -18,20 +18,14 @@ // (found in the LICENSE.Apache file in the root directory). use std::collections::HashMap; -use std::sync::Arc; -use risingwave_common::catalog::TableOption; use risingwave_hummock_sdk::HummockCompactionTaskId; use risingwave_pb::hummock::compact_task; -use risingwave_pb::hummock::hummock_version::Levels; use super::{CompactionSelector, DynamicLevelSelectorCore}; use crate::hummock::compaction::picker::{TtlPickerState, TtlReclaimCompactionPicker}; -use crate::hummock::compaction::{ - create_compaction_task, CompactionDeveloperConfig, CompactionTask, LocalSelectorStatistic, -}; -use crate::hummock::level_handler::LevelHandler; -use crate::hummock::model::CompactionGroup; +use crate::hummock::compaction::selector::CompactionSelectorContext; +use crate::hummock::compaction::{create_compaction_task, CompactionTask}; #[derive(Default)] pub struct TtlCompactionSelector { @@ -42,14 +36,16 @@ impl CompactionSelector for TtlCompactionSelector { fn pick_compaction( &mut self, task_id: HummockCompactionTaskId, - group: &CompactionGroup, - levels: &Levels, - _member_table_ids: &std::collections::BTreeSet, - level_handlers: &mut [LevelHandler], - _selector_stats: &mut LocalSelectorStatistic, - table_id_to_options: HashMap, - developer_config: Arc, + context: CompactionSelectorContext<'_>, ) -> Option { + let CompactionSelectorContext { + group, + levels, + level_handlers, + table_id_to_options, + developer_config, + .. + } = context; let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config); let ctx = dynamic_level_core.calculate_level_base_size(levels); diff --git a/src/meta/src/hummock/compaction/selector/vnode_watermark_selector.rs b/src/meta/src/hummock/compaction/selector/vnode_watermark_selector.rs new file mode 100644 index 0000000000000..e09ed7e661581 --- /dev/null +++ b/src/meta/src/hummock/compaction/selector/vnode_watermark_selector.rs @@ -0,0 +1,87 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::sync::Arc; + +use risingwave_common::catalog::TableId; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ + safe_epoch_read_table_watermarks_impl, safe_epoch_table_watermarks_impl, +}; +use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, TableWatermarks}; +use risingwave_hummock_sdk::version::HummockVersionStateTableInfo; +use risingwave_hummock_sdk::HummockCompactionTaskId; +use risingwave_pb::hummock::compact_task::TaskType; + +use crate::hummock::compaction::picker::VnodeWatermarkCompactionPicker; +use crate::hummock::compaction::selector::{CompactionSelectorContext, DynamicLevelSelectorCore}; +use crate::hummock::compaction::{create_compaction_task, CompactionSelector, CompactionTask}; +#[derive(Default)] +pub struct VnodeWatermarkCompactionSelector {} + +impl CompactionSelector for VnodeWatermarkCompactionSelector { + fn pick_compaction( + &mut self, + task_id: HummockCompactionTaskId, + context: CompactionSelectorContext<'_>, + ) -> Option { + let CompactionSelectorContext { + group, + levels, + level_handlers, + developer_config, + table_watermarks, + member_table_ids, + state_table_info, + .. + } = context; + let dynamic_level_core = + DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config); + let ctx = dynamic_level_core.calculate_level_base_size(levels); + let mut picker = VnodeWatermarkCompactionPicker::new(); + let table_watermarks = + safe_epoch_read_table_watermarks(table_watermarks, state_table_info, member_table_ids); + let compaction_input = picker.pick_compaction(levels, level_handlers, &table_watermarks)?; + compaction_input.add_pending_task(task_id, level_handlers); + Some(create_compaction_task( + dynamic_level_core.get_config(), + compaction_input, + ctx.base_level, + self.task_type(), + )) + } + + fn name(&self) -> &'static str { + "VnodeWatermarkCompaction" + } + + fn task_type(&self) -> TaskType { + TaskType::VnodeWatermark + } +} + +fn safe_epoch_read_table_watermarks( + table_watermarks: &HashMap>, + state_table_info: &HummockVersionStateTableInfo, + member_table_ids: &BTreeSet, +) -> BTreeMap { + safe_epoch_read_table_watermarks_impl(&safe_epoch_table_watermarks_impl( + table_watermarks, + state_table_info, + &member_table_ids + .iter() + .map(TableId::table_id) + .collect::>(), + )) +} diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index e5e03e593b862..d8d2d709189cb 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -78,7 +78,7 @@ use crate::hummock::compaction::selector::level_selector::PickerInfo; use crate::hummock::compaction::selector::{ DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption, ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector, - TtlCompactionSelector, + TtlCompactionSelector, VnodeWatermarkCompactionSelector, }; use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector}; use crate::hummock::error::{Error, Result}; @@ -133,6 +133,10 @@ fn init_selectors() -> HashMap::default(), ); + compaction_selectors.insert( + compact_task::TaskType::VnodeWatermark, + Box::::default(), + ); compaction_selectors } @@ -766,8 +770,10 @@ impl HummockManager { &group_config, &mut stats, selector, - table_id_to_option.clone(), + &table_id_to_option, developer_config.clone(), + &version.latest_version().table_watermarks, + &version.latest_version().state_table_info, ) { let target_level_id = compact_task.input.target_level as u32; @@ -1768,6 +1774,8 @@ impl CompactionState { Some(compact_task::TaskType::Ttl) } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) { Some(compact_task::TaskType::Tombstone) + } else if guard.contains(&(group, compact_task::TaskType::VnodeWatermark)) { + Some(compact_task::TaskType::VnodeWatermark) } else { None } diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs index b1884edb417a1..ba9e43edf8267 100644 --- a/src/meta/src/hummock/manager/timer_task.rs +++ b/src/meta/src/hummock/manager/timer_task.rs @@ -305,6 +305,13 @@ impl HummockManager { compact_task::TaskType::SpaceReclaim, ) .await; + + // share the same trigger with SpaceReclaim + hummock_manager + .on_handle_trigger_multi_group( + compact_task::TaskType::VnodeWatermark, + ) + .await; } HummockTimerEvent::TtlCompactionTrigger => { diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 5ffbe47fa2d9c..891a1d8c6ab7e 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -14,25 +14,31 @@ #![cfg(any(test, feature = "test"))] +use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; use std::time::Duration; use itertools::Itertools; -use risingwave_common::catalog::TableId; +use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::util::epoch::test_epoch; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::key_with_epoch; -use risingwave_hummock_sdk::version::HummockVersion; +use risingwave_hummock_sdk::table_watermark::TableWatermarks; +use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo}; use risingwave_hummock_sdk::{ CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, }; use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType}; use risingwave_pb::hummock::compact_task::TaskStatus; +use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{CompactionConfig, KeyRange, SstableInfo}; use risingwave_pb::meta::add_worker_node_request::Property; use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; -use crate::hummock::compaction::selector::default_compaction_selector; +use crate::hummock::compaction::selector::{default_compaction_selector, LocalSelectorStatistic}; +use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionSelectorContext}; +use crate::hummock::level_handler::LevelHandler; +use crate::hummock::model::CompactionGroup; use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef}; use crate::manager::{ ClusterManager, ClusterManagerRef, FragmentManager, MetaSrvEnv, META_NODE_ID, @@ -402,3 +408,27 @@ pub async fn add_ssts( .unwrap(); test_tables } + +pub fn compaction_selector_context<'a>( + group: &'a CompactionGroup, + levels: &'a Levels, + member_table_ids: &'a BTreeSet, + level_handlers: &'a mut [LevelHandler], + selector_stats: &'a mut LocalSelectorStatistic, + table_id_to_options: &'a HashMap, + developer_config: Arc, + table_watermarks: &'a HashMap>, + state_table_info: &'a HummockVersionStateTableInfo, +) -> CompactionSelectorContext<'a> { + CompactionSelectorContext { + group, + levels, + member_table_ids, + level_handlers, + selector_stats, + table_id_to_options, + developer_config, + table_watermarks, + state_table_info, + } +} diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index a27a94bf319c4..4e3a63c87ec94 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -17,8 +17,11 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::sync::Arc; +use bytes::Bytes; use itertools::Itertools; +use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; +use risingwave_common::hash::VnodeBitmapExt; use risingwave_pb::hummock::group_delta::DeltaType; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::hummock_version_delta::GroupDeltas; @@ -35,8 +38,10 @@ use crate::change_log::TableChangeLog; use crate::compaction_group::StaticCompactionGroupId; use crate::key_range::KeyRangeCommon; use crate::prost_key_range::KeyRangeExt; -use crate::table_watermark::{TableWatermarks, VnodeWatermark}; -use crate::version::{HummockVersion, HummockVersionDelta}; +use crate::table_watermark::{ + ReadTableWatermark, TableWatermarks, VnodeWatermark, WatermarkDirection, +}; +use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo}; use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId}; pub struct GroupDeltasSummary { @@ -248,56 +253,110 @@ impl HummockVersion { &self, existing_table_ids: &[u32], ) -> BTreeMap { - fn extract_single_table_watermark( - table_watermarks: &TableWatermarks, - safe_epoch: u64, - ) -> Option { - if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() - { - assert!( - *first_epoch >= safe_epoch, - "smallest epoch {} in table watermark should be at least safe epoch {}", - first_epoch, - safe_epoch - ); - if *first_epoch == safe_epoch { - Some(PbTableWatermarks { - epoch_watermarks: vec![PbEpochNewWatermarks { - watermarks: first_epoch_watermark - .iter() - .map(VnodeWatermark::to_protobuf) - .collect(), - epoch: *first_epoch, - }], - is_ascending: table_watermarks.direction.is_ascending(), - }) - } else { - None - } + safe_epoch_table_watermarks_impl( + &self.table_watermarks, + &self.state_table_info, + existing_table_ids, + ) + } +} + +pub fn safe_epoch_table_watermarks_impl( + table_watermarks: &HashMap>, + state_table_info: &HummockVersionStateTableInfo, + existing_table_ids: &[u32], +) -> BTreeMap { + fn extract_single_table_watermark( + table_watermarks: &TableWatermarks, + safe_epoch: u64, + ) -> Option { + if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() { + assert!( + *first_epoch >= safe_epoch, + "smallest epoch {} in table watermark should be at least safe epoch {}", + first_epoch, + safe_epoch + ); + if *first_epoch == safe_epoch { + Some(PbTableWatermarks { + epoch_watermarks: vec![PbEpochNewWatermarks { + watermarks: first_epoch_watermark + .iter() + .map(VnodeWatermark::to_protobuf) + .collect(), + epoch: *first_epoch, + }], + is_ascending: table_watermarks.direction.is_ascending(), + }) } else { None } + } else { + None } - self.table_watermarks - .iter() - .filter_map(|(table_id, table_watermarks)| { - let u32_table_id = table_id.table_id(); - if !existing_table_ids.contains(&u32_table_id) { - None - } else { - extract_single_table_watermark( - table_watermarks, - self.state_table_info - .info() - .get(table_id) - .expect("table should exist") - .safe_epoch, - ) - .map(|table_watermarks| (table_id.table_id, table_watermarks)) - } - }) - .collect() } + table_watermarks + .iter() + .filter_map(|(table_id, table_watermarks)| { + let u32_table_id = table_id.table_id(); + if !existing_table_ids.contains(&u32_table_id) { + None + } else { + extract_single_table_watermark( + table_watermarks, + state_table_info + .info() + .get(table_id) + .expect("table should exist") + .safe_epoch, + ) + .map(|table_watermarks| (table_id.table_id, table_watermarks)) + } + }) + .collect() +} + +pub fn safe_epoch_read_table_watermarks_impl( + safe_epoch_watermarks: &BTreeMap, +) -> BTreeMap { + safe_epoch_watermarks + .iter() + .map(|(table_id, watermarks)| { + assert_eq!(watermarks.epoch_watermarks.len(), 1); + let vnode_watermarks = &watermarks + .epoch_watermarks + .first() + .expect("should exist") + .watermarks; + let mut vnode_watermark_map = BTreeMap::new(); + for vnode_watermark in vnode_watermarks { + let watermark = Bytes::copy_from_slice(&vnode_watermark.watermark); + for vnode in + Bitmap::from(vnode_watermark.vnode_bitmap.as_ref().expect("should exist")) + .iter_vnodes() + { + assert!( + vnode_watermark_map + .insert(vnode, watermark.clone()) + .is_none(), + "duplicate table watermark on vnode {}", + vnode.to_index() + ); + } + } + ( + TableId::from(*table_id), + ReadTableWatermark { + direction: if watermarks.is_ascending { + WatermarkDirection::Ascending + } else { + WatermarkDirection::Descending + }, + vnode_watermarks: vnode_watermark_map, + }, + ) + }) + .collect() } impl HummockVersion { diff --git a/src/storage/src/hummock/iterator/skip_watermark.rs b/src/storage/src/hummock/iterator/skip_watermark.rs index e77c6b4d82a5c..d335844eb4001 100644 --- a/src/storage/src/hummock/iterator/skip_watermark.rs +++ b/src/storage/src/hummock/iterator/skip_watermark.rs @@ -16,9 +16,9 @@ use std::cmp::Ordering; use std::collections::{BTreeMap, VecDeque}; use bytes::Bytes; -use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; -use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_common::hash::VirtualNode; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_read_table_watermarks_impl; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap}; use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection}; @@ -187,44 +187,7 @@ impl SkipWatermarkState { pub fn from_safe_epoch_watermarks( safe_epoch_watermarks: &BTreeMap, ) -> Self { - let watermarks = safe_epoch_watermarks - .iter() - .map(|(table_id, watermarks)| { - assert_eq!(watermarks.epoch_watermarks.len(), 1); - let vnode_watermarks = &watermarks - .epoch_watermarks - .first() - .expect("should exist") - .watermarks; - let mut vnode_watermark_map = BTreeMap::new(); - for vnode_watermark in vnode_watermarks { - let watermark = Bytes::copy_from_slice(&vnode_watermark.watermark); - for vnode in - Bitmap::from(vnode_watermark.vnode_bitmap.as_ref().expect("should exist")) - .iter_vnodes() - { - assert!( - vnode_watermark_map - .insert(vnode, watermark.clone()) - .is_none(), - "duplicate table watermark on vnode {}", - vnode.to_index() - ); - } - } - ( - TableId::from(*table_id), - ReadTableWatermark { - direction: if watermarks.is_ascending { - WatermarkDirection::Ascending - } else { - WatermarkDirection::Descending - }, - vnode_watermarks: vnode_watermark_map, - }, - ) - }) - .collect(); + let watermarks = safe_epoch_read_table_watermarks_impl(safe_epoch_watermarks); Self::new(watermarks) } diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs index ac9fb2f9dbf1a..9b57673e49c16 100644 --- a/src/tests/simulation/src/ctl_ext.rs +++ b/src/tests/simulation/src/ctl_ext.rs @@ -27,6 +27,7 @@ use rand::seq::{IteratorRandom, SliceRandom}; use rand::{thread_rng, Rng}; use risingwave_common::catalog::TableId; use risingwave_common::hash::WorkerSlotId; +use risingwave_hummock_sdk::{CompactionGroupId, HummockSstableId}; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::PbFragment; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; @@ -484,6 +485,50 @@ impl Cluster { .await??; Ok(()) } + + #[cfg_or_panic(madsim)] + pub async fn split_compaction_group( + &mut self, + compaction_group_id: CompactionGroupId, + table_id: HummockSstableId, + ) -> Result<()> { + self.ctl + .spawn(async move { + let mut command: Vec = vec![ + "hummock".into(), + "split-compaction-group".into(), + "--compaction-group-id".into(), + compaction_group_id.to_string(), + "--table-ids".into(), + table_id.to_string(), + ]; + start_ctl(command).await + }) + .await??; + Ok(()) + } + + #[cfg_or_panic(madsim)] + pub async fn trigger_manual_compaction( + &mut self, + compaction_group_id: CompactionGroupId, + level_id: u32, + ) -> Result<()> { + self.ctl + .spawn(async move { + let mut command: Vec = vec![ + "hummock".into(), + "trigger-manual-compaction".into(), + "--compaction-group-id".into(), + compaction_group_id.to_string(), + "--level".into(), + level_id.to_string(), + ]; + start_ctl(command).await + }) + .await??; + Ok(()) + } } #[cfg_attr(not(madsim), allow(dead_code))] diff --git a/src/tests/simulation/tests/integration_tests/compaction/mod.rs b/src/tests/simulation/tests/integration_tests/compaction/mod.rs new file mode 100644 index 0000000000000..0cd8ea767c3ed --- /dev/null +++ b/src/tests/simulation/tests/integration_tests/compaction/mod.rs @@ -0,0 +1,152 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![cfg(madsim)] + +use std::time::Duration; + +use risingwave_hummock_sdk::CompactionGroupId; +use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration, Session}; + +fn cluster_config(interval_sec: usize) -> Configuration { + use std::io::Write; + let config_path = { + let mut file = tempfile::NamedTempFile::new().expect("failed to create temp config file"); + file.write_all( + format!( + "\ +[meta] +max_heartbeat_interval_secs = 300 +periodic_space_reclaim_compaction_interval_sec = {interval_sec} + +[system] +barrier_interval_ms = 1000 +checkpoint_frequency = 1 + +[server] +telemetry_enabled = false +metrics_level = \"Disabled\" + " + ) + .as_bytes(), + ) + .expect("failed to write config file"); + file.into_temp_path() + }; + + Configuration { + config_path: ConfigPath::Temp(config_path.into()), + frontend_nodes: 1, + compute_nodes: 1, + meta_nodes: 1, + compactor_nodes: 4, + compute_node_cores: 2, + ..Default::default() + } +} + +#[tokio::test] +async fn test_vnode_watermark_reclaim() { + // The vnode watermark reclaim will be triggered, so the SST will be reclaimed. + let config = crate::compaction::cluster_config(10); + let mut cluster = Cluster::start(config).await.unwrap(); + // wait for the service to be ready + tokio::time::sleep(Duration::from_secs(15)).await; + let mut session = cluster.start_session(); + + let compaction_group_id = test_vnode_watermark_reclaim_impl(&mut cluster, &mut session).await; + + assert_compaction_group_sst_count(compaction_group_id, 6, 1, &mut session).await; + // Need wait longer for reclamation, due to the hard-coded STATE_CLEANING_PERIOD_EPOCH. + tokio::time::sleep(Duration::from_secs(500)).await; + assert_compaction_group_sst_count(compaction_group_id, 6, 0, &mut session).await; +} + +#[tokio::test] +async fn test_no_vnode_watermark_reclaim() { + // The vnode watermark reclaim won't be triggered, so the SST won't be reclaimed. + let config = crate::compaction::cluster_config(3600); + let mut cluster = Cluster::start(config).await.unwrap(); + // wait for the service to be ready + tokio::time::sleep(Duration::from_secs(15)).await; + let mut session = cluster.start_session(); + + let compaction_group_id = test_vnode_watermark_reclaim_impl(&mut cluster, &mut session).await; + + assert_compaction_group_sst_count(compaction_group_id, 6, 1, &mut session).await; + tokio::time::sleep(Duration::from_secs(500)).await; + assert_compaction_group_sst_count(compaction_group_id, 6, 1, &mut session).await; +} + +async fn assert_compaction_group_sst_count( + compaction_group_id: CompactionGroupId, + level_id: usize, + expected: usize, + session: &mut Session, +) { + let count = session + .run(format!("SELECT COUNT(*) FROM rw_hummock_sstables WHERE compaction_group_id={compaction_group_id} and level_id={level_id};")) + .await + .unwrap(); + assert_eq!(count.parse::().unwrap(), expected); +} + +async fn test_vnode_watermark_reclaim_impl( + cluster: &mut Cluster, + session: &mut Session, +) -> CompactionGroupId { + session + .run("CREATE TABLE t2 (ts timestamptz, v INT);") + .await + .unwrap(); + session + .run("CREATE MATERIALIZED VIEW mv2 AS SELECT * FROM t2 WHERE ts > now() - INTERVAL '10s';") + .await + .unwrap(); + + let table_id = session + .run("SELECT id FROM rw_internal_tables WHERE name LIKE '%dynamicfilterleft%';") + .await + .unwrap() + .parse::() + .unwrap(); + + // Move the table to a dedicated group to prevent its vnode watermark from being reclaimed during the compaction of other tables. + cluster.split_compaction_group(2, table_id).await.unwrap(); + tokio::time::sleep(Duration::from_secs(5)).await; + let compaction_group_id = session + .run("SELECT max(id) FROM rw_hummock_compaction_group_configs;") + .await + .unwrap() + .parse::() + .unwrap(); + assert!(compaction_group_id > 3); + + session + .run("INSERT INTO t2 VALUES (now(), 1);") + .await + .unwrap(); + session.run("FLUSH;").await.unwrap(); + assert_compaction_group_sst_count(compaction_group_id, 0, 1, session).await; + assert_compaction_group_sst_count(compaction_group_id, 6, 0, session).await; + // Compact data to L6 so that vnode watermark picker can take effects. + cluster + .trigger_manual_compaction(compaction_group_id, 0) + .await + .unwrap(); + tokio::time::sleep(Duration::from_secs(5)).await; + assert_compaction_group_sst_count(compaction_group_id, 0, 0, session).await; + assert_compaction_group_sst_count(compaction_group_id, 6, 1, session).await; + compaction_group_id +} diff --git a/src/tests/simulation/tests/integration_tests/main.rs b/src/tests/simulation/tests/integration_tests/main.rs index 475793a88b709..0ad0725dde405 100644 --- a/src/tests/simulation/tests/integration_tests/main.rs +++ b/src/tests/simulation/tests/integration_tests/main.rs @@ -29,4 +29,5 @@ mod sink; mod storage; mod throttle; +mod compaction; mod utils;