diff --git a/proto/hummock.proto b/proto/hummock.proto index 869c5af867d43..15f3d61a7cf2b 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -660,6 +660,7 @@ message RiseCtlUpdateCompactionConfigRequest { uint64 sst_allowed_trivial_move_min_size = 19; uint32 split_weight_by_vnode = 20; bool disable_auto_group_scheduling = 21; + uint64 max_overlapping_level_size = 22; } } repeated uint64 compaction_group_ids = 1; @@ -858,6 +859,10 @@ message CompactionConfig { // The limitation of auto group scheduling optional bool disable_auto_group_scheduling = 23; + + // The limitation of the max size of the overlapping-level for the compaction + // hummock will reorg the commit-sstables to the multi overlapping-level if the size of the commit-sstables is larger than `max_overlapping_level_size` + optional uint64 max_overlapping_level_size = 24; } message TableStats { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 1f67057801c4f..393a3a05acb4d 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -2237,6 +2237,10 @@ pub mod default { pub fn disable_auto_group_scheduling() -> bool { false } + + pub fn max_overlapping_level_size() -> u64 { + 256 * MB + } } pub mod object_store_config { diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs index d109d6eabda67..e164c0b060eb0 100644 --- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs +++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs @@ -68,6 +68,7 @@ pub fn build_compaction_config_vec( max_l0_compact_level: Option, sst_allowed_trivial_move_min_size: Option, disable_auto_group_scheduling: Option, + max_overlapping_level_size: Option, ) -> Vec { let mut configs = vec![]; if let Some(c) = max_bytes_for_level_base { @@ -127,6 +128,9 @@ pub fn build_compaction_config_vec( if let Some(c) = disable_auto_group_scheduling { configs.push(MutableConfig::DisableAutoGroupScheduling(c)) } + if let Some(c) = max_overlapping_level_size { + configs.push(MutableConfig::MaxOverlappingLevelSize(c)) + } configs } diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 99de4cd9b17b9..c13e83cb62b00 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -85,6 +85,7 @@ enum ComputeCommands { ShowConfig { host: String }, } +#[allow(clippy::large_enum_variant)] #[derive(Subcommand)] enum HummockCommands { /// list latest Hummock version on meta node @@ -191,6 +192,8 @@ enum HummockCommands { sst_allowed_trivial_move_min_size: Option, #[clap(long)] disable_auto_group_scheduling: Option, + #[clap(long)] + max_overlapping_level_size: Option, }, /// Split given compaction group into two. Moves the given tables to the new group. SplitCompactionGroup { @@ -578,6 +581,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { max_l0_compact_level, sst_allowed_trivial_move_min_size, disable_auto_group_scheduling, + max_overlapping_level_size, }) => { cmd_impl::hummock::update_compaction_config( context, @@ -610,6 +614,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { max_l0_compact_level, sst_allowed_trivial_move_min_size, disable_auto_group_scheduling, + max_overlapping_level_size, ), ) .await? diff --git a/src/meta/src/hummock/compaction/compaction_config.rs b/src/meta/src/hummock/compaction/compaction_config.rs index d7be9b6e6cbaa..c808c2f548023 100644 --- a/src/meta/src/hummock/compaction/compaction_config.rs +++ b/src/meta/src/hummock/compaction/compaction_config.rs @@ -71,6 +71,7 @@ impl CompactionConfigBuilder { disable_auto_group_scheduling: Some( compaction_config::disable_auto_group_scheduling(), ), + max_overlapping_level_size: Some(compaction_config::max_overlapping_level_size()), }, } } diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index c51c77a5d36a0..67152cba14236 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -15,7 +15,9 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; +use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_common::config::default::compaction_config; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_hummock_sdk::change_log::ChangeLogDelta; use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids; @@ -112,7 +114,7 @@ impl HummockManager { let state_table_info = &version.latest_version().state_table_info; let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id(); let mut new_table_ids = HashMap::new(); - let mut new_compaction_groups = HashMap::new(); + let mut new_compaction_groups = Vec::new(); let mut compaction_group_manager_txn = None; let mut compaction_group_config: Option> = None; @@ -143,14 +145,13 @@ impl HummockManager { ) }; let new_compaction_group_id = next_compaction_group_id(&self.env).await?; - new_compaction_groups.insert(new_compaction_group_id, compaction_group_config.clone()); - compaction_group_manager.insert( - new_compaction_group_id, - CompactionGroup { - group_id: new_compaction_group_id, - compaction_config: compaction_group_config, - }, - ); + let new_compaction_group = CompactionGroup { + group_id: new_compaction_group_id, + compaction_config: compaction_group_config.clone(), + }; + + new_compaction_groups.push(new_compaction_group.clone()); + compaction_group_manager.insert(new_compaction_group_id, new_compaction_group); on_handle_add_new_table( state_table_info, @@ -165,12 +166,35 @@ impl HummockManager { .correct_commit_ssts(sstables, &table_compaction_group_mapping) .await?; - let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); + let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec(); + // fill compaction_groups + let mut group_id_to_config = HashMap::new(); + if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() { + for cg_id in &modified_compaction_groups { + let compaction_group = compaction_group_manager + .get(cg_id) + .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) + .compaction_config(); + group_id_to_config.insert(*cg_id, compaction_group); + } + } else { + let compaction_group_manager = self.compaction_group_manager.read().await; + for cg_id in &modified_compaction_groups { + let compaction_group = compaction_group_manager + .try_get_compaction_group_config(*cg_id) + .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id)) + .compaction_config(); + group_id_to_config.insert(*cg_id, compaction_group); + } + } + + let group_id_to_sub_levels = + rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config); let time_travel_delta = version.pre_commit_epoch( &tables_to_commit, new_compaction_groups, - commit_sstables, + group_id_to_sub_levels, &new_table_ids, new_table_watermarks, change_log_delta, @@ -327,6 +351,7 @@ impl HummockManager { ) -> Result>> { let mut new_sst_id_number = 0; let mut sst_to_cg_vec = Vec::with_capacity(sstables.len()); + let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec(); for commit_sst in sstables { let mut group_table_ids: BTreeMap> = BTreeMap::new(); for table_id in &commit_sst.sst_info.table_ids { @@ -395,6 +420,12 @@ impl HummockManager { } } + // order check + for ssts in commit_sstables.values() { + let object_ids = ssts.iter().map(|s| s.object_id).collect_vec(); + assert!(is_ordered_subset(&commit_object_id_vec, &object_ids)); + } + Ok(commit_sstables) } } @@ -419,3 +450,57 @@ fn on_handle_add_new_table( Ok(()) } + +/// Rewrite the commit sstables to sub-levels based on the compaction group config. +/// The type of `compaction_group_manager_txn` is too complex to be used in the function signature. So we use `HashMap` instead. +fn rewrite_commit_sstables_to_sub_level( + commit_sstables: BTreeMap>, + group_id_to_config: &HashMap>, +) -> BTreeMap>> { + let mut overlapping_sstables: BTreeMap>> = BTreeMap::new(); + for (group_id, inserted_table_infos) in commit_sstables { + let config = group_id_to_config + .get(&group_id) + .expect("compaction group should exist"); + + let mut accumulated_size = 0; + let mut ssts = vec![]; + let sub_level_size_limit = config + .max_overlapping_level_size + .unwrap_or(compaction_config::max_overlapping_level_size()); + + let level = overlapping_sstables.entry(group_id).or_default(); + + for sst in inserted_table_infos { + accumulated_size += sst.sst_size; + ssts.push(sst); + if accumulated_size > sub_level_size_limit { + level.push(ssts); + + // reset the accumulated size and ssts + accumulated_size = 0; + ssts = vec![]; + } + } + + if !ssts.is_empty() { + level.push(ssts); + } + + // The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top. + level.reverse(); + } + + overlapping_sstables +} + +fn is_ordered_subset(vec_1: &Vec, vec_2: &Vec) -> bool { + let mut vec_2_iter = vec_2.iter().peekable(); + for item in vec_1 { + if vec_2_iter.peek() == Some(&item) { + vec_2_iter.next(); + } + } + + vec_2_iter.peek().is_none() +} diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs index 18bb8dfaf86b3..3a6c179c03147 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs @@ -588,6 +588,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi MutableConfig::DisableAutoGroupScheduling(c) => { target.disable_auto_group_scheduling = Some(*c); } + MutableConfig::MaxOverlappingLevelSize(c) => { + target.max_overlapping_level_size = Some(*c); + } } } } diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 054ae657d594d..8a4276492365d 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -14,7 +14,6 @@ use std::collections::{BTreeMap, HashMap}; use std::ops::{Deref, DerefMut}; -use std::sync::Arc; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -24,11 +23,12 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks; use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId}; use risingwave_pb::hummock::{ - CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas, - HummockVersionStats, StateTableInfoDelta, + CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats, + StateTableInfoDelta, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; +use crate::hummock::model::CompactionGroup; use crate::manager::NotificationManager; use crate::model::{ InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction, @@ -111,8 +111,8 @@ impl<'a> HummockVersionTransaction<'a> { pub(super) fn pre_commit_epoch( &mut self, tables_to_commit: &HashMap, - new_compaction_groups: HashMap>, - commit_sstables: BTreeMap>, + new_compaction_groups: Vec, + group_id_to_sub_levels: BTreeMap>>, new_table_ids: &HashMap, new_table_watermarks: HashMap, change_log_delta: HashMap, @@ -121,38 +121,36 @@ impl<'a> HummockVersionTransaction<'a> { new_version_delta.new_table_watermarks = new_table_watermarks; new_version_delta.change_log_delta = change_log_delta; - for (compaction_group_id, compaction_group_config) in new_compaction_groups { - { - let group_deltas = &mut new_version_delta - .group_deltas - .entry(compaction_group_id) - .or_default() - .group_deltas; - - #[expect(deprecated)] - group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { - group_config: Some((*compaction_group_config).clone()), - group_id: compaction_group_id, - parent_group_id: StaticCompactionGroupId::NewCompactionGroup - as CompactionGroupId, - new_sst_start_id: 0, // No need to set it when `NewCompactionGroup` - table_ids: vec![], - version: CompatibilityVersion::SplitGroupByTableId as i32, - split_key: None, - })); - } + for compaction_group in &new_compaction_groups { + let group_deltas = &mut new_version_delta + .group_deltas + .entry(compaction_group.group_id()) + .or_default() + .group_deltas; + + #[expect(deprecated)] + group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct { + group_config: Some(compaction_group.compaction_config().as_ref().clone()), + group_id: compaction_group.group_id(), + parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId, + new_sst_start_id: 0, // No need to set it when `NewCompactionGroup` + table_ids: vec![], + version: CompatibilityVersion::SplitGroupByTableId as i32, + split_key: None, + })); } // Append SSTs to a new version. - for (compaction_group_id, inserted_table_infos) in commit_sstables { + for (compaction_group_id, sub_levels) in group_id_to_sub_levels { let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) .or_default() .group_deltas; - let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos); - group_deltas.push(group_delta); + for sub_level in sub_levels { + group_deltas.push(GroupDelta::NewL0SubLevel(sub_level)); + } } // update state table info diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 7e847fc089aa2..0b216e84c4960 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2866,3 +2866,130 @@ async fn test_commit_multi_epoch() { assert_eq!(info.committed_epoch, epoch3); } } + +#[tokio::test] +async fn test_commit_with_large_size() { + let test_env = prepare_hummock_test_env().await; + let context_id = test_env.meta_client.context_id(); + let existing_table_id = TableId::new(1); + let initial_epoch = INVALID_EPOCH; + + let commit_epoch = + |epoch, ssts: Vec, new_table_fragment_infos, tables_to_commit: &[TableId]| { + let manager = &test_env.manager; + let tables_to_commit = tables_to_commit + .iter() + .map(|table_id| (*table_id, epoch)) + .collect(); + let sst_to_context = ssts.iter().map(|sst| (sst.object_id, context_id)).collect(); + + let sstables = ssts + .into_iter() + .map(|sst| LocalSstableInfo { + table_stats: sst + .table_ids + .iter() + .map(|&table_id| { + ( + table_id, + TableStats { + total_compressed_size: 10, + ..Default::default() + }, + ) + }) + .collect(), + sst_info: sst, + created_at: u64::MAX, + }) + .collect_vec(); + + async move { + manager + .commit_epoch(CommitEpochInfo { + new_table_watermarks: Default::default(), + sst_to_context, + sstables, + new_table_fragment_infos, + change_log_delta: Default::default(), + tables_to_commit, + }) + .await + .unwrap(); + } + }; + + let epoch1 = initial_epoch.next_epoch(); + let sst1_epoch1 = SstableInfo { + sst_id: 11, + object_id: 1, + table_ids: vec![existing_table_id.table_id], + file_size: 512 << 20, + sst_size: 512 << 20, + ..Default::default() + }; + + let sst1_epoch2 = SstableInfo { + sst_id: 12, + object_id: 2, + table_ids: vec![existing_table_id.table_id], + file_size: 512 << 20, + sst_size: 512 << 20, + ..Default::default() + }; + + let sst1_epoch3 = SstableInfo { + sst_id: 13, + object_id: 3, + table_ids: vec![existing_table_id.table_id], + file_size: 512 << 20, + sst_size: 512 << 20, + ..Default::default() + }; + + commit_epoch( + epoch1, + vec![ + sst1_epoch3.clone(), + sst1_epoch2.clone(), + sst1_epoch1.clone(), + ], + vec![NewTableFragmentInfo { + table_ids: HashSet::from_iter([existing_table_id]), + }], + &[existing_table_id], + ) + .await; + + let cg_id = + get_compaction_group_id_by_table_id(test_env.manager.clone(), existing_table_id.table_id()) + .await; + + let l0_sub_levels = test_env + .manager + .get_current_version() + .await + .levels + .get(&cg_id) + .unwrap() + .l0 + .clone(); + + println!("l0_sub_levels {:?}", l0_sub_levels.sub_levels); + assert_eq!(3, l0_sub_levels.sub_levels.len()); + assert_eq!(1, l0_sub_levels.sub_levels[0].table_infos.len()); + assert_eq!( + sst1_epoch1.object_id, + l0_sub_levels.sub_levels[0].table_infos[0].object_id + ); + assert_eq!(1, l0_sub_levels.sub_levels[1].table_infos.len()); + assert_eq!( + sst1_epoch2.object_id, + l0_sub_levels.sub_levels[1].table_infos[0].object_id + ); + assert_eq!(1, l0_sub_levels.sub_levels[2].table_infos.len()); + assert_eq!( + sst1_epoch3.object_id, + l0_sub_levels.sub_levels[2].table_infos[0].object_id + ); +} diff --git a/src/storage/src/monitor/monitored_storage_metrics.rs b/src/storage/src/monitor/monitored_storage_metrics.rs index 8bd7ef64b6b83..f8e6ee1e24418 100644 --- a/src/storage/src/monitor/monitored_storage_metrics.rs +++ b/src/storage/src/monitor/monitored_storage_metrics.rs @@ -70,8 +70,8 @@ pub fn global_storage_metrics(metric_level: MetricLevel) -> MonitoredStorageMetr impl MonitoredStorageMetrics { pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self { - // 256B ~ max 4GB - let size_buckets = exponential_buckets(256.0, 16.0, 7).unwrap(); + // 256B ~ max 64GB + let size_buckets = exponential_buckets(256.0, 16.0, 8).unwrap(); // 10ms ~ max 2.7h let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap(); // ----- get -----