From bc06ffdb782ea5a9f11d19825f0f61c1723991c1 Mon Sep 17 00:00:00 2001
From: Li0k <yuli@singularity-data.com>
Date: Fri, 22 Nov 2024 18:44:33 +0800
Subject: [PATCH] feat(compaction): Limit the size of the new overlapping level
 (#19277)

---
 proto/hummock.proto                           |   5 +
 src/common/src/config.rs                      |   4 +
 .../src/cmd_impl/hummock/compaction_group.rs  |   4 +
 src/ctl/src/lib.rs                            |   5 +
 .../hummock/compaction/compaction_config.rs   |   1 +
 src/meta/src/hummock/manager/commit_epoch.rs  | 107 +++++++++++++--
 .../compaction/compaction_group_manager.rs    |   3 +
 src/meta/src/hummock/manager/transaction.rs   |  54 ++++----
 .../hummock_test/src/hummock_storage_tests.rs | 127 ++++++++++++++++++
 .../src/monitor/monitored_storage_metrics.rs  |   4 +-
 10 files changed, 273 insertions(+), 41 deletions(-)

diff --git a/proto/hummock.proto b/proto/hummock.proto
index 869c5af867d43..15f3d61a7cf2b 100644
--- a/proto/hummock.proto
+++ b/proto/hummock.proto
@@ -660,6 +660,7 @@ message RiseCtlUpdateCompactionConfigRequest {
       uint64 sst_allowed_trivial_move_min_size = 19;
       uint32 split_weight_by_vnode = 20;
       bool disable_auto_group_scheduling = 21;
+      uint64 max_overlapping_level_size = 22;
     }
   }
   repeated uint64 compaction_group_ids = 1;
