diff --git a/proto/hummock.proto b/proto/hummock.proto index 89a0438fc43a7..149944831a4f9 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -69,25 +69,28 @@ message IntraLevelDelta { enum CompatibilityVersion { VERSION_UNSPECIFIED = 0; NO_TRIVIAL_SPLIT = 1; + NO_MEMBER_TABLE_IDS = 2; } message GroupConstruct { CompactionConfig group_config = 1; // If parent_group_id is not 0, it means parent_group_id splits into parent_group_id and this group, so this group is not empty initially. uint64 parent_group_id = 2; - repeated uint32 table_ids = 3; + repeated uint32 table_ids = 3 [deprecated = true]; uint64 group_id = 4; uint64 new_sst_start_id = 5; CompatibilityVersion version = 6; } message GroupMetaChange { - repeated uint32 table_ids_add = 1; - repeated uint32 table_ids_remove = 2; + option deprecated = true; + repeated uint32 table_ids_add = 1 [deprecated = true]; + repeated uint32 table_ids_remove = 2 [deprecated = true]; } message GroupTableChange { - repeated uint32 table_ids = 1; + option deprecated = true; + repeated uint32 table_ids = 1 [deprecated = true]; uint64 target_group_id = 2; uint64 origin_group_id = 3; uint64 new_sst_start_id = 4; @@ -102,7 +105,7 @@ message GroupDelta { GroupConstruct group_construct = 2; GroupDestroy group_destroy = 3; GroupMetaChange group_meta_change = 4; - GroupTableChange group_table_change = 5; + GroupTableChange group_table_change = 5 [deprecated = true]; } } @@ -152,11 +155,13 @@ message TableChangeLog { message StateTableInfo { uint64 committed_epoch = 1; uint64 safe_epoch = 2; + uint64 compaction_group_id = 3; } message StateTableInfoDelta { uint64 committed_epoch = 1; uint64 safe_epoch = 2; + uint64 compaction_group_id = 3; } message HummockVersion { @@ -165,7 +170,7 @@ message HummockVersion { OverlappingLevel l0 = 2; uint64 group_id = 3; uint64 parent_group_id = 4; - repeated uint32 member_table_ids = 5; + repeated uint32 member_table_ids = 5 [deprecated = true]; } uint64 id = 1; // Levels of each compaction group diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 1d1c2fe4b99fb..bf4b608fe59ad 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -16,12 +16,12 @@ pub mod compaction_config; mod overlap_strategy; -use risingwave_common::catalog::TableOption; +use risingwave_common::catalog::{TableId, TableOption}; use risingwave_pb::hummock::compact_task::{self, TaskType}; mod picker; pub mod selector; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -92,6 +92,7 @@ impl CompactStatus { pub fn get_compact_task( &mut self, levels: &Levels, + member_table_ids: &BTreeSet, task_id: HummockCompactionTaskId, group: &CompactionGroup, stats: &mut LocalSelectorStatistic, @@ -106,6 +107,7 @@ impl CompactStatus { task_id, group, levels, + member_table_ids, &mut self.level_handlers, stats, table_id_to_options.clone(), @@ -121,6 +123,7 @@ impl CompactStatus { task_id, group, levels, + member_table_ids, &mut self.level_handlers, stats, table_id_to_options, diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index a2c3a7d52802e..bf05afc6c6e88 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -326,7 +326,6 @@ pub mod tests { generate_table(1, 1, 201, 210, 1), ], )], - member_table_ids: vec![1], ..Default::default() }; let mut local_stats = LocalPickerStatistic::default(); @@ -419,7 +418,6 @@ pub mod tests { total_file_size: 0, uncompressed_file_size: 0, }), - member_table_ids: vec![1], ..Default::default() }; push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(1, 1, 50, 140, 2)]); @@ -482,7 +480,6 @@ pub mod tests { total_file_size: 0, uncompressed_file_size: 0, }), - member_table_ids: vec![1], ..Default::default() }; push_tables_level0_nonoverlapping( @@ -586,7 +583,6 @@ pub mod tests { let levels = Levels { l0: Some(l0), levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])], - member_table_ids: vec![1], ..Default::default() }; let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; @@ -664,7 +660,6 @@ pub mod tests { let levels = Levels { l0: Some(l0), levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])], - member_table_ids: vec![1], ..Default::default() }; let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; @@ -731,7 +726,6 @@ pub mod tests { let levels = Levels { l0: Some(l0), levels: vec![generate_level(1, vec![generate_table(3, 1, 1, 100, 1)])], - member_table_ids: vec![1], ..Default::default() }; let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; diff --git a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs index 6b5dcae7d0c31..5cc65bd38a1c8 100644 --- a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs @@ -428,7 +428,6 @@ pub mod tests { total_file_size: 0, uncompressed_file_size: 0, }), - member_table_ids: vec![1], ..Default::default() }; push_tables_level0_nonoverlapping( @@ -473,7 +472,6 @@ pub mod tests { generate_table(1, 1, 100, 210, 2), generate_table(2, 1, 200, 250, 2), ])), - member_table_ids: vec![1], ..Default::default() }; let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; @@ -511,7 +509,6 @@ pub mod tests { let levels = Levels { l0: Some(l0), levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], - member_table_ids: vec![1], ..Default::default() }; let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; @@ -560,7 +557,6 @@ pub mod tests { let levels = Levels { l0: Some(l0), levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], - member_table_ids: vec![1], ..Default::default() }; let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; @@ -632,7 +628,6 @@ pub mod tests { let levels = Levels { l0: Some(l0), levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], - member_table_ids: vec![1], ..Default::default() }; let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)]; @@ -708,7 +703,6 @@ pub mod tests { let levels = Levels { l0: Some(l0), levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], - member_table_ids: vec![1], ..Default::default() }; levels_handler[1].add_pending_task(100, 1, levels.levels[0].get_table_infos()); @@ -729,7 +723,6 @@ pub mod tests { let mut levels = Levels { l0: Some(l0), levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], - member_table_ids: vec![1], ..Default::default() }; assert!(picker @@ -827,7 +820,6 @@ pub mod tests { let levels = Levels { l0: Some(l0), levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])], - member_table_ids: vec![1], ..Default::default() }; diff --git a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs index 82e58b87c5517..2d1b2c95b20aa 100644 --- a/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/manual_compaction_picker.rs @@ -326,7 +326,7 @@ impl CompactionPicker for ManualCompactionPicker { #[cfg(test)] pub mod tests { - use std::collections::HashMap; + use std::collections::{BTreeSet, HashMap}; use risingwave_pb::hummock::compact_task; pub use risingwave_pb::hummock::KeyRange; @@ -1198,6 +1198,7 @@ pub mod tests { 1, &group_config, &levels, + &BTreeSet::new(), &mut levels_handler, &mut local_stats, HashMap::default(), @@ -1235,6 +1236,7 @@ pub mod tests { 2, &group_config, &levels, + &BTreeSet::new(), &mut levels_handler, &mut local_stats, HashMap::default(), @@ -1308,6 +1310,7 @@ pub mod tests { 1, &group_config, &levels, + &BTreeSet::new(), &mut levels_handler, &mut local_stats, HashMap::default(), @@ -1347,6 +1350,7 @@ pub mod tests { 1, &group_config, &levels, + &BTreeSet::new(), &mut levels_handler, &mut local_stats, HashMap::default(), diff --git a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs index ab9f74c063a0b..b94f7587dd04e 100644 --- a/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/space_reclaim_compaction_picker.rs @@ -168,10 +168,11 @@ impl SpaceReclaimCompactionPicker { #[cfg(test)] mod test { - use std::collections::HashMap; + use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; use itertools::Itertools; + use risingwave_common::catalog::TableId; use risingwave_pb::hummock::compact_task; pub use risingwave_pb::hummock::{Level, LevelType}; @@ -234,11 +235,12 @@ mod test { } assert_eq!(levels.len(), 4); - let mut levels = Levels { + let levels = Levels { levels, l0: Some(l0), ..Default::default() }; + let mut member_table_ids = BTreeSet::new(); let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec(); let mut local_stats = LocalSelectorStatistic::default(); @@ -252,6 +254,7 @@ mod test { 1, &group_config, &levels, + &member_table_ids, &mut levels_handler, &mut local_stats, HashMap::default(), @@ -268,6 +271,7 @@ mod test { 1, &group_config, &levels, + &member_table_ids, &mut levels_handler, &mut local_stats, HashMap::default(), @@ -308,6 +312,7 @@ mod test { 1, &group_config, &levels, + &member_table_ids, &mut levels_handler, &mut local_stats, HashMap::default(), @@ -334,6 +339,7 @@ mod test { 1, &group_config, &levels, + &member_table_ids, &mut levels_handler, &mut local_stats, HashMap::default(), @@ -351,12 +357,17 @@ mod test { } } - levels.member_table_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + member_table_ids = BTreeSet::from_iter( + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + .into_iter() + .map(TableId::new), + ); // pick space reclaim let task = selector.pick_compaction( 1, &group_config, &levels, + &member_table_ids, &mut levels_handler, &mut local_stats, HashMap::default(), @@ -372,13 +383,15 @@ mod test { } } - levels.member_table_ids = vec![2, 3, 4, 5, 6, 7, 8, 9]; + member_table_ids = + BTreeSet::from_iter([2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(TableId::new)); // pick space reclaim let task = selector .pick_compaction( 1, &group_config, &levels, + &member_table_ids, &mut levels_handler, &mut local_stats, HashMap::default(), @@ -407,7 +420,7 @@ mod test { // rebuild selector selector = SpaceReclaimCompactionSelector::default(); // cut range [3,4] [6] [8,9,10] - levels.member_table_ids = vec![0, 1, 2, 5, 7]; + member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new)); let expect_task_file_count = [2, 1, 4]; let expect_task_sst_id_range = [vec![3, 4], vec![6], vec![8, 9, 10, 11]]; for (index, x) in expect_task_file_count.iter().enumerate() { @@ -417,6 +430,7 @@ mod test { 1, &group_config, &levels, + &member_table_ids, &mut levels_handler, &mut local_stats, HashMap::default(), @@ -458,12 +472,13 @@ mod test { // rebuild selector selector = SpaceReclaimCompactionSelector::default(); // cut range [3,4] [6] [8,9,10] - levels.member_table_ids = vec![0, 1, 2, 5, 7]; + + member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new)); let expect_task_file_count = [2, 1, 5]; let expect_task_sst_id_range = [vec![3, 4], vec![6], vec![7, 8, 9, 10, 11]]; for (index, x) in expect_task_file_count.iter().enumerate() { if index == expect_task_file_count.len() - 1 { - levels.member_table_ids = vec![2, 5]; + member_table_ids = BTreeSet::from_iter([2, 5].into_iter().map(TableId::new)); } // // pick space reclaim @@ -472,6 +487,7 @@ mod test { 1, &group_config, &levels, + &member_table_ids, &mut levels_handler, &mut local_stats, HashMap::default(), diff --git a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs index ce86b523f6e86..ea25211b44749 100644 --- a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs @@ -291,7 +291,6 @@ pub mod tests { sub_levels: vec![l1, l2], }), levels: vec![], - member_table_ids: vec![1], ..Default::default() }; let config = Arc::new( diff --git a/src/meta/src/hummock/compaction/picker/tombstone_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tombstone_reclaim_compaction_picker.rs index f07a8872c9b03..c7be0347fce61 100644 --- a/src/meta/src/hummock/compaction/picker/tombstone_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/tombstone_reclaim_compaction_picker.rs @@ -154,7 +154,6 @@ pub mod tests { ], ), ], - member_table_ids: vec![1], ..Default::default() }; let levels_handler = vec![ diff --git a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs index f690b0f80112c..bc1fc2ce304be 100644 --- a/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/ttl_reclaim_compaction_picker.rs @@ -201,6 +201,7 @@ impl TtlReclaimCompactionPicker { #[cfg(test)] mod test { + use std::collections::BTreeSet; use std::sync::Arc; use itertools::Itertools; @@ -377,6 +378,7 @@ mod test { 1, &group_config, &levels, + &BTreeSet::new(), &mut levels_handler, &mut local_stats, table_id_to_options, @@ -427,6 +429,7 @@ mod test { 1, &group_config, &levels, + &BTreeSet::new(), &mut levels_handler, &mut local_stats, table_id_to_options.clone(), @@ -460,6 +463,7 @@ mod test { 1, &group_config, &levels, + &BTreeSet::new(), &mut levels_handler, &mut local_stats, table_id_to_options.clone(), @@ -516,6 +520,7 @@ mod test { 1, &group_config, &levels, + &BTreeSet::new(), &mut levels_handler, &mut local_stats, table_id_to_options, @@ -557,6 +562,7 @@ mod test { 1, &group_config, &levels, + &BTreeSet::new(), &mut levels_handler, &mut local_stats, HashMap::default(), @@ -619,6 +625,7 @@ mod test { 1, &group_config, &levels, + &BTreeSet::new(), &mut levels_handler, &mut local_stats, table_id_to_options.clone(), @@ -711,6 +718,7 @@ mod test { 1, &group_config, &levels, + &BTreeSet::new(), &mut levels_handler, &mut local_stats, table_id_to_options.clone(), diff --git a/src/meta/src/hummock/compaction/selector/emergency_selector.rs b/src/meta/src/hummock/compaction/selector/emergency_selector.rs index c386aee5c8644..685ab1487d51c 100644 --- a/src/meta/src/hummock/compaction/selector/emergency_selector.rs +++ b/src/meta/src/hummock/compaction/selector/emergency_selector.rs @@ -37,6 +37,7 @@ impl CompactionSelector for EmergencySelector { task_id: HummockCompactionTaskId, group: &CompactionGroup, levels: &Levels, + _member_table_ids: &std::collections::BTreeSet, level_handlers: &mut [LevelHandler], selector_stats: &mut LocalSelectorStatistic, _table_id_to_options: HashMap, diff --git a/src/meta/src/hummock/compaction/selector/level_selector.rs b/src/meta/src/hummock/compaction/selector/level_selector.rs index a49f71877c6a3..38d1e35e22502 100644 --- a/src/meta/src/hummock/compaction/selector/level_selector.rs +++ b/src/meta/src/hummock/compaction/selector/level_selector.rs @@ -422,6 +422,7 @@ impl CompactionSelector for DynamicLevelSelector { task_id: HummockCompactionTaskId, compaction_group: &CompactionGroup, levels: &Levels, + _member_table_ids: &std::collections::BTreeSet, level_handlers: &mut [LevelHandler], selector_stats: &mut LocalSelectorStatistic, _table_id_to_options: HashMap, @@ -478,7 +479,7 @@ impl CompactionSelector for DynamicLevelSelector { #[cfg(test)] pub mod tests { - use std::collections::HashMap; + use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; use itertools::Itertools; @@ -600,7 +601,6 @@ pub mod tests { 3, 10, ))), - member_table_ids: vec![1], ..Default::default() }; @@ -612,6 +612,7 @@ pub mod tests { 1, &group_config, &levels, + &BTreeSet::new(), &mut levels_handlers, &mut local_stats, HashMap::default(), @@ -639,6 +640,7 @@ pub mod tests { 1, &group_config, &levels, + &BTreeSet::new(), &mut levels_handlers, &mut local_stats, HashMap::default(), @@ -658,6 +660,7 @@ pub mod tests { 2, &group_config, &levels, + &BTreeSet::new(), &mut levels_handlers, &mut local_stats, HashMap::default(), @@ -694,6 +697,7 @@ pub mod tests { 2, &group_config, &levels, + &BTreeSet::new(), &mut levels_handlers, &mut local_stats, HashMap::default(), diff --git a/src/meta/src/hummock/compaction/selector/manual_selector.rs b/src/meta/src/hummock/compaction/selector/manual_selector.rs index 427efadf3914d..62c94c8f888df 100644 --- a/src/meta/src/hummock/compaction/selector/manual_selector.rs +++ b/src/meta/src/hummock/compaction/selector/manual_selector.rs @@ -79,6 +79,7 @@ impl CompactionSelector for ManualCompactionSelector { task_id: HummockCompactionTaskId, group: &CompactionGroup, levels: &Levels, + _member_table_ids: &std::collections::BTreeSet, level_handlers: &mut [LevelHandler], _selector_stats: &mut LocalSelectorStatistic, _table_id_to_options: HashMap, diff --git a/src/meta/src/hummock/compaction/selector/mod.rs b/src/meta/src/hummock/compaction/selector/mod.rs index a342a661ecb7b..aca2457da62dc 100644 --- a/src/meta/src/hummock/compaction/selector/mod.rs +++ b/src/meta/src/hummock/compaction/selector/mod.rs @@ -24,13 +24,13 @@ mod space_reclaim_selector; mod tombstone_compaction_selector; mod ttl_selector; -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; pub use emergency_selector::EmergencySelector; pub use level_selector::{DynamicLevelSelector, DynamicLevelSelectorCore}; pub use manual_selector::{ManualCompactionOption, ManualCompactionSelector}; -use risingwave_common::catalog::TableOption; +use risingwave_common::catalog::{TableId, TableOption}; use risingwave_hummock_sdk::HummockCompactionTaskId; use risingwave_pb::hummock::compact_task; use risingwave_pb::hummock::hummock_version::Levels; @@ -53,6 +53,7 @@ pub trait CompactionSelector: Sync + Send { task_id: HummockCompactionTaskId, group: &CompactionGroup, levels: &Levels, + member_table_ids: &BTreeSet, level_handlers: &mut [LevelHandler], selector_stats: &mut LocalSelectorStatistic, table_id_to_options: HashMap, diff --git a/src/meta/src/hummock/compaction/selector/space_reclaim_selector.rs b/src/meta/src/hummock/compaction/selector/space_reclaim_selector.rs index c48cb0fe605c5..b284a6a538b3e 100644 --- a/src/meta/src/hummock/compaction/selector/space_reclaim_selector.rs +++ b/src/meta/src/hummock/compaction/selector/space_reclaim_selector.rs @@ -17,10 +17,10 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; -use risingwave_common::catalog::TableOption; +use risingwave_common::catalog::{TableId, TableOption}; use risingwave_hummock_sdk::HummockCompactionTaskId; use risingwave_pb::hummock::compact_task; use risingwave_pb::hummock::hummock_version::Levels; @@ -44,6 +44,7 @@ impl CompactionSelector for SpaceReclaimCompactionSelector { task_id: HummockCompactionTaskId, group: &CompactionGroup, levels: &Levels, + member_table_ids: &BTreeSet, level_handlers: &mut [LevelHandler], _selector_stats: &mut LocalSelectorStatistic, _table_id_to_options: HashMap, @@ -53,7 +54,10 @@ impl CompactionSelector for SpaceReclaimCompactionSelector { DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config); let mut picker = SpaceReclaimCompactionPicker::new( group.compaction_config.max_space_reclaim_bytes, - levels.member_table_ids.iter().cloned().collect(), + member_table_ids + .iter() + .map(|table_id| table_id.table_id) + .collect(), ); let ctx = dynamic_level_core.calculate_level_base_size(levels); let state = self.state.entry(group.group_id).or_default(); diff --git a/src/meta/src/hummock/compaction/selector/tombstone_compaction_selector.rs b/src/meta/src/hummock/compaction/selector/tombstone_compaction_selector.rs index 5cbae609caa86..b05802513733d 100644 --- a/src/meta/src/hummock/compaction/selector/tombstone_compaction_selector.rs +++ b/src/meta/src/hummock/compaction/selector/tombstone_compaction_selector.rs @@ -42,6 +42,7 @@ impl CompactionSelector for TombstoneCompactionSelector { task_id: HummockCompactionTaskId, group: &CompactionGroup, levels: &Levels, + _member_table_ids: &std::collections::BTreeSet, level_handlers: &mut [LevelHandler], _selector_stats: &mut LocalSelectorStatistic, _table_id_to_options: HashMap, diff --git a/src/meta/src/hummock/compaction/selector/ttl_selector.rs b/src/meta/src/hummock/compaction/selector/ttl_selector.rs index ed099ede4158f..0e9497b06b17d 100644 --- a/src/meta/src/hummock/compaction/selector/ttl_selector.rs +++ b/src/meta/src/hummock/compaction/selector/ttl_selector.rs @@ -44,6 +44,7 @@ impl CompactionSelector for TtlCompactionSelector { task_id: HummockCompactionTaskId, group: &CompactionGroup, levels: &Levels, + _member_table_ids: &std::collections::BTreeSet, level_handlers: &mut [LevelHandler], _selector_stats: &mut LocalSelectorStatistic, table_id_to_options: HashMap, diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index d42c032d5a054..44117139dc192 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap}; use itertools::Itertools; use risingwave_common::catalog::TableId; @@ -27,9 +27,7 @@ use risingwave_hummock_sdk::{ use risingwave_pb::hummock::compact_task::{self}; use risingwave_pb::hummock::group_delta::DeltaType; use risingwave_pb::hummock::hummock_version_delta::ChangeLogDelta; -use risingwave_pb::hummock::{ - GroupDelta, GroupMetaChange, HummockSnapshot, IntraLevelDelta, StateTableInfoDelta, -}; +use risingwave_pb::hummock::{GroupDelta, HummockSnapshot, IntraLevelDelta, StateTableInfoDelta}; use crate::hummock::error::{Error, Result}; use crate::hummock::manager::transaction::{ @@ -140,81 +138,51 @@ impl HummockManager { let mut table_compaction_group_mapping = new_version_delta .latest_version() - .build_compaction_group_info(); + .state_table_info + .build_table_compaction_group_id(); let mut new_table_ids = None; // Add new table if let Some(new_fragment_table_info) = new_table_fragment_info { - let new_table_ids = new_table_ids.insert(HashSet::new()); + let new_table_ids = new_table_ids.insert(HashMap::new()); if !new_fragment_table_info.internal_table_ids.is_empty() { - if let Some(levels) = new_version_delta - .latest_version() - .levels - .get(&(StaticCompactionGroupId::StateDefault as u64)) - { - for table_id in &new_fragment_table_info.internal_table_ids { - if levels.member_table_ids.contains(&table_id.table_id) { - return Err(Error::CompactionGroup(format!( - "table {} already in group {}", - table_id, - StaticCompactionGroupId::StateDefault as u64 - ))); - } + for table_id in &new_fragment_table_info.internal_table_ids { + if let Some(info) = new_version_delta + .latest_version() + .state_table_info + .info() + .get(table_id) + { + return Err(Error::CompactionGroup(format!( + "table {} already exist {:?}", + table_id.table_id, info, + ))); } } - let group_deltas = &mut new_version_delta - .group_deltas - .entry(StaticCompactionGroupId::StateDefault as u64) - .or_default() - .group_deltas; - group_deltas.push(GroupDelta { - delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { - table_ids_add: new_fragment_table_info - .internal_table_ids - .iter() - .map(|table_id| table_id.table_id) - .collect(), - ..Default::default() - })), - }); - for table_id in &new_fragment_table_info.internal_table_ids { table_compaction_group_mapping .insert(*table_id, StaticCompactionGroupId::StateDefault as u64); - new_table_ids.insert(*table_id); + new_table_ids.insert(*table_id, StaticCompactionGroupId::StateDefault as u64); } } if let Some(table_id) = new_fragment_table_info.mv_table_id { - if let Some(levels) = new_version_delta + if let Some(info) = new_version_delta .latest_version() - .levels - .get(&(StaticCompactionGroupId::MaterializedView as u64)) + .state_table_info + .info() + .get(&table_id) { - if levels.member_table_ids.contains(&table_id.table_id) { - return Err(Error::CompactionGroup(format!( - "table {} already in group {}", - table_id, - StaticCompactionGroupId::MaterializedView as u64 - ))); - } + return Err(Error::CompactionGroup(format!( + "table {} already exist {:?}", + table_id.table_id, info, + ))); } - let group_deltas = &mut new_version_delta - .group_deltas - .entry(StaticCompactionGroupId::MaterializedView as u64) - .or_default() - .group_deltas; - group_deltas.push(GroupDelta { - delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { - table_ids_add: vec![table_id.table_id], - ..Default::default() - })), - }); let _ = table_compaction_group_mapping .insert(table_id, StaticCompactionGroupId::MaterializedView as u64); - new_table_ids.insert(table_id); + new_table_ids.insert(table_id, StaticCompactionGroupId::MaterializedView as u64); } } @@ -231,10 +199,12 @@ impl HummockManager { .levels .get(compaction_group_id) { - Some(compaction_group) => sst - .table_ids - .iter() - .all(|t| compaction_group.member_table_ids.contains(t)), + Some(_compaction_group) => sst.table_ids.iter().all(|t| { + table_compaction_group_mapping + .get(&TableId::new(*t)) + .map(|table_cg_id| table_cg_id == compaction_group_id) + .unwrap_or(false) + }), None => false, }; if !is_sst_belong_to_group_declared { @@ -339,23 +309,32 @@ impl HummockManager { // update state table info new_version_delta.with_latest_version(|version, delta| { if let Some(new_table_ids) = new_table_ids { - for table_id in new_table_ids { + for (table_id, cg_id) in new_table_ids { delta.state_table_info_delta.insert( table_id, StateTableInfoDelta { committed_epoch: epoch, safe_epoch: epoch, + compaction_group_id: cg_id, }, ); } } for (table_id, info) in version.state_table_info.info() { - delta.state_table_info_delta.insert( - *table_id, - StateTableInfoDelta { - committed_epoch: epoch, - safe_epoch: info.safe_epoch, - }, + assert!( + delta + .state_table_info_delta + .insert( + *table_id, + StateTableInfoDelta { + committed_epoch: epoch, + safe_epoch: info.safe_epoch, + compaction_group_id: info.compaction_group_id, + } + ) + .is_none(), + "newly added table exists previously: {:?}", + table_id ); } }); @@ -416,10 +395,17 @@ impl HummockManager { tracing::trace!("new committed epoch {}", epoch); let mut table_groups = HashMap::::default(); - for group in versioning.current_version.levels.values() { - for table_id in &group.member_table_ids { - table_groups.insert(*table_id, group.member_table_ids.len()); - } + for (table_id, info) in versioning.current_version.state_table_info.info() { + table_groups.insert( + table_id.table_id, + versioning + .current_version + .state_table_info + .compaction_group_member_tables() + .get(&info.compaction_group_id) + .expect("should exist") + .len(), + ); } drop(versioning_guard); // Don't trigger compactions if we enable deterministic compaction diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index 1969fc8ffc348..7138586f3ed20 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -206,6 +206,7 @@ impl<'a> HummockVersionTransaction<'a> { StateTableInfoDelta { committed_epoch: info.committed_epoch, safe_epoch: new_safe_epoch, + compaction_group_id: info.compaction_group_id, }, ); } @@ -732,11 +733,13 @@ impl HummockManager { || matches!(selector.task_type(), TaskType::Emergency); let mut stats = LocalSelectorStatistic::default(); - let member_table_ids = version + let member_table_ids: Vec<_> = version .latest_version() - .get_compaction_group_levels(compaction_group_id) - .member_table_ids - .clone(); + .state_table_info + .compaction_group_member_table_ids(compaction_group_id) + .iter() + .map(|table_id| table_id.table_id) + .collect(); let mut table_id_to_option: HashMap = HashMap::default(); @@ -750,6 +753,10 @@ impl HummockManager { version .latest_version() .get_compaction_group_levels(compaction_group_id), + version + .latest_version() + .state_table_info + .compaction_group_member_table_ids(compaction_group_id), task_id as HummockCompactionTaskId, &group_config, &mut stats, diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 3a3d596844e95..79596c8a3e774 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -19,8 +19,7 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - get_compaction_group_ids, get_member_table_ids, try_get_compaction_group_id_by_table_id, - TableGroupInfo, + get_compaction_group_ids, get_member_table_ids, TableGroupInfo, }; use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; use risingwave_hummock_sdk::CompactionGroupId; @@ -33,7 +32,7 @@ use risingwave_pb::hummock::subscribe_compaction_event_request::ReportTask; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, GroupConstruct, - GroupDelta, GroupDestroy, GroupMetaChange, StateTableInfoDelta, + GroupDelta, GroupDestroy, StateTableInfoDelta, }; use tokio::sync::OnceCell; @@ -196,12 +195,14 @@ impl HummockManager { let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn(); for (table_id, _) in pairs { - if let Some(old_group) = - try_get_compaction_group_id_by_table_id(current_version, *table_id) + if let Some(info) = current_version + .state_table_info + .info() + .get(&TableId::new(*table_id)) { return Err(Error::CompactionGroup(format!( - "table {} already in group {}", - *table_id, old_group + "table {} already {:?}", + *table_id, info ))); } } @@ -256,18 +257,6 @@ impl HummockManager { }); } } - let group_deltas = &mut new_version_delta - .group_deltas - .entry(group_id) - .or_default() - .group_deltas; - - group_deltas.push(GroupDelta { - delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { - table_ids_add: vec![*table_id], - ..Default::default() - })), - }); assert!(new_version_delta .state_table_info_delta .insert( @@ -275,6 +264,7 @@ impl HummockManager { StateTableInfoDelta { committed_epoch: epoch, safe_epoch: epoch, + compaction_group_id: *raw_group_id, } ) .is_none()); @@ -302,32 +292,24 @@ impl HummockManager { HashMap::new(); // Remove member tables for table_id in table_ids.iter().unique() { - let group_id = match try_get_compaction_group_id_by_table_id( - new_version_delta.latest_version(), - *table_id, - ) { - Some(group_id) => group_id, - None => continue, + let version = new_version_delta.latest_version(); + let Some(info) = version + .state_table_info + .info() + .get(&TableId::new(*table_id)) + else { + continue; }; - let group_deltas = &mut new_version_delta - .group_deltas - .entry(group_id) - .or_default() - .group_deltas; - group_deltas.push(GroupDelta { - delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { - table_ids_remove: vec![*table_id], - ..Default::default() - })), - }); + modified_groups - .entry(group_id) + .entry(info.compaction_group_id) .and_modify(|count| *count -= 1) .or_insert( - new_version_delta - .latest_version() - .get_compaction_group_levels(group_id) - .member_table_ids + version + .state_table_info + .compaction_group_member_tables() + .get(&info.compaction_group_id) + .expect("should exist") .len() as u64 - 1, ); @@ -427,7 +409,12 @@ impl HummockManager { let group = CompactionGroupInfo { id: levels.group_id, parent_id: levels.parent_group_id, - member_table_ids: levels.member_table_ids.clone(), + member_table_ids: current_version + .state_table_info + .compaction_group_member_table_ids(levels.group_id) + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(), compaction_config: Some(compaction_config), }; results.push(group); @@ -469,13 +456,24 @@ impl HummockManager { let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); // Validate parameters. - let parent_group = versioning + if !versioning .current_version .levels - .get(&parent_group_id) - .ok_or_else(|| Error::CompactionGroup(format!("invalid group {}", parent_group_id)))?; + .contains_key(&parent_group_id) + { + return Err(Error::CompactionGroup(format!( + "invalid group {}", + parent_group_id + ))); + } + for table_id in &table_ids { - if !parent_group.member_table_ids.contains(table_id) { + if !versioning + .current_version + .state_table_info + .compaction_group_member_table_ids(parent_group_id) + .contains(&TableId::new(*table_id)) + { return Err(Error::CompactionGroup(format!( "table {} doesn't in group {}", table_id, parent_group_id @@ -483,7 +481,13 @@ impl HummockManager { } } - if table_ids.len() == parent_group.member_table_ids.len() { + if table_ids.len() + == versioning + .current_version + .state_table_info + .compaction_group_member_table_ids(parent_group_id) + .len() + { return Err(Error::CompactionGroup(format!( "invalid split attempt for group {}: all member tables are moved", parent_group_id @@ -521,6 +525,8 @@ impl HummockManager { .clone(); config.split_weight_by_vnode = partition_vnode_count; + #[expect(deprecated)] + // fill the deprecated field with default value new_version_delta.group_deltas.insert( new_compaction_group_id, GroupDeltas { @@ -530,20 +536,8 @@ impl HummockManager { group_id: new_compaction_group_id, parent_group_id, new_sst_start_id, - table_ids: table_ids.to_vec(), - version: CompatibilityVersion::NoTrivialSplit as i32, - })), - }], - }, - ); - - new_version_delta.group_deltas.insert( - parent_group_id, - GroupDeltas { - group_deltas: vec![GroupDelta { - delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { - table_ids_remove: table_ids.to_vec(), - ..Default::default() + table_ids: vec![], + version: CompatibilityVersion::NoMemberTableIds as i32, })), }], }, @@ -553,6 +547,27 @@ impl HummockManager { }; let (new_compaction_group_id, config) = new_group; + new_version_delta.with_latest_version(|version, new_version_delta| { + for table_id in &table_ids { + let table_id = TableId::new(*table_id); + let info = version + .state_table_info + .info() + .get(&table_id) + .expect("have check exist previously"); + assert!(new_version_delta + .state_table_info_delta + .insert( + table_id, + StateTableInfoDelta { + committed_epoch: info.committed_epoch, + safe_epoch: info.safe_epoch, + compaction_group_id: new_compaction_group_id, + } + ) + .is_none()); + } + }); { let mut compaction_group_manager = self.compaction_group_manager.write().await; let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn(); @@ -606,21 +621,26 @@ impl HummockManager { { let versioning_guard = self.versioning.read().await; let version = &versioning_guard.current_version; - for (group_id, group) in &version.levels { + for group_id in version.levels.keys() { let mut group_info = TableGroupInfo { group_id: *group_id, ..Default::default() }; - for table_id in &group.member_table_ids { + for table_id in version + .state_table_info + .compaction_group_member_table_ids(*group_id) + { let stats_size = versioning_guard .version_stats .table_stats - .get(table_id) + .get(&table_id.table_id) .map(|stats| stats.total_key_size + stats.total_value_size) .unwrap_or(0); let table_size = std::cmp::max(stats_size, 0) as u64; group_info.group_size += table_size; - group_info.table_statistic.insert(*table_id, table_size); + group_info + .table_statistic + .insert(table_id.table_id, table_size); } infos.push(group_info); } diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 5b62e5f0694b7..8d4fcf89b33be 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -1378,14 +1378,20 @@ async fn test_split_compaction_group_on_commit() { ); assert_eq!( current_version - .get_compaction_group_levels(2) - .member_table_ids, + .state_table_info + .compaction_group_member_table_ids(2) + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(), vec![100] ); assert_eq!( current_version - .get_compaction_group_levels(3) - .member_table_ids, + .state_table_info + .compaction_group_member_table_ids(3) + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(), vec![101] ); } @@ -1511,14 +1517,21 @@ async fn test_split_compaction_group_on_demand_basic() { ); assert_eq!( current_version - .get_compaction_group_levels(2) - .member_table_ids, + .state_table_info + .compaction_group_member_table_ids(2) + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(), vec![102] ); assert_eq!( current_version - .get_compaction_group_levels(new_group_id) - .member_table_ids, + .state_table_info + .compaction_group_member_table_ids(new_group_id) + .iter() + .map(|table_id| table_id.table_id) + .sorted() + .collect_vec(), vec![100, 101] ); } @@ -1575,14 +1588,20 @@ async fn test_split_compaction_group_on_demand_non_trivial() { ); assert_eq!( current_version - .get_compaction_group_levels(2) - .member_table_ids, + .state_table_info + .compaction_group_member_table_ids(2) + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(), vec![101] ); assert_eq!( current_version - .get_compaction_group_levels(new_group_id) - .member_table_ids, + .state_table_info + .compaction_group_member_table_ids(new_group_id) + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(), vec![100] ); } @@ -1698,14 +1717,21 @@ async fn test_split_compaction_group_trivial_expired() { assert!(new_group_id > StaticCompactionGroupId::End as u64); assert_eq!( current_version - .get_compaction_group_levels(2) - .member_table_ids, + .state_table_info + .compaction_group_member_table_ids(2) + .iter() + .map(|table_id| table_id.table_id) + .sorted() + .collect_vec(), vec![101, 102] ); assert_eq!( current_version - .get_compaction_group_levels(new_group_id) - .member_table_ids, + .state_table_info + .compaction_group_member_table_ids(new_group_id) + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(), vec![100] ); @@ -2370,14 +2396,20 @@ async fn test_unregister_moved_table() { ); assert_eq!( current_version - .get_compaction_group_levels(2) - .member_table_ids, + .state_table_info + .compaction_group_member_table_ids(2) + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(), vec![101] ); assert_eq!( current_version - .get_compaction_group_levels(new_group_id) - .member_table_ids, + .state_table_info + .compaction_group_member_table_ids(new_group_id) + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(), vec![100] ); @@ -2391,8 +2423,11 @@ async fn test_unregister_moved_table() { ); assert_eq!( current_version - .get_compaction_group_levels(2) - .member_table_ids, + .state_table_info + .compaction_group_member_table_ids(2) + .iter() + .map(|table_id| table_id.table_id) + .collect_vec(), vec![101] ); } diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 790ac6b54fef1..2e6b2512a8be0 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -332,7 +332,12 @@ pub(super) fn calc_new_write_limits( new_write_limits.insert( *id, WriteLimit { - table_ids: levels.member_table_ids.clone(), + table_ids: version + .state_table_info + .compaction_group_member_table_ids(*id) + .iter() + .map(|table_id| table_id.table_id) + .collect(), reason: write_limit_type.as_str(), }, ); diff --git a/src/meta/src/hummock/metrics_utils.rs b/src/meta/src/hummock/metrics_utils.rs index 19eeb24f67aa9..3779ff5b2be97 100644 --- a/src/meta/src/hummock/metrics_utils.rs +++ b/src/meta/src/hummock/metrics_utils.rs @@ -529,12 +529,17 @@ pub fn trigger_write_stop_stats( pub fn trigger_split_stat(metrics: &MetaMetrics, version: &HummockVersion) { let branched_ssts = version.build_branched_sst_info(); - for (compaction_group_id, group) in &version.levels { + for compaction_group_id in version.levels.keys() { let group_label = compaction_group_id.to_string(); metrics .state_table_count .with_label_values(&[&group_label]) - .set(group.member_table_ids.len() as _); + .set( + version + .state_table_info + .compaction_group_member_table_ids(*compaction_group_id) + .len() as _, + ); let branched_sst_count: usize = branched_ssts .values() diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 3227c4ad3d34e..90f70a4f74f3a 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -172,9 +172,10 @@ impl HummockMetaClient for MockHummockMetaClient { sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info), &vec![epoch], version - .levels - .values() - .flat_map(|group| group.member_table_ids.iter().map(|table_id| (*table_id, 0))), + .state_table_info + .info() + .keys() + .map(|table_id| (table_id.table_id, 0)), ); self.hummock_manager .commit_epoch( diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index ce8980598f772..f1fdf66c2f895 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -25,6 +25,7 @@ use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::test_epoch; +use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, }; @@ -132,6 +133,7 @@ fn gen_version( StateTableInfoDelta { committed_epoch, safe_epoch: test_epoch(old_epoch_idx as _), + compaction_group_id: StaticCompactionGroupId::StateDefault as _, }, ) }) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 2231878dc9ef6..102e364b8d00b 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -496,18 +496,40 @@ impl HummockVersion { ); let parent_group_id = group_construct.parent_group_id; new_levels.parent_group_id = parent_group_id; + #[expect(deprecated)] + // for backward-compatibility of previous hummock version delta new_levels .member_table_ids .clone_from(&group_construct.table_ids); self.levels.insert(*compaction_group_id, new_levels); + let member_table_ids = + if group_construct.version >= CompatibilityVersion::NoMemberTableIds as _ { + self.state_table_info + .compaction_group_member_table_ids(*compaction_group_id) + .iter() + .map(|table_id| table_id.table_id) + .collect() + } else { + #[expect(deprecated)] + // for backward-compatibility of previous hummock version delta + HashSet::from_iter(group_construct.table_ids.clone()) + }; + self.init_with_parent_group( parent_group_id, *compaction_group_id, - HashSet::from_iter(group_construct.table_ids.clone()), + member_table_ids, group_construct.get_new_sst_start_id(), - group_construct.version() == CompatibilityVersion::VersionUnspecified, + group_construct.version < CompatibilityVersion::NoTrivialSplit as _, ); } else if let Some(group_change) = &summary.group_table_change { + // TODO: may deprecate this branch? This enum variant is not created anywhere + assert!( + group_change.version <= CompatibilityVersion::NoTrivialSplit as _, + "DeltaType::GroupTableChange is not used anymore after CompatibilityVersion::NoMemberTableIds is added" + ); + #[expect(deprecated)] + // for backward-compatibility of previous hummock version delta self.init_with_parent_group( group_change.origin_group_id, group_change.target_group_id, @@ -520,10 +542,14 @@ impl HummockVersion { .levels .get_mut(&group_change.origin_group_id) .expect("compaction group should exist"); + #[expect(deprecated)] + // for backward-compatibility of previous hummock version delta let mut moving_tables = levels .member_table_ids .extract_if(|t| group_change.table_ids.contains(t)) .collect_vec(); + #[expect(deprecated)] + // for backward-compatibility of previous hummock version delta self.levels .get_mut(compaction_group_id) .expect("compaction group should exist") @@ -536,6 +562,7 @@ impl HummockVersion { .get_mut(compaction_group_id) .expect("compaction group should exist"); + #[expect(deprecated)] // for backward-compatibility of previous hummock version delta for group_meta_delta in &summary.group_meta_changes { levels .member_table_ids @@ -583,7 +610,11 @@ impl HummockVersion { } } else { // `max_committed_epoch` is not changed. The delta is caused by compaction. - levels.apply_compact_ssts(summary); + levels.apply_compact_ssts( + summary, + self.state_table_info + .compaction_group_member_table_ids(*compaction_group_id), + ); } if has_destroy { self.levels.remove(compaction_group_id); @@ -693,16 +724,6 @@ impl HummockVersion { } } - pub fn build_compaction_group_info(&self) -> HashMap { - let mut ret = HashMap::new(); - for (compaction_group_id, levels) in &self.levels { - for table_id in &levels.member_table_ids { - ret.insert(TableId::new(*table_id), *compaction_group_id); - } - } - ret - } - pub fn build_branched_sst_info(&self) -> BTreeMap { let mut ret: BTreeMap<_, _> = BTreeMap::new(); for (compaction_group_id, group) in &self.levels { @@ -757,7 +778,11 @@ impl Levels { .sum() } - pub fn apply_compact_ssts(&mut self, summary: GroupDeltasSummary) { + pub fn apply_compact_ssts( + &mut self, + summary: GroupDeltasSummary, + member_table_ids: &BTreeSet, + ) { let GroupDeltasSummary { delete_sst_levels, delete_sst_ids_set, @@ -810,9 +835,11 @@ impl Levels { insert_sub_level_id, delete_sst_ids_set, l0.sub_levels.iter().map(|level| level.sub_level_id).collect_vec() ); if l0.sub_levels[index].table_infos.is_empty() - && self.member_table_ids.len() == 1 + && member_table_ids.len() == 1 && insert_table_infos.iter().all(|sst| { - sst.table_ids.len() == 1 && sst.table_ids[0] == self.member_table_ids[0] + sst.table_ids.len() == 1 + && sst.table_ids[0] + == member_table_ids.iter().next().expect("non-empty").table_id }) { // Only change vnode_partition_count for group which has only one state-table. @@ -830,7 +857,7 @@ impl Levels { self.levels[idx].vnode_partition_count = new_vnode_partition_count; } else if self.levels[idx].vnode_partition_count != 0 && new_vnode_partition_count == 0 - && self.member_table_ids.len() > 1 + && member_table_ids.len() > 1 { self.levels[idx].vnode_partition_count = 0; } @@ -901,6 +928,7 @@ pub fn build_initial_compaction_group_levels( vnode_partition_count: 0, }); } + #[expect(deprecated)] // for backward-compatibility of previous hummock version delta Levels { levels, l0: Some(OverlappingLevel { @@ -952,18 +980,6 @@ fn split_sst_info_for_level( insert_table_infos } -pub fn try_get_compaction_group_id_by_table_id( - version: &HummockVersion, - table_id: StateTableId, -) -> Option { - for (group_id, levels) in &version.levels { - if levels.member_table_ids.contains(&table_id) { - return Some(*group_id); - } - } - None -} - /// Gets all compaction group ids. pub fn get_compaction_group_ids( version: &HummockVersion, @@ -974,9 +990,10 @@ pub fn get_compaction_group_ids( /// Gets all member table ids. pub fn get_member_table_ids(version: &HummockVersion) -> HashSet { version - .levels - .iter() - .flat_map(|(_, levels)| levels.member_table_ids.clone()) + .state_table_info + .info() + .keys() + .map(|table_id| table_id.table_id) .collect() } @@ -984,14 +1001,10 @@ pub fn get_table_compaction_group_id_mapping( version: &HummockVersion, ) -> HashMap { version - .levels + .state_table_info + .info() .iter() - .flat_map(|(group_id, levels)| { - levels - .member_table_ids - .iter() - .map(|table_id| (*table_id, *group_id)) - }) + .map(|(table_id, info)| (table_id.table_id, info.compaction_group_id)) .collect() } @@ -1209,7 +1222,6 @@ pub fn validate_version(version: &HummockVersion) -> Vec { )); } - let mut table_to_group = HashMap::new(); // Ensure each table maps to only one compaction group for (group_id, levels) in &version.levels { // Ensure compaction group id matches @@ -1220,31 +1232,6 @@ pub fn validate_version(version: &HummockVersion) -> Vec { )); } - // Ensure table id is sorted - if !levels.member_table_ids.is_sorted() { - res.push(format!( - "GROUP {}: memtable_table_ids is not sorted: {:?}", - group_id, levels.member_table_ids - )); - } - - // Ensure table id is unique - for table_id in &levels.member_table_ids { - match table_to_group.entry(table_id) { - Entry::Occupied(e) => { - res.push(format!( - "GROUP {}: Duplicated table_id {}. First found in group {}", - group_id, - table_id, - e.get() - )); - } - Entry::Vacant(e) => { - e.insert(group_id); - } - } - } - let validate_level = |group: CompactionGroupId, expected_level_idx: u32, level: &Level, diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 06c9a358883f4..97e1a334dcf98 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -22,6 +22,7 @@ #![feature(is_sorted)] #![feature(let_chains)] #![feature(btree_cursors)] +#![feature(lazy_cell)] mod key_cmp; use std::cmp::Ordering; diff --git a/src/storage/hummock_sdk/src/table_stats.rs b/src/storage/hummock_sdk/src/table_stats.rs index 6f6bb9b9e3d40..4fce0e0f048ae 100644 --- a/src/storage/hummock_sdk/src/table_stats.rs +++ b/src/storage/hummock_sdk/src/table_stats.rs @@ -13,8 +13,9 @@ // limitations under the License. use std::borrow::Borrow; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; +use risingwave_common::catalog::TableId; use risingwave_pb::hummock::PbTableStats; use crate::version::HummockVersion; @@ -106,11 +107,12 @@ pub fn purge_prost_table_stats( table_stats: &mut PbTableStatsMap, hummock_version: &HummockVersion, ) -> bool { - let mut all_tables_in_version: HashSet = HashSet::default(); let prev_count = table_stats.len(); - for group in hummock_version.levels.values() { - all_tables_in_version.extend(group.member_table_ids.clone()); - } - table_stats.retain(|k, _| all_tables_in_version.contains(k)); + table_stats.retain(|table_id, _| { + hummock_version + .state_table_info + .info() + .contains_key(&TableId::new(*table_id)) + }); prev_count != table_stats.len() } diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index b30c1ba0e5191..0315ee9b8f298 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -13,9 +13,10 @@ // limitations under the License. use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::mem::{replace, size_of}; -use std::sync::Arc; +use std::ops::Deref; +use std::sync::{Arc, LazyLock}; use prost::Message; use risingwave_common::catalog::TableId; @@ -39,13 +40,37 @@ use crate::{CompactionGroupId, HummockSstableObjectId, HummockVersionId, FIRST_V #[derive(Debug, Clone, PartialEq)] pub struct HummockVersionStateTableInfo { state_table_info: HashMap, + + // in memory index + compaction_group_member_tables: HashMap>, } impl HummockVersionStateTableInfo { pub fn empty() -> Self { Self { state_table_info: HashMap::new(), + compaction_group_member_tables: HashMap::new(), + } + } + + fn build_compaction_group_member_tables( + state_table_info: &HashMap, + ) -> HashMap> { + let mut ret: HashMap<_, BTreeSet<_>> = HashMap::new(); + for (table_id, info) in state_table_info { + assert!(ret + .entry(info.compaction_group_id) + .or_default() + .insert(*table_id)); } + ret + } + + pub fn build_table_compaction_group_id(&self) -> HashMap { + self.state_table_info + .iter() + .map(|(table_id, info)| (*table_id, info.compaction_group_id)) + .collect() } pub fn from_protobuf(state_table_info: &HashMap) -> Self { @@ -53,7 +78,12 @@ impl HummockVersionStateTableInfo { .iter() .map(|(table_id, info)| (TableId::new(*table_id), info.clone())) .collect(); - Self { state_table_info } + let compaction_group_member_tables = + Self::build_compaction_group_member_tables(&state_table_info); + Self { + state_table_info, + compaction_group_member_tables, + } } pub fn to_protobuf(&self) -> HashMap { @@ -69,8 +99,28 @@ impl HummockVersionStateTableInfo { removed_table_id: &HashSet, ) -> HashMap> { let mut changed_table = HashMap::new(); + fn remove_table_from_compaction_group( + compaction_group_member_tables: &mut HashMap>, + compaction_group_id: CompactionGroupId, + table_id: TableId, + ) { + let member_tables = compaction_group_member_tables + .get_mut(&compaction_group_id) + .expect("should exist"); + assert!(member_tables.remove(&table_id)); + if member_tables.is_empty() { + assert!(compaction_group_member_tables + .remove(&compaction_group_id) + .is_some()); + } + } for table_id in removed_table_id { if let Some(prev_info) = self.state_table_info.remove(table_id) { + remove_table_from_compaction_group( + &mut self.compaction_group_member_tables, + prev_info.compaction_group_id, + *table_id, + ); assert!(changed_table.insert(*table_id, Some(prev_info)).is_none()); } else { warn!( @@ -86,6 +136,7 @@ impl HummockVersionStateTableInfo { let new_info = StateTableInfo { committed_epoch: delta.committed_epoch, safe_epoch: delta.safe_epoch, + compaction_group_id: delta.compaction_group_id, }; match self.state_table_info.entry(*table_id) { Entry::Occupied(mut entry) => { @@ -98,21 +149,57 @@ impl HummockVersionStateTableInfo { prev_info, new_info ); + if prev_info.compaction_group_id != new_info.compaction_group_id { + // table moved to another compaction group + remove_table_from_compaction_group( + &mut self.compaction_group_member_tables, + prev_info.compaction_group_id, + *table_id, + ); + assert!(self + .compaction_group_member_tables + .entry(new_info.compaction_group_id) + .or_default() + .insert(*table_id)); + } let prev_info = replace(prev_info, new_info); changed_table.insert(*table_id, Some(prev_info)); } Entry::Vacant(entry) => { + assert!(self + .compaction_group_member_tables + .entry(new_info.compaction_group_id) + .or_default() + .insert(*table_id)); entry.insert(new_info); changed_table.insert(*table_id, None); } } } + debug_assert_eq!( + self.compaction_group_member_tables, + Self::build_compaction_group_member_tables(&self.state_table_info) + ); changed_table } pub fn info(&self) -> &HashMap { &self.state_table_info } + + pub fn compaction_group_member_table_ids( + &self, + compaction_group_id: CompactionGroupId, + ) -> &BTreeSet { + static EMPTY_SET: LazyLock> = LazyLock::new(BTreeSet::new); + self.compaction_group_member_tables + .get(&compaction_group_id) + .unwrap_or_else(|| EMPTY_SET.deref()) + } + + pub fn compaction_group_member_tables(&self) -> &HashMap> { + &self.compaction_group_member_tables + } } #[derive(Debug, Clone, PartialEq)] @@ -225,18 +312,21 @@ impl HummockVersion { } pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool { - // state_table_info is not previously filled, but there previously exists some tables + // for backward-compatibility of previous hummock version delta self.state_table_info.state_table_info.is_empty() - && self - .levels - .values() - .any(|group| !group.member_table_ids.is_empty()) + && self.levels.values().any(|group| { + // state_table_info is not previously filled, but there previously exists some tables + #[expect(deprecated)] + !group.member_table_ids.is_empty() + }) } pub fn may_fill_backward_compatible_state_table_info_delta( &self, delta: &mut HummockVersionDelta, ) { + #[expect(deprecated)] + // for backward-compatibility of previous hummock version delta for (cg_id, group) in &self.levels { for table_id in &group.member_table_ids { assert!( @@ -247,6 +337,7 @@ impl HummockVersion { StateTableInfoDelta { committed_epoch: self.max_committed_epoch, safe_epoch: self.safe_epoch, + compaction_group_id: *cg_id, } ) .is_none(), diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index 6302f91739c20..5e89a135d825c 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -85,7 +85,7 @@ impl PinnedVersion { pinned_version_manager_tx: UnboundedSender, ) -> Self { let version_id = version.id; - let compaction_group_index = version.build_compaction_group_info(); + let compaction_group_index = version.state_table_info.build_table_compaction_group_id(); PinnedVersion { version: Arc::new(version), @@ -109,7 +109,7 @@ impl PinnedVersion { self.version.id ); let version_id = version.id; - let compaction_group_index = version.build_compaction_group_info(); + let compaction_group_index = version.state_table_info.build_table_compaction_group_id(); PinnedVersion { version: Arc::new(version), @@ -135,9 +135,10 @@ impl PinnedVersion { pub fn levels(&self, table_id: TableId) -> impl Iterator { #[auto_enum(Iterator)] - match self.compaction_group_index.get(&table_id) { - Some(compaction_group_id) => { - let levels = self.levels_by_compaction_groups_id(*compaction_group_id); + match self.version.state_table_info.info().get(&table_id) { + Some(info) => { + let compaction_group_id = info.compaction_group_id; + let levels = self.levels_by_compaction_groups_id(compaction_group_id); levels .l0 .as_ref()