From 0f7d9280a134cd1c1932c8a51600077f03d66e1f Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 6 Nov 2024 16:54:41 +0800 Subject: [PATCH] feat(compaction): Limit the size of the new overlapping level --- src/meta/src/hummock/manager/commit_epoch.rs | 32 ++++- src/meta/src/hummock/manager/transaction.rs | 45 ++++++- .../hummock_test/src/hummock_storage_tests.rs | 126 ++++++++++++++++++ 3 files changed, 193 insertions(+), 10 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index c51c77a5d36a0..588aa7f6e33e7 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -112,7 +112,7 @@ impl HummockManager { let state_table_info = &version.latest_version().state_table_info; let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); let mut new_table_ids = HashMap::new(); - let mut new_compaction_groups = HashMap::new(); + let mut compaction_groups = HashMap::new(); let mut compaction_group_manager_txn = None; let mut compaction_group_config: Option> = None; @@ -143,7 +143,10 @@ impl HummockManager { ) }; let new_compaction_group_id = next_compaction_group_id(&self.env).await?; - new_compaction_groups.insert(new_compaction_group_id, compaction_group_config.clone()); + compaction_groups.insert( + new_compaction_group_id, + (true, compaction_group_config.clone()), + ); compaction_group_manager.insert( new_compaction_group_id, CompactionGroup { @@ -166,10 +169,33 @@ impl HummockManager { .await?; let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); + // fill compaction_groups + if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() { + for cg_id in &modified_compaction_groups { + if !compaction_groups.contains_key(cg_id) { + let compaction_group = compaction_group_manager + .get(cg_id) + .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) + .compaction_config(); + compaction_groups.insert(*cg_id, (false, compaction_group)); + } + } + } else { + let compaction_group_manager = self.compaction_group_manager.read().await; + for cg_id in &modified_compaction_groups { + if !compaction_groups.contains_key(cg_id) { + let compaction_group = compaction_group_manager + .try_get_compaction_group_config(*cg_id) + .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) + .compaction_config(); + compaction_groups.insert(*cg_id, (false, compaction_group)); + } + } + } let time_travel_delta = version.pre_commit_epoch( &tables_to_commit, - new_compaction_groups, + compaction_groups, commit_sstables, &new_table_ids, new_table_watermarks, diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 054ae657d594d..2c439f7000d26 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -111,7 +111,7 @@ impl<'a> HummockVersionTransaction<'a> { pub(super) fn pre_commit_epoch( &mut self, tables_to_commit: &HashMap, - new_compaction_groups: HashMap>, + compaction_groups: HashMap)>, commit_sstables: BTreeMap>, new_table_ids: &HashMap, new_table_watermarks: HashMap, @@ -121,18 +121,22 @@ impl<'a> HummockVersionTransaction<'a> { new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; - for (compaction_group_id, compaction_group_config) in new_compaction_groups { + for (compaction_group_id, (is_new, compaction_group_config)) in &compaction_groups { { + if !is_new { + continue; + } + let group_deltas = &mut new_version_delta .group_deltas - .entry(compaction_group_id) + .entry(*compaction_group_id) .or_default() .group_deltas; #[expect(deprecated)] group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { - group_config: Some((*compaction_group_config).clone()), - group_id: compaction_group_id, + group_config: Some((**compaction_group_config).clone()), + group_id: *compaction_group_id, parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId, new_sst_start_id: 0, // No need to set it when `NewCompactionGroup` @@ -145,14 +149,41 @@ impl<'a> HummockVersionTransaction<'a> { // Append SSTs to a new version. for (compaction_group_id, inserted_table_infos) in commit_sstables { + let mut accumulated_size = 0; + let mut ssts = vec![]; + let (_, config) = compaction_groups + .get(&compaction_group_id) + .unwrap_or_else(|| { + panic!( + "compaction group {:?} not found in compaction_groups", + compaction_group_id + ) + }); + let sub_level_size_limit = config.sub_level_max_compaction_bytes * 2; + let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) .or_default() .group_deltas; - let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos); - group_deltas.push(group_delta); + for sst in inserted_table_infos { + accumulated_size += sst.sst_size; + ssts.push(sst); + if accumulated_size > sub_level_size_limit { + let group_delta = GroupDelta::NewL0SubLevel(ssts); + group_deltas.push(group_delta); + + // reset the accumulated size and ssts + accumulated_size = 0; + ssts = vec![]; + } + } + + if accumulated_size != 0 { + let group_delta = GroupDelta::NewL0SubLevel(ssts); + group_deltas.push(group_delta); + } } // update state table info diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 24ba1ca1cf779..4d3cb7c5ce2b5 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2826,3 +2826,129 @@ async fn test_commit_multi_epoch() { assert_eq!(info.committed_epoch, epoch3); } } + +#[tokio::test] +async fn test_commit_with_large_size() { + let test_env = prepare_hummock_test_env().await; + let context_id = test_env.meta_client.context_id(); + let existing_table_id = TableId::new(1); + let initial_epoch = INVALID_EPOCH; + + let commit_epoch = + |epoch, ssts: Vec, new_table_fragment_infos, tables_to_commit: &[TableId]| { + let manager = &test_env.manager; + let tables_to_commit = tables_to_commit + .iter() + .map(|table_id| (*table_id, epoch)) + .collect(); + let sst_to_context = ssts.iter().map(|sst| (sst.object_id, context_id)).collect(); + + let sstables = ssts + .into_iter() + .map(|sst| LocalSstableInfo { + table_stats: sst + .table_ids + .iter() + .map(|&table_id| { + ( + table_id, + TableStats { + total_compressed_size: 10, + ..Default::default() + }, + ) + }) + .collect(), + sst_info: sst, + created_at: u64::MAX, + }) + .collect_vec(); + + async move { + manager + .commit_epoch(CommitEpochInfo { + new_table_watermarks: Default::default(), + sst_to_context, + sstables, + new_table_fragment_infos, + change_log_delta: Default::default(), + tables_to_commit, + }) + .await + .unwrap(); + } + }; + + let epoch1 = initial_epoch.next_epoch(); + let sst1_epoch1 = SstableInfo { + sst_id: 11, + object_id: 1, + table_ids: vec![existing_table_id.table_id], + file_size: 512 << 20, + sst_size: 512 << 20, + ..Default::default() + }; + + let sst1_epoch2 = SstableInfo { + sst_id: 12, + object_id: 2, + table_ids: vec![existing_table_id.table_id], + file_size: 512 << 20, + sst_size: 512 << 20, + ..Default::default() + }; + + let sst1_epoch3 = SstableInfo { + sst_id: 13, + object_id: 3, + table_ids: vec![existing_table_id.table_id], + file_size: 512 << 20, + sst_size: 512 << 20, + ..Default::default() + }; + + commit_epoch( + epoch1, + vec![ + sst1_epoch1.clone(), + sst1_epoch2.clone(), + sst1_epoch3.clone(), + ], + vec![NewTableFragmentInfo { + table_ids: HashSet::from_iter([existing_table_id]), + }], + &[existing_table_id], + ) + .await; + + let cg_id = + get_compaction_group_id_by_table_id(test_env.manager.clone(), existing_table_id.table_id()) + .await; + + let l0_sub_levels = test_env + .manager + .get_current_version() + .await + .levels + .get(&cg_id) + .unwrap() + .l0 + .clone(); + + assert_eq!(3, l0_sub_levels.sub_levels.len()); + assert_eq!(1, l0_sub_levels.sub_levels[0].table_infos.len()); + assert_eq!( + sst1_epoch1.object_id, + l0_sub_levels.sub_levels[0].table_infos[0].object_id + ); + assert_eq!(1, l0_sub_levels.sub_levels[1].table_infos.len()); + assert_eq!( + sst1_epoch2.object_id, + l0_sub_levels.sub_levels[1].table_infos[0].object_id + ); + assert_eq!(1, l0_sub_levels.sub_levels[2].table_infos.len()); + assert_eq!( + sst1_epoch3.object_id, + l0_sub_levels.sub_levels[2].table_infos[0].object_id + ); +}