@@ -858,6 +859,10 @@ message CompactionConfig {
 
   // The limitation of auto group scheduling
   optional bool disable_auto_group_scheduling = 23;
+
+  // The upper bound on the size of an overlapping level created during commit.
+  // Hummock will reorganize the commit-sstables into multiple overlapping levels if the size of the commit-sstables is larger than `max_overlapping_level_size`.
+  optional uint64 max_overlapping_level_size = 24;
 }
 
 message TableStats {
diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index 1f67057801c4f..393a3a05acb4d 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -2237,6 +2237,10 @@ pub mod default {
         pub fn disable_auto_group_scheduling() -> bool {
             false
         }
+
+        pub fn max_overlapping_level_size() -> u64 {
+            256 * MB
+        }
     }
 
     pub mod object_store_config {
diff --git a/src/ctl/src/cmd_impl/hummock/compaction_group.rs b/src/ctl/src/cmd_impl/hummock/compaction_group.rs
index d109d6eabda67..e164c0b060eb0 100644
--- a/src/ctl/src/cmd_impl/hummock/compaction_group.rs
+++ b/src/ctl/src/cmd_impl/hummock/compaction_group.rs
@@ -68,6 +68,7 @@ pub fn build_compaction_config_vec(
     max_l0_compact_level: Option<u32>,
     sst_allowed_trivial_move_min_size: Option<u64>,
     disable_auto_group_scheduling: Option<bool>,
+    max_overlapping_level_size: Option<u64>,
 ) -> Vec<MutableConfig> {
     let mut configs = vec![];
     if let Some(c) = max_bytes_for_level_base {
@@ -127,6 +128,9 @@ pub fn build_compaction_config_vec(
     if let Some(c) = disable_auto_group_scheduling {
         configs.push(MutableConfig::DisableAutoGroupScheduling(c))
     }
+    if let Some(c) = max_overlapping_level_size {
+        configs.push(MutableConfig::MaxOverlappingLevelSize(c))
+    }
 
     configs
 }
diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs
index 99de4cd9b17b9..c13e83cb62b00 100644
--- a/src/ctl/src/lib.rs
+++ b/src/ctl/src/lib.rs
@@ -85,6 +85,7 @@ enum ComputeCommands {
     ShowConfig { host: String },
 }
 
+#[allow(clippy::large_enum_variant)]
 #[derive(Subcommand)]
 enum HummockCommands {
     /// list latest Hummock version on meta node
@@ -191,6 +192,8 @@ enum HummockCommands {
         sst_allowed_trivial_move_min_size: Option<u64>,
         #[clap(long)]
         disable_auto_group_scheduling: Option<bool>,
+        #[clap(long)]
+        max_overlapping_level_size: Option<u64>,
     },
     /// Split given compaction group into two. Moves the given tables to the new group.
     SplitCompactionGroup {
@@ -578,6 +581,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
             max_l0_compact_level,
             sst_allowed_trivial_move_min_size,
             disable_auto_group_scheduling,
+            max_overlapping_level_size,
         }) => {
             cmd_impl::hummock::update_compaction_config(
                 context,
@@ -610,6 +614,7 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
                     max_l0_compact_level,
                     sst_allowed_trivial_move_min_size,
                     disable_auto_group_scheduling,
+                    max_overlapping_level_size,
                 ),
             )
             .await?
diff --git a/src/meta/src/hummock/compaction/compaction_config.rs b/src/meta/src/hummock/compaction/compaction_config.rs
index d7be9b6e6cbaa..c808c2f548023 100644
--- a/src/meta/src/hummock/compaction/compaction_config.rs
+++ b/src/meta/src/hummock/compaction/compaction_config.rs
@@ -71,6 +71,7 @@ impl CompactionConfigBuilder {
                 disable_auto_group_scheduling: Some(
                     compaction_config::disable_auto_group_scheduling(),
                 ),
+                max_overlapping_level_size: Some(compaction_config::max_overlapping_level_size()),
             },
         }
     }
diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs
index c51c77a5d36a0..67152cba14236 100644
--- a/src/meta/src/hummock/manager/commit_epoch.rs
+++ b/src/meta/src/hummock/manager/commit_epoch.rs
@@ -15,7 +15,9 @@
 use std::collections::{BTreeMap, HashMap, HashSet};
 use std::sync::Arc;
 
+use itertools::Itertools;
 use risingwave_common::catalog::TableId;
+use risingwave_common::config::default::compaction_config;
 use risingwave_common::system_param::reader::SystemParamsRead;
 use risingwave_hummock_sdk::change_log::ChangeLogDelta;
 use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
@@ -112,7 +114,7 @@ impl HummockManager {
         let state_table_info = &version.latest_version().state_table_info;
         let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
         let mut new_table_ids = HashMap::new();
-        let mut new_compaction_groups = HashMap::new();
+        let mut new_compaction_groups = Vec::new();
         let mut compaction_group_manager_txn = None;
         let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;
 
@@ -143,14 +145,13 @@ impl HummockManager {
                     )
                 };
             let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
-            new_compaction_groups.insert(new_compaction_group_id, compaction_group_config.clone());
-            compaction_group_manager.insert(
-                new_compaction_group_id,
-                CompactionGroup {
-                    group_id: new_compaction_group_id,
-                    compaction_config: compaction_group_config,
-                },
-            );
+            let new_compaction_group = CompactionGroup {
+                group_id: new_compaction_group_id,
+                compaction_config: compaction_group_config.clone(),
+            };
+
+            new_compaction_groups.push(new_compaction_group.clone());
+            compaction_group_manager.insert(new_compaction_group_id, new_compaction_group);
 
             on_handle_add_new_table(
                 state_table_info,
@@ -165,12 +166,35 @@ impl HummockManager {
             .correct_commit_ssts(sstables, &table_compaction_group_mapping)
             .await?;
 
-        let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();
+        let modified_compaction_groups = commit_sstables.keys().cloned().collect_vec();
+        // Resolve the compaction config for every modified compaction group.
+        let mut group_id_to_config = HashMap::new();
+        if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
+            for cg_id in &modified_compaction_groups {
+                let compaction_group = compaction_group_manager
+                    .get(cg_id)
+                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
+                    .compaction_config();
+                group_id_to_config.insert(*cg_id, compaction_group);
+            }
+        } else {
+            let compaction_group_manager = self.compaction_group_manager.read().await;
+            for cg_id in &modified_compaction_groups {
+                let compaction_group = compaction_group_manager
+                    .try_get_compaction_group_config(*cg_id)
+                    .unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
+                    .compaction_config();
+                group_id_to_config.insert(*cg_id, compaction_group);
+            }
+        }
+
+        let group_id_to_sub_levels =
+            rewrite_commit_sstables_to_sub_level(commit_sstables, &group_id_to_config);
 
         let time_travel_delta = version.pre_commit_epoch(
             &tables_to_commit,
             new_compaction_groups,
-            commit_sstables,
+            group_id_to_sub_levels,
             &new_table_ids,
             new_table_watermarks,
             change_log_delta,
@@ -327,6 +351,7 @@ impl HummockManager {
     ) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
         let mut new_sst_id_number = 0;
         let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
+        let commit_object_id_vec = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
         for commit_sst in sstables {
             let mut group_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
             for table_id in &commit_sst.sst_info.table_ids {
@@ -395,6 +420,12 @@ impl HummockManager {
             }
         }
 
+        // Sanity check: each group's ssts must preserve the original commit order.
+        for ssts in commit_sstables.values() {
+            let object_ids = ssts.iter().map(|s| s.object_id).collect_vec();
+            assert!(is_ordered_subset(&commit_object_id_vec, &object_ids));
+        }
+
         Ok(commit_sstables)
     }
 }
@@ -419,3 +450,57 @@ fn on_handle_add_new_table(
 
     Ok(())
 }
+
+/// Rewrites the commit sstables into sub-levels based on each compaction group's config.
+/// The type of `compaction_group_manager_txn` is too complex to appear in the function signature, so a plain `HashMap` is passed instead.
+fn rewrite_commit_sstables_to_sub_level(
+    commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
+    group_id_to_config: &HashMap<CompactionGroupId, Arc<CompactionConfig>>,
+) -> BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>> {
+    let mut overlapping_sstables: BTreeMap<u64, Vec<Vec<SstableInfo>>> = BTreeMap::new();
+    for (group_id, inserted_table_infos) in commit_sstables {
+        let config = group_id_to_config
+            .get(&group_id)
+            .expect("compaction group should exist");
+
+        let mut accumulated_size = 0;
+        let mut ssts = vec![];
+        let sub_level_size_limit = config
+            .max_overlapping_level_size
+            .unwrap_or(compaction_config::max_overlapping_level_size());
+
+        let level = overlapping_sstables.entry(group_id).or_default();
+
+        for sst in inserted_table_infos {
+            accumulated_size += sst.sst_size;
+            ssts.push(sst);
+            if accumulated_size > sub_level_size_limit {
+                level.push(ssts);
+
+                // reset the accumulated size and ssts
+                accumulated_size = 0;
+                ssts = vec![];
+            }
+        }
+
+        if !ssts.is_empty() {
+            level.push(ssts);
+        }
+
+        // The uploader organizes the ssts in decreasing epoch order, so the level needs to be reversed to ensure that the latest epoch is at the top.
+        level.reverse();
+    }
+
+    overlapping_sstables
+}
+
+fn is_ordered_subset(vec_1: &Vec<u64>, vec_2: &Vec<u64>) -> bool {
+    let mut vec_2_iter = vec_2.iter().peekable();
+    for item in vec_1 {
+        if vec_2_iter.peek() == Some(&item) {
+            vec_2_iter.next();
+        }
+    }
+
+    vec_2_iter.peek().is_none()
+}
diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs
index 18bb8dfaf86b3..3a6c179c03147 100644
--- a/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs
+++ b/src/meta/src/hummock/manager/compaction/compaction_group_manager.rs
@@ -588,6 +588,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi
             MutableConfig::DisableAutoGroupScheduling(c) => {
                 target.disable_auto_group_scheduling = Some(*c);
             }
+            MutableConfig::MaxOverlappingLevelSize(c) => {
+                target.max_overlapping_level_size = Some(*c);
+            }
         }
     }
 }
diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs
index 054ae657d594d..8a4276492365d 100644
--- a/src/meta/src/hummock/manager/transaction.rs
+++ b/src/meta/src/hummock/manager/transaction.rs
@@ -14,7 +14,6 @@
 
 use std::collections::{BTreeMap, HashMap};
 use std::ops::{Deref, DerefMut};
-use std::sync::Arc;
 
 use risingwave_common::catalog::TableId;
 use risingwave_hummock_sdk::change_log::ChangeLogDelta;
@@ -24,11 +23,12 @@ use risingwave_hummock_sdk::table_watermark::TableWatermarks;
 use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
 use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId};
 use risingwave_pb::hummock::{
-    CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas,
-    HummockVersionStats, StateTableInfoDelta,
+    CompatibilityVersion, GroupConstruct, HummockVersionDeltas, HummockVersionStats,
+    StateTableInfoDelta,
 };
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
 
+use crate::hummock::model::CompactionGroup;
 use crate::manager::NotificationManager;
 use crate::model::{
     InMemValTransaction, MetadataModelResult, Transactional, ValTransaction, VarTransaction,
@@ -111,8 +111,8 @@ impl<'a> HummockVersionTransaction<'a> {
     pub(super) fn pre_commit_epoch(
         &mut self,
         tables_to_commit: &HashMap<TableId, u64>,
-        new_compaction_groups: HashMap<CompactionGroupId, Arc<CompactionConfig>>,
-        commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
+        new_compaction_groups: Vec<CompactionGroup>,
+        group_id_to_sub_levels: BTreeMap<CompactionGroupId, Vec<Vec<SstableInfo>>>,
         new_table_ids: &HashMap<TableId, CompactionGroupId>,
         new_table_watermarks: HashMap<TableId, TableWatermarks>,
         change_log_delta: HashMap<TableId, ChangeLogDelta>,
@@ -121,38 +121,36 @@ impl<'a> HummockVersionTransaction<'a> {
         new_version_delta.new_table_watermarks = new_table_watermarks;
         new_version_delta.change_log_delta = change_log_delta;
 
-        for (compaction_group_id, compaction_group_config) in new_compaction_groups {
-            {
-                let group_deltas = &mut new_version_delta
-                    .group_deltas
-                    .entry(compaction_group_id)
-                    .or_default()
-                    .group_deltas;
-
-                #[expect(deprecated)]
-                group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
-                    group_config: Some((*compaction_group_config).clone()),
-                    group_id: compaction_group_id,
-                    parent_group_id: StaticCompactionGroupId::NewCompactionGroup
-                        as CompactionGroupId,
-                    new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
-                    table_ids: vec![],
-                    version: CompatibilityVersion::SplitGroupByTableId as i32,
-                    split_key: None,
-                }));
-            }
+        for compaction_group in &new_compaction_groups {
+            let group_deltas = &mut new_version_delta
+                .group_deltas
+                .entry(compaction_group.group_id())
+                .or_default()
+                .group_deltas;
+
+            #[expect(deprecated)]
+            group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
+                group_config: Some(compaction_group.compaction_config().as_ref().clone()),
+                group_id: compaction_group.group_id(),
+                parent_group_id: StaticCompactionGroupId::NewCompactionGroup as CompactionGroupId,
+                new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
+                table_ids: vec![],
+                version: CompatibilityVersion::SplitGroupByTableId as i32,
+                split_key: None,
+            }));
         }
 
         // Append SSTs to a new version.
