From 0f7d9280a134cd1c1932c8a51600077f03d66e1f Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 6 Nov 2024 16:54:41 +0800 Subject: [PATCH 1/9] feat(compaction): Limit the size of the new overlapping level --- src/meta/src/hummock/manager/commit_epoch.rs | 32 ++++- src/meta/src/hummock/manager/transaction.rs | 45 ++++++- .../hummock_test/src/hummock_storage_tests.rs | 126 ++++++++++++++++++ 3 files changed, 193 insertions(+), 10 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index c51c77a5d36a0..588aa7f6e33e7 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -112,7 +112,7 @@ impl HummockManager { let state_table_info = &version.latest_version().state_table_info; let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); let mut new_table_ids = HashMap::new(); - let mut new_compaction_groups = HashMap::new(); + let mut compaction_groups = HashMap::new(); let mut compaction_group_manager_txn = None; let mut compaction_group_config: Option> = None; @@ -143,7 +143,10 @@ impl HummockManager { ) }; let new_compaction_group_id = next_compaction_group_id(&self.env).await?; - new_compaction_groups.insert(new_compaction_group_id, compaction_group_config.clone()); + compaction_groups.insert( + new_compaction_group_id, + (true, compaction_group_config.clone()), + ); compaction_group_manager.insert( new_compaction_group_id, CompactionGroup { @@ -166,10 +169,33 @@ impl HummockManager { .await?; let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); + // fill compaction_groups + if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() { + for cg_id in &modified_compaction_groups { + if !compaction_groups.contains_key(cg_id) { + let compaction_group = compaction_group_manager + .get(cg_id) + .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) + .compaction_config(); + compaction_groups.insert(*cg_id, (false, compaction_group)); + } + } + } else { + let compaction_group_manager = self.compaction_group_manager.read().await; + for cg_id in &modified_compaction_groups { + if !compaction_groups.contains_key(cg_id) { + let compaction_group = compaction_group_manager + .try_get_compaction_group_config(*cg_id) + .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) + .compaction_config(); + compaction_groups.insert(*cg_id, (false, compaction_group)); + } + } + } let time_travel_delta = version.pre_commit_epoch( &tables_to_commit, - new_compaction_groups, + compaction_groups, commit_sstables, &new_table_ids, new_table_watermarks, diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 054ae657d594d..2c439f7000d26 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -111,7 +111,7 @@ impl<'a> HummockVersionTransaction<'a> { pub(super) fn pre_commit_epoch( &mut self, tables_to_commit: &HashMap, - new_compaction_groups: HashMap>, + compaction_groups: HashMap)>, commit_sstables: BTreeMap>, new_table_ids: &HashMap, new_table_watermarks: HashMap, @@ -121,18 +121,22 @@ impl<'a> HummockVersionTransaction<'a> { new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; - for (compaction_group_id, compaction_group_config) in new_compaction_groups { + for (compaction_group_id, (is_new, compaction_group_config)) in &compaction_groups { { + if !is_new { + continue; + } + let group_deltas = &mut new_version_delta .group_deltas - .entry(compaction_group_id) + .entry(*compaction_group_id) .or_default() .group_deltas; #[expect(deprecated)] group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { - group_config: Some((*compaction_group_config).clone()), - group_id: compaction_group_id, + group_config: Some((**compaction_group_config).clone()), + group_id: *compaction_group_id, parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId, new_sst_start_id: 0, // No need to set it when `NewCompactionGroup` @@ -145,14 +149,41 @@ impl<'a> HummockVersionTransaction<'a> { // Append SSTs to a new version. for (compaction_group_id, inserted_table_infos) in commit_sstables { + let mut accumulated_size = 0; + let mut ssts = vec![]; + let (_, config) = compaction_groups + .get(&compaction_group_id) + .unwrap_or_else(|| { + panic!( + "compaction group {:?} not found in compaction_groups", + compaction_group_id + ) + }); + let sub_level_size_limit = config.sub_level_max_compaction_bytes * 2; + let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) .or_default() .group_deltas; - let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos); - group_deltas.push(group_delta); + for sst in inserted_table_infos { + accumulated_size += sst.sst_size; + ssts.push(sst); + if accumulated_size > sub_level_size_limit { + let group_delta = GroupDelta::NewL0SubLevel(ssts); + group_deltas.push(group_delta); + + // reset the accumulated size and ssts + accumulated_size = 0; + ssts = vec![]; + } + } + + if accumulated_size != 0 { + let group_delta = GroupDelta::NewL0SubLevel(ssts); + group_deltas.push(group_delta); + } } // update state table info diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 24ba1ca1cf779..4d3cb7c5ce2b5 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2826,3 +2826,129 @@ async fn test_commit_multi_epoch() { assert_eq!(info.committed_epoch, epoch3); } } + +#[tokio::test] +async fn test_commit_with_large_size() { + let test_env = prepare_hummock_test_env().await; + let context_id = test_env.meta_client.context_id(); + let existing_table_id = TableId::new(1); + let initial_epoch = INVALID_EPOCH; + + let commit_epoch = + |epoch, ssts: Vec, new_table_fragment_infos, tables_to_commit: &[TableId]| { + let manager = &test_env.manager; + let tables_to_commit = tables_to_commit + .iter() + .map(|table_id| (*table_id, epoch)) + .collect(); + let sst_to_context = ssts.iter().map(|sst| (sst.object_id, context_id)).collect(); + + let sstables = ssts + .into_iter() + .map(|sst| LocalSstableInfo { + table_stats: sst + .table_ids + .iter() + .map(|&table_id| { + ( + table_id, + TableStats { + total_compressed_size: 10, + ..Default::default() + }, + ) + }) + .collect(), + sst_info: sst, + created_at: u64::MAX, + }) + .collect_vec(); + + async move { + manager + .commit_epoch(CommitEpochInfo { + new_table_watermarks: Default::default(), + sst_to_context, + sstables, + new_table_fragment_infos, + change_log_delta: Default::default(), + tables_to_commit, + }) + .await + .unwrap(); + } + }; + + let epoch1 = initial_epoch.next_epoch(); + let sst1_epoch1 = SstableInfo { + sst_id: 11, + object_id: 1, + table_ids: vec![existing_table_id.table_id], + file_size: 512 << 20, + sst_size: 512 << 20, + ..Default::default() + }; + + let sst1_epoch2 = SstableInfo { + sst_id: 12, + object_id: 2, + table_ids: vec![existing_table_id.table_id], + file_size: 512 << 20, + sst_size: 512 << 20, + ..Default::default() + }; + + let sst1_epoch3 = SstableInfo { + sst_id: 13, + object_id: 3, + table_ids: vec![existing_table_id.table_id], + file_size: 512 << 20, + sst_size: 512 << 20, + ..Default::default() + }; + + commit_epoch( + epoch1, + vec![ + sst1_epoch1.clone(), + sst1_epoch2.clone(), + sst1_epoch3.clone(), + ], + vec![NewTableFragmentInfo { + table_ids: HashSet::from_iter([existing_table_id]), + }], + &[existing_table_id], + ) + .await; + + let cg_id = + get_compaction_group_id_by_table_id(test_env.manager.clone(), existing_table_id.table_id()) + .await; + + let l0_sub_levels = test_env + .manager + .get_current_version() + .await + .levels + .get(&cg_id) + .unwrap() + .l0 + .clone(); + + assert_eq!(3, l0_sub_levels.sub_levels.len()); + assert_eq!(1, l0_sub_levels.sub_levels[0].table_infos.len()); + assert_eq!( + sst1_epoch1.object_id, + l0_sub_levels.sub_levels[0].table_infos[0].object_id + ); + assert_eq!(1, l0_sub_levels.sub_levels[1].table_infos.len()); + assert_eq!( + sst1_epoch2.object_id, + l0_sub_levels.sub_levels[1].table_infos[0].object_id + ); + assert_eq!(1, l0_sub_levels.sub_levels[2].table_infos.len()); + assert_eq!( + sst1_epoch3.object_id, + l0_sub_levels.sub_levels[2].table_infos[0].object_id + ); +} From 579cf6d437b6fd6b36ff5fcc5babd90b704491cb Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 7 Nov 2024 13:51:16 +0800 Subject: [PATCH 2/9] change metrics bucket --- src/storage/src/monitor/monitored_storage_metrics.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/storage/src/monitor/monitored_storage_metrics.rs b/src/storage/src/monitor/monitored_storage_metrics.rs index 5813c20b4e9ef..abc95102e79c3 100644 --- a/src/storage/src/monitor/monitored_storage_metrics.rs +++ b/src/storage/src/monitor/monitored_storage_metrics.rs @@ -70,8 +70,8 @@ pub fn global_storage_metrics(metric_level: MetricLevel) -> MonitoredStorageMetr impl MonitoredStorageMetrics { pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self { - // 256B ~ max 4GB - let size_buckets = exponential_buckets(256.0, 16.0, 7).unwrap(); + // 256B ~ max 64GB + let size_buckets = exponential_buckets(256.0, 16.0, 8).unwrap(); // 10ms ~ max 2.7h let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap(); // ----- get ----- From 76494c1e92dc4b317527d40042c4307c3817774a Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 19 Nov 2024 15:36:19 +0800 Subject: [PATCH 3/9] refactor --- src/meta/src/hummock/manager/commit_epoch.rs | 96 ++++++++++++++------ src/meta/src/hummock/manager/transaction.rs | 81 +++++------------ 2 files changed, 91 insertions(+), 86 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 588aa7f6e33e7..09ac1bd271b3b 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -15,6 +15,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; +use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -112,7 +113,7 @@ impl HummockManager { let state_table_info = &version.latest_version().state_table_info; let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); let mut new_table_ids = HashMap::new(); - let mut compaction_groups = HashMap::new(); + let mut new_compaction_groups = Vec::new(); let mut compaction_group_manager_txn = None; let mut compaction_group_config: Option> = None; @@ -143,17 +144,13 @@ impl HummockManager { ) }; let new_compaction_group_id = next_compaction_group_id(&self.env).await?; - compaction_groups.insert( - new_compaction_group_id, - (true, compaction_group_config.clone()), - ); - compaction_group_manager.insert( - new_compaction_group_id, - CompactionGroup { - group_id: new_compaction_group_id, - compaction_config: compaction_group_config, - }, - ); + let new_compaction_group = CompactionGroup { + group_id: new_compaction_group_id, + compaction_config: compaction_group_config.clone(), + }; + + new_compaction_groups.push(new_compaction_group.clone()); + compaction_group_manager.insert(new_compaction_group_id, new_compaction_group); on_handle_add_new_table( state_table_info, @@ -168,35 +165,35 @@ impl HummockManager { .correct_commit_ssts(sstables, &table_compaction_group_mapping) .await?; - let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); + let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec(); // fill compaction_groups + let mut group_id_to_config = HashMap::new(); if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() { for cg_id in &modified_compaction_groups { - if !compaction_groups.contains_key(cg_id) { - let compaction_group = compaction_group_manager - .get(cg_id) - .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) - .compaction_config(); - compaction_groups.insert(*cg_id, (false, compaction_group)); - } + let compaction_group = compaction_group_manager + .get(cg_id) + .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) + .compaction_config(); + group_id_to_config.insert(*cg_id, compaction_group); } } else { let compaction_group_manager = self.compaction_group_manager.read().await; for cg_id in &modified_compaction_groups { - if !compaction_groups.contains_key(cg_id) { - let compaction_group = compaction_group_manager - .try_get_compaction_group_config(*cg_id) - .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) - .compaction_config(); - compaction_groups.insert(*cg_id, (false, compaction_group)); - } + let compaction_group = compaction_group_manager + .try_get_compaction_group_config(*cg_id) + .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) + .compaction_config(); + group_id_to_config.insert(*cg_id, compaction_group); } } + let group_id_to_sub_levels = + rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config); + let time_travel_delta = version.pre_commit_epoch( &tables_to_commit, - compaction_groups, - commit_sstables, + new_compaction_groups, + group_id_to_sub_levels, &new_table_ids, new_table_watermarks, change_log_delta, @@ -445,3 +442,44 @@ fn on_handle_add_new_table( Ok(()) } + +/// Rewrite the commit sstables to sub-levels based on the compaction group config. +/// The type of `compaction_group_manager_txn` is too complex to be used in the function signature. So we use `HashMap` instead. +fn rewrite_commit_sstables_to_sub_level( + commit_sstables: BTreeMap>, + group_id_to_config: &HashMap>, +) -> BTreeMap>> { + let mut overlapping_sstables: BTreeMap>> = BTreeMap::new(); + for (group_id, inserted_table_infos) in commit_sstables { + let config = group_id_to_config + .get(&group_id) + .expect("compaction group should exist"); + + let mut accumulated_size = 0; + let mut ssts = vec![]; + let sub_level_size_limit = config.sub_level_max_compaction_bytes * 2; // TODO: use config instead of magic number + + let level = overlapping_sstables.entry(group_id).or_default(); + + for sst in inserted_table_infos { + accumulated_size += sst.sst_size; + ssts.push(sst); + if accumulated_size > sub_level_size_limit { + level.push(ssts); + + // reset the accumulated size and ssts + accumulated_size = 0; + ssts = vec![]; + } + } + + if accumulated_size != 0 { + level.push(ssts); + } + + // The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top. + level.reverse(); + } + + overlapping_sstables +} diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 2c439f7000d26..8a4276492365d 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -14,7 +14,6 @@ use std::collections::{BTreeMap, HashMap}; use std::ops::{Deref, DerefMut}; -use std::sync::Arc; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -24,11 +23,12 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId}; use risingwave_pb::hummock::{ - CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas, - HummockVersionStats, StateTableInfoDelta, + CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats, + StateTableInfoDelta, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; +use crate::hummock::model::CompactionGroup; use crate::manager::NotificationManager; use crate::model::{ InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction, @@ -111,8 +111,8 @@ impl<'a> HummockVersionTransaction<'a> { pub(super) fn pre_commit_epoch( &mut self, tables_to_commit: &HashMap, - compaction_groups: HashMap)>, - commit_sstables: BTreeMap>, + new_compaction_groups: Vec, + group_id_to_sub_levels: BTreeMap>>, new_table_ids: &HashMap, new_table_watermarks: HashMap, change_log_delta: HashMap, @@ -121,68 +121,35 @@ impl<'a> HummockVersionTransaction<'a> { new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; - for (compaction_group_id, (is_new, compaction_group_config)) in &compaction_groups { - { - if !is_new { - continue; - } + for compaction_group in &new_compaction_groups { + let group_deltas = &mut new_version_delta + .group_deltas + .entry(compaction_group.group_id()) + .or_default() + .group_deltas; - let group_deltas = &mut new_version_delta - .group_deltas - .entry(*compaction_group_id) - .or_default() - .group_deltas; - - #[expect(deprecated)] - group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { - group_config: Some((**compaction_group_config).clone()), - group_id: *compaction_group_id, - parent_group_id: StaticCompactionGroupId::NewCompactionGroup - as CompactionGroupId, - new_sst_start_id: 0, // No need to set it when `NewCompactionGroup` - table_ids: vec![], - version: CompatibilityVersion::SplitGroupByTableId as i32, - split_key: None, - })); - } + #[expect(deprecated)] + group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { + group_config: Some(compaction_group.compaction_config().as_ref().clone()), + group_id: compaction_group.group_id(), + parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId, + new_sst_start_id: 0, // No need to set it when `NewCompactionGroup` + table_ids: vec![], + version: CompatibilityVersion::SplitGroupByTableId as i32, + split_key: None, + })); } // Append SSTs to a new version. - for (compaction_group_id, inserted_table_infos) in commit_sstables { - let mut accumulated_size = 0; - let mut ssts = vec![]; - let (_, config) = compaction_groups - .get(&compaction_group_id) - .unwrap_or_else(|| { - panic!( - "compaction group {:?} not found in compaction_groups", - compaction_group_id - ) - }); - let sub_level_size_limit = config.sub_level_max_compaction_bytes * 2; - + for (compaction_group_id, sub_levels) in group_id_to_sub_levels { let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) .or_default() .group_deltas; - for sst in inserted_table_infos { - accumulated_size += sst.sst_size; - ssts.push(sst); - if accumulated_size > sub_level_size_limit { - let group_delta = GroupDelta::NewL0SubLevel(ssts); - group_deltas.push(group_delta); - - // reset the accumulated size and ssts - accumulated_size = 0; - ssts = vec![]; - } - } - - if accumulated_size != 0 { - let group_delta = GroupDelta::NewL0SubLevel(ssts); - group_deltas.push(group_delta); + for sub_level in sub_levels { + group_deltas.push(GroupDelta::NewL0SubLevel(sub_level)); } } From 868bef91bb4470bfa81a1518466c5253d74607ef Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 19 Nov 2024 16:50:11 +0800 Subject: [PATCH 4/9] feat(config): add config for calculate max_overlapping_level_size --- proto/hummock.proto | 6 ++++++ src/common/src/config.rs | 4 ++++ src/ctl/src/cmd_impl/hummock/compaction_group.rs | 4 ++++ src/ctl/src/lib.rs | 4 ++++ src/meta/src/hummock/compaction/compaction_config.rs | 3 +++ src/meta/src/hummock/manager/commit_epoch.rs | 7 ++++++- .../hummock/manager/compaction/compaction_group_manager.rs | 3 +++ 7 files changed, 30 insertions(+), 1 deletion(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 869c5af867d43..be4f3b666228b 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -660,6 +660,7 @@ message RiseCtlUpdateCompactionConfigRequest { uint64 sst_allowed_trivial_move_min_size = 19; uint32 split_weight_by_vnode = 20; bool disable_auto_group_scheduling = 21; + uint32 max_overlapping_level_size_ratio = 22; } } repeated uint64 compaction_group_ids = 1; @@ -858,6 +859,11 @@ message CompactionConfig { // The limitation of auto group scheduling optional bool disable_auto_group_scheduling = 23; + + // The limitation of the max size of the overlapping-level for the compaction + // e.g. max_overlapping_level_size = sub_level_max_compaction_bytes * max_overlapping_level_size_ratio + // hummock will reorg the commit-sstables to the multi overlapping-level if the size of the commit-sstables is larger than max_overlapping_level_size + optional uint32 max_overlapping_level_size_ratio = 24; } message TableStats { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 64d7675903ec4..844e198e95cfc 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -2230,6 +2230,10 @@ pub mod default { pub fn disable_auto_group_scheduling() -> bool { false } + + pub fn max_overlapping_level_size_ratio() -> u32 { + 4 + } } pub mod object_store_config { diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index d109d6eabda67..6ec9577d6d2bc 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -68,6 +68,7 @@ pub fn build_compaction_config_vec( max_l0_compact_level: Option, sst_allowed_trivial_move_min_size: Option, disable_auto_group_scheduling: Option, + max_overlapping_level_size_ratio: Option, ) -> Vec { let mut configs = vec![]; if let Some(c) = max_bytes_for_level_base { @@ -127,6 +128,9 @@ pub fn build_compaction_config_vec( if let Some(c) = disable_auto_group_scheduling { configs.push(MutableConfig::DisableAutoGroupScheduling(c)) } + if let Some(c) = max_overlapping_level_size_ratio { + configs.push(MutableConfig::MaxOverlappingLevelSizeRatio(c)) + } configs } diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 99de4cd9b17b9..952550e1bfcb4 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -191,6 +191,8 @@ enum HummockCommands { sst_allowed_trivial_move_min_size: Option, #[clap(long)] disable_auto_group_scheduling: Option, + #[clap(long)] + max_overlapping_level_size_ratio: Option, }, /// Split given compaction group into two. Moves the given tables to the new group. SplitCompactionGroup { @@ -578,6 +580,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { max_l0_compact_level, sst_allowed_trivial_move_min_size, disable_auto_group_scheduling, + max_overlapping_level_size_ratio, }) => { cmd_impl::hummock::update_compaction_config( context, @@ -610,6 +613,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { max_l0_compact_level, sst_allowed_trivial_move_min_size, disable_auto_group_scheduling, + max_overlapping_level_size_ratio, ), ) .await? diff --git a/src/meta/src/hummock/compaction/compaction_config.rs b/src/meta/src/hummock/compaction/compaction_config.rs index d7be9b6e6cbaa..97d14f1613751 100644 --- a/src/meta/src/hummock/compaction/compaction_config.rs +++ b/src/meta/src/hummock/compaction/compaction_config.rs @@ -71,6 +71,9 @@ impl CompactionConfigBuilder { disable_auto_group_scheduling: Some( compaction_config::disable_auto_group_scheduling(), ), + max_overlapping_level_size_ratio: Some( + compaction_config::max_overlapping_level_size_ratio(), + ), }, } } diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 09ac1bd271b3b..7891b6ea27577 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_common::config::default::compaction_config; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_hummock_sdk::change_log::ChangeLogDelta; use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids; @@ -457,7 +458,11 @@ fn rewrite_commit_sstables_to_sub_level( let mut accumulated_size = 0; let mut ssts = vec![]; - let sub_level_size_limit = config.sub_level_max_compaction_bytes * 2; // TODO: use config instead of magic number + let sub_level_size_limit = config.sub_level_max_compaction_bytes + * config + .max_overlapping_level_size_ratio + .unwrap_or(compaction_config::max_overlapping_level_size_ratio()) + as u64; let level = overlapping_sstables.entry(group_id).or_default(); diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index 02b63ab47de62..360931d866b0a 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -588,6 +588,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi MutableConfig::DisableAutoGroupScheduling(c) => { target.disable_auto_group_scheduling = Some(*c); } + MutableConfig::MaxOverlappingLevelSizeRatio(c) => { + target.max_overlapping_level_size_ratio = Some(*c); + } } } } From 93df8818f0c04c209c8bbcedb3745f9c8a437762 Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 19 Nov 2024 17:45:21 +0800 Subject: [PATCH 5/9] refactor --- proto/hummock.proto | 7 +++---- src/common/src/config.rs | 2 +- src/ctl/src/cmd_impl/hummock/compaction_group.rs | 6 +++--- src/ctl/src/lib.rs | 7 ++++--- src/meta/src/hummock/compaction/compaction_config.rs | 4 +--- src/meta/src/hummock/manager/commit_epoch.rs | 9 ++++----- .../manager/compaction/compaction_group_manager.rs | 4 ++-- src/storage/hummock_test/src/hummock_storage_tests.rs | 5 +++-- 8 files changed, 21 insertions(+), 23 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index be4f3b666228b..446182c72044c 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -660,7 +660,7 @@ message RiseCtlUpdateCompactionConfigRequest { uint64 sst_allowed_trivial_move_min_size = 19; uint32 split_weight_by_vnode = 20; bool disable_auto_group_scheduling = 21; - uint32 max_overlapping_level_size_ratio = 22; + uint32 max_overlapping_level_size = 22; } } repeated uint64 compaction_group_ids = 1; @@ -861,9 +861,8 @@ message CompactionConfig { optional bool disable_auto_group_scheduling = 23; // The limitation of the max size of the overlapping-level for the compaction - // e.g. max_overlapping_level_size = sub_level_max_compaction_bytes * max_overlapping_level_size_ratio - // hummock will reorg the commit-sstables to the multi overlapping-level if the size of the commit-sstables is larger than max_overlapping_level_size - optional uint32 max_overlapping_level_size_ratio = 24; + // hummock will reorg the commit-sstables to the multi overlapping-level if the size of the commit-sstables is larger than `max_overlapping_level_size` + optional uint32 max_overlapping_level_size = 24; } message TableStats { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 844e198e95cfc..6095053ccfe2e 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -2231,7 +2231,7 @@ pub mod default { false } - pub fn max_overlapping_level_size_ratio() -> u32 { + pub fn max_overlapping_level_size() -> u32 { 4 } } diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index 6ec9577d6d2bc..7b4160fd6ad72 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -68,7 +68,7 @@ pub fn build_compaction_config_vec( max_l0_compact_level: Option, sst_allowed_trivial_move_min_size: Option, disable_auto_group_scheduling: Option, - max_overlapping_level_size_ratio: Option, + max_overlapping_level_size: Option, ) -> Vec { let mut configs = vec![]; if let Some(c) = max_bytes_for_level_base { @@ -128,8 +128,8 @@ pub fn build_compaction_config_vec( if let Some(c) = disable_auto_group_scheduling { configs.push(MutableConfig::DisableAutoGroupScheduling(c)) } - if let Some(c) = max_overlapping_level_size_ratio { - configs.push(MutableConfig::MaxOverlappingLevelSizeRatio(c)) + if let Some(c) = max_overlapping_level_size { + configs.push(MutableConfig::MaxOverlappingLevelSize(c)) } configs diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 952550e1bfcb4..c09d470a9b16e 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -85,6 +85,7 @@ enum ComputeCommands { ShowConfig { host: String }, } +#[allow(clippy::large_enum_variant)] #[derive(Subcommand)] enum HummockCommands { /// list latest Hummock version on meta node @@ -192,7 +193,7 @@ enum HummockCommands { #[clap(long)] disable_auto_group_scheduling: Option, #[clap(long)] - max_overlapping_level_size_ratio: Option, + max_overlapping_level_size: Option, }, /// Split given compaction group into two. Moves the given tables to the new group. SplitCompactionGroup { @@ -580,7 +581,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { max_l0_compact_level, sst_allowed_trivial_move_min_size, disable_auto_group_scheduling, - max_overlapping_level_size_ratio, + max_overlapping_level_size, }) => { cmd_impl::hummock::update_compaction_config( context, @@ -613,7 +614,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { max_l0_compact_level, sst_allowed_trivial_move_min_size, disable_auto_group_scheduling, - max_overlapping_level_size_ratio, + max_overlapping_level_size, ), ) .await? diff --git a/src/meta/src/hummock/compaction/compaction_config.rs b/src/meta/src/hummock/compaction/compaction_config.rs index 97d14f1613751..c808c2f548023 100644 --- a/src/meta/src/hummock/compaction/compaction_config.rs +++ b/src/meta/src/hummock/compaction/compaction_config.rs @@ -71,9 +71,7 @@ impl CompactionConfigBuilder { disable_auto_group_scheduling: Some( compaction_config::disable_auto_group_scheduling(), ), - max_overlapping_level_size_ratio: Some( - compaction_config::max_overlapping_level_size_ratio(), - ), + max_overlapping_level_size: Some(compaction_config::max_overlapping_level_size()), }, } } diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 7891b6ea27577..ef8f9aa0258c9 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -458,11 +458,10 @@ fn rewrite_commit_sstables_to_sub_level( let mut accumulated_size = 0; let mut ssts = vec![]; - let sub_level_size_limit = config.sub_level_max_compaction_bytes - * config - .max_overlapping_level_size_ratio - .unwrap_or(compaction_config::max_overlapping_level_size_ratio()) - as u64; + let sub_level_size_limit = config + .max_overlapping_level_size + .unwrap_or(compaction_config::max_overlapping_level_size()) + as u64; let level = overlapping_sstables.entry(group_id).or_default(); diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index 360931d866b0a..1281464bb8fce 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -588,8 +588,8 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi MutableConfig::DisableAutoGroupScheduling(c) => { target.disable_auto_group_scheduling = Some(*c); } - MutableConfig::MaxOverlappingLevelSizeRatio(c) => { - target.max_overlapping_level_size_ratio = Some(*c); + MutableConfig::MaxOverlappingLevelSize(c) => { + target.max_overlapping_level_size = Some(*c); } } } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 8e8b07534b591..0b216e84c4960 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2950,9 +2950,9 @@ async fn test_commit_with_large_size() { commit_epoch( epoch1, vec![ - sst1_epoch1.clone(), - sst1_epoch2.clone(), sst1_epoch3.clone(), + sst1_epoch2.clone(), + sst1_epoch1.clone(), ], vec![NewTableFragmentInfo { table_ids: HashSet::from_iter([existing_table_id]), @@ -2975,6 +2975,7 @@ async fn test_commit_with_large_size() { .l0 .clone(); + println!("l0_sub_levels {:?}", l0_sub_levels.sub_levels); assert_eq!(3, l0_sub_levels.sub_levels.len()); assert_eq!(1, l0_sub_levels.sub_levels[0].table_infos.len()); assert_eq!( From 2c5eee4fbd91c9f19319da282dca486d066ca865 Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 19 Nov 2024 18:12:13 +0800 Subject: [PATCH 6/9] refactor --- proto/hummock.proto | 4 ++-- src/common/src/config.rs | 4 ++-- src/ctl/src/cmd_impl/hummock/compaction_group.rs | 2 +- src/ctl/src/lib.rs | 2 +- src/meta/src/hummock/manager/commit_epoch.rs | 3 +-- 5 files changed, 7 insertions(+), 8 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 446182c72044c..15f3d61a7cf2b 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -660,7 +660,7 @@ message RiseCtlUpdateCompactionConfigRequest { uint64 sst_allowed_trivial_move_min_size = 19; uint32 split_weight_by_vnode = 20; bool disable_auto_group_scheduling = 21; - uint32 max_overlapping_level_size = 22; + uint64 max_overlapping_level_size = 22; } } repeated uint64 compaction_group_ids = 1; @@ -862,7 +862,7 @@ message CompactionConfig { // The limitation of the max size of the overlapping-level for the compaction // hummock will reorg the commit-sstables to the multi overlapping-level if the size of the commit-sstables is larger than `max_overlapping_level_size` - optional uint32 max_overlapping_level_size = 24; + optional uint64 max_overlapping_level_size = 24; } message TableStats { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 6095053ccfe2e..a083be1cd6f77 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -2231,8 +2231,8 @@ pub mod default { false } - pub fn max_overlapping_level_size() -> u32 { - 4 + pub fn max_overlapping_level_size() -> u64 { + DEFAULT_MIN_COMPACTION_BYTES } } diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index 7b4160fd6ad72..e164c0b060eb0 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -68,7 +68,7 @@ pub fn build_compaction_config_vec( max_l0_compact_level: Option, sst_allowed_trivial_move_min_size: Option, disable_auto_group_scheduling: Option, - max_overlapping_level_size: Option, + max_overlapping_level_size: Option, ) -> Vec { let mut configs = vec![]; if let Some(c) = max_bytes_for_level_base { diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index c09d470a9b16e..c13e83cb62b00 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -193,7 +193,7 @@ enum HummockCommands { #[clap(long)] disable_auto_group_scheduling: Option, #[clap(long)] - max_overlapping_level_size: Option, + max_overlapping_level_size: Option, }, /// Split given compaction group into two. Moves the given tables to the new group. SplitCompactionGroup { diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index ef8f9aa0258c9..e84d9373713d0 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -460,8 +460,7 @@ fn rewrite_commit_sstables_to_sub_level( let mut ssts = vec![]; let sub_level_size_limit = config .max_overlapping_level_size - .unwrap_or(compaction_config::max_overlapping_level_size()) - as u64; + .unwrap_or(compaction_config::max_overlapping_level_size()); let level = overlapping_sstables.entry(group_id).or_default(); From c6a4a0a395186e5edd7c136b8732f4b4f9466ea4 Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 20 Nov 2024 17:50:24 +0800 Subject: [PATCH 7/9] address comments --- src/common/src/config.rs | 2 +- src/meta/src/hummock/manager/commit_epoch.rs | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index a083be1cd6f77..4ef1edad8316e 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -2232,7 +2232,7 @@ pub mod default { } pub fn max_overlapping_level_size() -> u64 { - DEFAULT_MIN_COMPACTION_BYTES + 256 * MB } } diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index e84d9373713d0..b54f3c2f42555 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -351,6 +351,7 @@ impl HummockManager { ) -> Result>> { let mut new_sst_id_number = 0; let mut sst_to_cg_vec = Vec::with_capacity(sstables.len()); + let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec(); for commit_sst in sstables { let mut group_table_ids: BTreeMap> = BTreeMap::new(); for table_id in &commit_sst.sst_info.table_ids { @@ -419,6 +420,12 @@ impl HummockManager { } } + // order check + for ssts in commit_sstables.values() { + let object_ids = ssts.iter().map(|s| s.object_id).collect_vec(); + assert!(is_ordered_subset(&commit_object_id_vec, &object_ids)); + } + Ok(commit_sstables) } } @@ -486,3 +493,16 @@ fn rewrite_commit_sstables_to_sub_level( overlapping_sstables } + +fn is_ordered_subset(vec_1: &Vec, vec_2: &Vec) -> bool { + let mut vec_2_iter = vec_2.iter().peekable(); + for item in vec_1 { + if vec_2_iter.peek() == Some(&item) { + vec_2_iter.next(); + } else { + return false; + } + } + + vec_2_iter.peek().is_none() +} From e4bc0a7011d5c9d2b6301c2aa06ef2ba1045b88f Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 20 Nov 2024 18:09:39 +0800 Subject: [PATCH 8/9] fix --- src/meta/src/hummock/manager/commit_epoch.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index b54f3c2f42555..8dfc06312d433 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -499,8 +499,6 @@ fn is_ordered_subset(vec_1: &Vec, vec_2: &Vec) -> bool { for item in vec_1 { if vec_2_iter.peek() == Some(&item) { vec_2_iter.next(); - } else { - return false; } } From e840af8b20b423827560245a8a16fbf8b71f8bb5 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 22 Nov 2024 18:02:34 +0800 Subject: [PATCH 9/9] address comments --- src/meta/src/hummock/manager/commit_epoch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 8dfc06312d433..67152cba14236 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -483,7 +483,7 @@ fn rewrite_commit_sstables_to_sub_level( } } - if accumulated_size != 0 { + if !ssts.is_empty() { level.push(ssts); }