From 3dcff6b18b8a5b1eacc6e5420a919febe98fe281 Mon Sep 17 00:00:00 2001
From: Little-Wallace <bupt2013211450@gmail.com>
Date: Tue, 26 Sep 2023 15:39:54 +0800
Subject: [PATCH] do not split too many files

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
---
 proto/hummock.proto                           |  3 +
 src/meta/src/hummock/compaction/mod.rs        |  2 +-
 .../picker/compaction_task_validator.rs       |  7 ++
 .../picker/intra_compaction_picker.rs         | 82 ++++++++++++++++++-
 src/meta/src/hummock/compaction/picker/mod.rs | 16 ++++
 .../picker/tier_compaction_picker.rs          |  9 +-
 src/meta/src/hummock/manager/mod.rs           |  1 +
 .../compaction_group/hummock_version_ext.rs   | 34 ++++++++
 8 files changed, 150 insertions(+), 4 deletions(-)

diff --git a/proto/hummock.proto b/proto/hummock.proto
index c73d79e07e2b7..dca9e13fd11ed 100644
--- a/proto/hummock.proto
+++ b/proto/hummock.proto
@@ -49,6 +49,7 @@ message Level {
   uint64 total_file_size = 4;
   uint64 sub_level_id = 5;
   uint64 uncompressed_file_size = 6;
+  uint32 vnode_partition_count = 7;
 }
 
 message InputLevel {
@@ -62,6 +63,7 @@ message IntraLevelDelta {
   uint64 l0_sub_level_id = 2;
   repeated uint64 removed_table_ids = 3;
   repeated SstableInfo inserted_table_infos = 4;
+  uint32 vnode_partition_count = 5;
 }
 
 enum CompatibilityVersion {
@@ -116,6 +118,7 @@ message HummockVersion {
     uint64 group_id = 3;
     uint64 parent_group_id = 4;
     repeated uint32 member_table_ids = 5;
+    uint32 vnode_partition_count = 6;
   }
   uint64 id = 1;
   // Levels of each compaction group
diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs
index 23585da8999a9..4552298dccabb 100644
--- a/src/meta/src/hummock/compaction/mod.rs
+++ b/src/meta/src/hummock/compaction/mod.rs
@@ -155,7 +155,7 @@ impl CompactStatus {
             target_sub_level_id: ret.input.target_sub_level_id,
             task_type: ret.compaction_task_type as i32,
             split_by_state_table: group.compaction_config.split_by_state_table,
-            split_weight_by_vnode: group.compaction_config.split_weight_by_vnode,
+            split_weight_by_vnode: ret.input.vnode_partition_count,
         };
         Some(compact_task)
     }
diff --git a/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs
index 7452f65d6503a..34c71c3cd4b75 100644
--- a/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs
+++ b/src/meta/src/hummock/compaction/picker/compaction_task_validator.rs
@@ -135,6 +135,13 @@ impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule {
             return true;
         }
 
+        if input.vnode_partition_count > 0
+            && input.select_input_size > self.config.sub_level_max_compaction_bytes
+            && input.input_levels.len() > 1
+        {
+            return true;
+        }
+
         let intra_sub_level_compact_level_count =
             self.config.level0_sub_level_compact_level_count as usize;
 
diff --git a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs
index 541b93254172b..d43be005a30ea 100644
--- a/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs
+++ b/src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs
@@ -57,7 +57,16 @@ impl CompactionPicker for IntraCompactionPicker {
             return None;
         }
 
-        if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], stats) {
+        let vnode_partition_count = levels.vnode_partition_count;
+
+        if let Some(ret) =
+            self.pick_whole_level(l0, &level_handlers[0], vnode_partition_count, stats)
+        {
+            return Some(ret);
+        }
+
+        if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], vnode_partition_count, stats)
+        {
             return Some(ret);
         }
 
@@ -84,10 +93,80 @@ impl IntraCompactionPicker {
         }
     }
 