-        for (compaction_group_id, inserted_table_infos) in commit_sstables {
+        for (compaction_group_id, sub_levels) in group_id_to_sub_levels {
             let group_deltas = &mut new_version_delta
                 .group_deltas
                 .entry(compaction_group_id)
                 .or_default()
                 .group_deltas;
-            let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos);
 
-            group_deltas.push(group_delta);
+            for sub_level in sub_levels {
+                group_deltas.push(GroupDelta::NewL0SubLevel(sub_level));
+            }
         }
 
         // update state table info
diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs
index 7e847fc089aa2..0b216e84c4960 100644
--- a/src/storage/hummock_test/src/hummock_storage_tests.rs
+++ b/src/storage/hummock_test/src/hummock_storage_tests.rs
@@ -2866,3 +2866,130 @@ async fn test_commit_multi_epoch() {
         assert_eq!(info.committed_epoch, epoch3);
     }
 }
+
+#[tokio::test]
+async fn test_commit_with_large_size() {
+    let test_env = prepare_hummock_test_env().await;
+    let context_id = test_env.meta_client.context_id();
+    let existing_table_id = TableId::new(1);
+    let initial_epoch = INVALID_EPOCH;
+
+    let commit_epoch =
+        |epoch, ssts: Vec<SstableInfo>, new_table_fragment_infos, tables_to_commit: &[TableId]| {
+            let manager = &test_env.manager;
+            let tables_to_commit = tables_to_commit
+                .iter()
+                .map(|table_id| (*table_id, epoch))
+                .collect();
+            let sst_to_context = ssts.iter().map(|sst| (sst.object_id, context_id)).collect();
+
+            let sstables = ssts
+                .into_iter()
+                .map(|sst| LocalSstableInfo {
+                    table_stats: sst
+                        .table_ids
+                        .iter()
+                        .map(|&table_id| {
+                            (
+                                table_id,
+                                TableStats {
+                                    total_compressed_size: 10,
+                                    ..Default::default()
+                                },
+                            )
+                        })
+                        .collect(),
+                    sst_info: sst,
+                    created_at: u64::MAX,
+                })
+                .collect_vec();
+
+            async move {
+                manager
+                    .commit_epoch(CommitEpochInfo {
+                        new_table_watermarks: Default::default(),
+                        sst_to_context,
+                        sstables,
+                        new_table_fragment_infos,
+                        change_log_delta: Default::default(),
+                        tables_to_commit,
+                    })
+                    .await
+                    .unwrap();
+            }
+        };
+
+    let epoch1 = initial_epoch.next_epoch();
+    let sst1_epoch1 = SstableInfo {
+        sst_id: 11,
+        object_id: 1,
+        table_ids: vec![existing_table_id.table_id],
+        file_size: 512 << 20,
+        sst_size: 512 << 20,
+        ..Default::default()
+    };
+
+    let sst1_epoch2 = SstableInfo {
+        sst_id: 12,
+        object_id: 2,
+        table_ids: vec![existing_table_id.table_id],
+        file_size: 512 << 20,
+        sst_size: 512 << 20,
+        ..Default::default()
+    };
+
+    let sst1_epoch3 = SstableInfo {
+        sst_id: 13,
+        object_id: 3,
+        table_ids: vec![existing_table_id.table_id],
+        file_size: 512 << 20,
+        sst_size: 512 << 20,
+        ..Default::default()
+    };
+
+    commit_epoch(
+        epoch1,
+        vec![
+            sst1_epoch3.clone(),
+            sst1_epoch2.clone(),
+            sst1_epoch1.clone(),
+        ],
+        vec![NewTableFragmentInfo {
+            table_ids: HashSet::from_iter([existing_table_id]),
+        }],
+        &[existing_table_id],
+    )
+    .await;
+
+    let cg_id =
+        get_compaction_group_id_by_table_id(test_env.manager.clone(), existing_table_id.table_id())
+            .await;
+
+    let l0_sub_levels = test_env
+        .manager
+        .get_current_version()
+        .await
+        .levels
+        .get(&cg_id)
+        .unwrap()
+        .l0
+        .clone();
+
+    println!("l0_sub_levels {:?}", l0_sub_levels.sub_levels);
+    assert_eq!(3, l0_sub_levels.sub_levels.len());
+    assert_eq!(1, l0_sub_levels.sub_levels[0].table_infos.len());
+    assert_eq!(
+        sst1_epoch1.object_id,
+        l0_sub_levels.sub_levels[0].table_infos[0].object_id
+    );
+    assert_eq!(1, l0_sub_levels.sub_levels[1].table_infos.len());
+    assert_eq!(
+        sst1_epoch2.object_id,
+        l0_sub_levels.sub_levels[1].table_infos[0].object_id
+    );
+    assert_eq!(1, l0_sub_levels.sub_levels[2].table_infos.len());
+    assert_eq!(
+        sst1_epoch3.object_id,
+        l0_sub_levels.sub_levels[2].table_infos[0].object_id
+    );
+}
diff --git a/src/storage/src/monitor/monitored_storage_metrics.rs b/src/storage/src/monitor/monitored_storage_metrics.rs
index 8bd7ef64b6b83..f8e6ee1e24418 100644
--- a/src/storage/src/monitor/monitored_storage_metrics.rs
+++ b/src/storage/src/monitor/monitored_storage_metrics.rs
@@ -70,8 +70,8 @@ pub fn global_storage_metrics(metric_level: MetricLevel) -> MonitoredStorageMetr
 
 impl MonitoredStorageMetrics {
     pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
-        // 256B ~ max 4GB
-        let size_buckets = exponential_buckets(256.0, 16.0, 7).unwrap();
+        // 256B ~ max 64GB
+        let size_buckets = exponential_buckets(256.0, 16.0, 8).unwrap();
         // 10ms ~ max 2.7h
         let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
         // ----- get -----