+    /// Tries to build an intra-L0 task from consecutive sub-levels. A run may start at any
+    /// non-overlapping sub-level no larger than `sub_level_max_compaction_bytes` whose
+    /// `vnode_partition_count` differs from `partition_count`; the first validated run wins.
+    fn pick_whole_level(
+        &self,
+        l0: &OverlappingLevel,
+        level_handler: &LevelHandler,
+        partition_count: u32,
+        stats: &mut LocalPickerStatistic,
+    ) -> Option<CompactionInput> {
+        for (start, first) in l0.sub_levels.iter().enumerate() {
+            let eligible = first.level_type() == LevelType::Nonoverlapping
+                && first.total_file_size <= self.config.sub_level_max_compaction_bytes
+                && first.vnode_partition_count != partition_count;
+            if !eligible {
+                continue;
+            }
+            let size_budget = std::cmp::min(
+                self.config.max_compaction_bytes,
+                self.config.sub_level_max_compaction_bytes
+                    * (self.config.level0_sub_level_compact_level_count as u64),
+            );
+            let mut picked_bytes = 0;
+            let mut picked_files = 0;
+            let mut picked_levels = vec![];
+            // Grow the run; stop at a pending or not-Nonoverlapping sub-level, or once over budget.
+            for candidate in l0.sub_levels.iter().skip(start) {
+                if candidate.level_type() != LevelType::Nonoverlapping
+                    || picked_bytes > size_budget
+                    || level_handler.is_level_pending_compact(candidate)
+                {
+                    break;
+                }
+                picked_bytes += candidate.total_file_size;
+                picked_files += candidate.table_infos.len();
+                picked_levels.push(InputLevel {
+                    level_idx: 0,
+                    level_type: candidate.level_type,
+                    table_infos: candidate.table_infos.clone(),
+                });
+            }
+            if picked_levels.is_empty() {
+                continue;
+            }
+            let vnode_partition_count =
+                if picked_bytes > self.config.sub_level_max_compaction_bytes {
+                    partition_count
+                } else {
+                    0
+                };
+            let task = CompactionInput {
+                input_levels: picked_levels,
+                target_sub_level_id: first.sub_level_id,
+                select_input_size: picked_bytes,
+                total_file_count: picked_files as u64,
+                vnode_partition_count,
+                ..Default::default()
+            };
+            if self.compaction_task_validator.valid_compact_task(
+                &task,
+                ValidationRuleType::Intra,
+                stats,
+            ) {
+                return Some(task);
+            }
+        }
+        None
+    }
+
     fn pick_l0_intra(
         &self,
         l0: &OverlappingLevel,
         level_handler: &LevelHandler,
+        vnode_partition_count: u32,
         stats: &mut LocalPickerStatistic,
     ) -> Option<CompactionInput> {
         let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
@@ -95,6 +174,7 @@ impl IntraCompactionPicker {
         for (idx, level) in l0.sub_levels.iter().enumerate() {
             if level.level_type() != LevelType::Nonoverlapping
                 || level.total_file_size > self.config.sub_level_max_compaction_bytes
+                || level.vnode_partition_count < vnode_partition_count
             {
                 continue;
             }
diff --git a/src/meta/src/hummock/compaction/picker/mod.rs b/src/meta/src/hummock/compaction/picker/mod.rs
index ac1a8f825aa33..ae588704e436e 100644
--- a/src/meta/src/hummock/compaction/picker/mod.rs
+++ b/src/meta/src/hummock/compaction/picker/mod.rs
@@ -61,6 +61,7 @@ pub struct CompactionInput {
     pub select_input_size: u64,
     pub target_input_size: u64,
     pub total_file_count: u64,
+    pub vnode_partition_count: u32,
 }
 
 impl CompactionInput {
@@ -96,3 +97,18 @@ pub trait CompactionPicker {
         stats: &mut LocalPickerStatistic,
     ) -> Option<CompactionInput>;
 }
+
+#[derive(Default, Clone, Debug)]
+pub struct PartitionLevelInfo {
+    pub level_id: u32,
+    pub sub_level_id: u64,
+    pub left_idx: usize,
+    pub right_idx: usize,
+    pub total_file_size: u64,
+}
+
+#[derive(Default, Clone, Debug)]
+pub struct LevelPartition {
+    pub sub_levels: Vec<PartitionLevelInfo>,
+    pub total_file_size: u64,
+}
diff --git a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs
index 5b3058317a4b0..3f68c4459bbd4 100644
--- a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs
+++ b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs
@@ -52,6 +52,7 @@ impl TierCompactionPicker {
         &self,
         l0: &OverlappingLevel,
         level_handler: &LevelHandler,
+        mut vnode_partition_count: u32,
         stats: &mut LocalPickerStatistic,
     ) -> Option<CompactionInput> {
         for (idx, level) in l0.sub_levels.iter().enumerate() {
@@ -114,6 +115,9 @@ impl TierCompactionPicker {
             }
 
             select_level_inputs.reverse();
+            if compaction_bytes < self.config.sub_level_max_compaction_bytes {
+                vnode_partition_count = 0;
+            }
 
             let result = CompactionInput {
                 input_levels: select_level_inputs,
@@ -122,6 +126,7 @@ impl TierCompactionPicker {
                 select_input_size: compaction_bytes,
                 target_input_size: 0,
                 total_file_count: compact_file_count,
+                vnode_partition_count,
             };
 
             if !self.compaction_task_validator.valid_compact_task(
@@ -150,7 +155,7 @@ impl CompactionPicker for TierCompactionPicker {
             return None;
         }
 
-        self.pick_overlapping_level(l0, &level_handlers[0], stats)
+        self.pick_overlapping_level(l0, &level_handlers[0], levels.vnode_partition_count, stats)
     }
 }
 
@@ -257,7 +262,7 @@ pub mod tests {
         // sub_level_max_compaction_bytes.
         let mut picker = TierCompactionPicker::new(config);
         let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
-        assert!(ret.is_none())
+        assert!(ret.is_none());
     }
 
     #[test]
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index 5effaee29077b..a6ef5034f5646 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -2900,6 +2900,7 @@ fn gen_version_delta<'a>(
             level_idx: compact_task.target_level,
             inserted_table_infos: compact_task.sorted_output_ssts.clone(),
             l0_sub_level_id: compact_task.target_sub_level_id,
+            vnode_partition_count: compact_task.split_weight_by_vnode,
             ..Default::default()
         })),
     };
diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
index 1193877a14c9b..b5c9889d31cb4 100644
--- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
+++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
@@ -15,6 +15,7 @@
 use std::cmp::Ordering;
 use std::collections::hash_map::Entry;
 use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
+use std::ops::Bound;
 
 use itertools::Itertools;
 use risingwave_common::catalog::TableId;
@@ -44,6 +45,7 @@ pub struct GroupDeltasSummary {
     pub group_destroy: Option<GroupDestroy>,
     pub group_meta_changes: Vec<GroupMetaChange>,
     pub group_table_change: Option<GroupTableChange>,
+    pub new_vnode_partition_count: u32,
 }
 
 pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary {
@@ -56,6 +58,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary
     let mut group_destroy = None;
     let mut group_meta_changes = vec![];
     let mut group_table_change = None;
+    let mut new_vnode_partition_count = 0;
 
     for group_delta in &group_deltas.group_deltas {
         match group_delta.get_delta_type().unwrap() {
@@ -69,6 +72,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary
                     insert_sub_level_id = intra_level.l0_sub_level_id;
                     insert_table_infos.extend(intra_level.inserted_table_infos.iter().cloned());
                 }
+                new_vnode_partition_count = intra_level.vnode_partition_count;
             }
             DeltaType::GroupConstruct(construct_delta) => {
                 assert!(group_construct.is_none());
@@ -100,6 +104,7 @@ pub fn summarize_group_deltas(group_deltas: &GroupDeltas) -> GroupDeltasSummary
         group_destroy,
         group_meta_changes,
         group_table_change,
+        new_vnode_partition_count,
     }
 }
 
@@ -620,6 +625,7 @@ pub trait HummockLevelsExt {
         delete_sst_levels: &[u32],
         delete_sst_ids_set: HashSet<u64>,
     ) -> bool;
+    fn can_partition_by_vnode(&self) -> bool;
 }
 
 impl HummockLevelsExt for Levels {
@@ -651,6 +657,7 @@ impl HummockLevelsExt for Levels {
             insert_sst_level_id,
             insert_sub_level_id,
             insert_table_infos,
+            new_vnode_partition_count,
             ..
         } = summary;
 
@@ -695,9 +702,29 @@ impl HummockLevelsExt for Levels {
                     "should find the level to insert into when applying compaction generated delta. sub level idx: {},  removed sst ids: {:?}, sub levels: {:?},",
                     insert_sub_level_id, delete_sst_ids_set, l0.sub_levels.iter().map(|level| level.sub_level_id).collect_vec()
                 );
+                if l0.sub_levels[index].table_infos.is_empty()
+                    && self.member_table_ids.len() == 1
+                    && insert_table_infos.iter().all(|sst| {
+                        sst.table_ids.len() == 1 && sst.table_ids[0] == self.member_table_ids[0]
+                    })
+                {
+                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
+                }
                 level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
             } else {
                 let idx = insert_sst_level_id as usize - 1;
+                if self.levels[idx].table_infos.is_empty()
+                    && insert_table_infos
+                        .iter()
+                        .all(|sst| sst.table_ids.len() == 1)
+                {
+                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
+                } else if self.levels[idx].vnode_partition_count != 0
+                    && new_vnode_partition_count == 0
+                    && self.member_table_ids.len() > 1
+                {
+                    self.levels[idx].vnode_partition_count = 0;
+                }
                 level_insert_ssts(&mut self.levels[idx], insert_table_infos);
             }
         }
@@ -747,6 +774,10 @@ impl HummockLevelsExt for Levels {
         }
         delete_sst_ids_set.is_empty()
     }
+
+    fn can_partition_by_vnode(&self) -> bool {
+        self.member_table_ids.len() == 1 && self.vnode_partition_count != 0
+    }
 }
 
 pub fn build_initial_compaction_group_levels(
@@ -762,6 +793,7 @@ pub fn build_initial_compaction_group_levels(
             total_file_size: 0,
             sub_level_id: 0,
             uncompressed_file_size: 0,
+            vnode_partition_count: 0,
         });
     }
     Levels {
@@ -774,6 +806,7 @@ pub fn build_initial_compaction_group_levels(
         group_id,
         parent_group_id: StaticCompactionGroupId::NewCompactionGroup as _,
         member_table_ids: vec![],
+        vnode_partition_count: compaction_config.split_weight_by_vnode,
     }
 }
 
@@ -912,6 +945,7 @@ pub fn new_sub_level(
         total_file_size,
         sub_level_id,
         uncompressed_file_size,
+        vnode_partition_count: 0,
     }
 }