Skip to content

Commit

Permalink
feat(compaction): Limit the size of the new overlapping level
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Nov 6, 2024
1 parent 9a32e75 commit 0f7d928
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 10 deletions.
32 changes: 29 additions & 3 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl HummockManager {
let state_table_info = &version.latest_version().state_table_info;
let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();
let mut new_table_ids = HashMap::new();
let mut new_compaction_groups = HashMap::new();
let mut compaction_groups = HashMap::new();
let mut compaction_group_manager_txn = None;
let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

Expand Down Expand Up @@ -143,7 +143,10 @@ impl HummockManager {
)
};
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
new_compaction_groups.insert(new_compaction_group_id, compaction_group_config.clone());
compaction_groups.insert(
new_compaction_group_id,
(true, compaction_group_config.clone()),
);
compaction_group_manager.insert(
new_compaction_group_id,
CompactionGroup {
Expand All @@ -166,10 +169,33 @@ impl HummockManager {
.await?;

let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();
// fill compaction_groups
if let Some(compaction_group_manager) = compaction_group_manager_txn.as_ref() {
for cg_id in &modified_compaction_groups {
if !compaction_groups.contains_key(cg_id) {
let compaction_group = compaction_group_manager
.get(cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
compaction_groups.insert(*cg_id, (false, compaction_group));
}
}
} else {
let compaction_group_manager = self.compaction_group_manager.read().await;
for cg_id in &modified_compaction_groups {
if !compaction_groups.contains_key(cg_id) {
let compaction_group = compaction_group_manager
.try_get_compaction_group_config(*cg_id)
.unwrap_or_else(|| panic!("compaction group {} should be created", cg_id))
.compaction_config();
compaction_groups.insert(*cg_id, (false, compaction_group));
}
}
}

let time_travel_delta = version.pre_commit_epoch(
&tables_to_commit,
new_compaction_groups,
compaction_groups,
commit_sstables,
&new_table_ids,
new_table_watermarks,
Expand Down
45 changes: 38 additions & 7 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl<'a> HummockVersionTransaction<'a> {
pub(super) fn pre_commit_epoch(
&mut self,
tables_to_commit: &HashMap<TableId, u64>,
new_compaction_groups: HashMap<CompactionGroupId, Arc<CompactionConfig>>,
compaction_groups: HashMap<CompactionGroupId, (bool, Arc<CompactionConfig>)>,
commit_sstables: BTreeMap<CompactionGroupId, Vec<SstableInfo>>,
new_table_ids: &HashMap<TableId, CompactionGroupId>,
new_table_watermarks: HashMap<TableId, TableWatermarks>,
Expand All @@ -121,18 +121,22 @@ impl<'a> HummockVersionTransaction<'a> {
new_version_delta.new_table_watermarks = new_table_watermarks;
new_version_delta.change_log_delta = change_log_delta;

for (compaction_group_id, compaction_group_config) in new_compaction_groups {
for (compaction_group_id, (is_new, compaction_group_config)) in &compaction_groups {
{
if !is_new {
continue;
}

let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.entry(*compaction_group_id)
.or_default()
.group_deltas;

#[expect(deprecated)]
group_deltas.push(GroupDelta::GroupConstruct(GroupConstruct {
group_config: Some((*compaction_group_config).clone()),
group_id: compaction_group_id,
group_config: Some((**compaction_group_config).clone()),
group_id: *compaction_group_id,
parent_group_id: StaticCompactionGroupId::NewCompactionGroup
as CompactionGroupId,
new_sst_start_id: 0, // No need to set it when `NewCompactionGroup`
Expand All @@ -145,14 +149,41 @@ impl<'a> HummockVersionTransaction<'a> {

// Append SSTs to a new version.
for (compaction_group_id, inserted_table_infos) in commit_sstables {
let mut accumulated_size = 0;
let mut ssts = vec![];
let (_, config) = compaction_groups
.get(&compaction_group_id)
.unwrap_or_else(|| {
panic!(
"compaction group {:?} not found in compaction_groups",
compaction_group_id
)
});
let sub_level_size_limit = config.sub_level_max_compaction_bytes * 2;

let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;
let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos);

group_deltas.push(group_delta);
for sst in inserted_table_infos {
accumulated_size += sst.sst_size;
ssts.push(sst);
if accumulated_size > sub_level_size_limit {
let group_delta = GroupDelta::NewL0SubLevel(ssts);
group_deltas.push(group_delta);

// reset the accumulated size and ssts
accumulated_size = 0;
ssts = vec![];
}
}

if accumulated_size != 0 {
let group_delta = GroupDelta::NewL0SubLevel(ssts);
group_deltas.push(group_delta);
}
}

// update state table info
Expand Down
126 changes: 126 additions & 0 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2826,3 +2826,129 @@ async fn test_commit_multi_epoch() {
assert_eq!(info.committed_epoch, epoch3);
}
}

#[tokio::test]
async fn test_commit_with_large_size() {
let test_env = prepare_hummock_test_env().await;
let context_id = test_env.meta_client.context_id();
let existing_table_id = TableId::new(1);
let initial_epoch = INVALID_EPOCH;

let commit_epoch =
|epoch, ssts: Vec<SstableInfo>, new_table_fragment_infos, tables_to_commit: &[TableId]| {
let manager = &test_env.manager;
let tables_to_commit = tables_to_commit
.iter()
.map(|table_id| (*table_id, epoch))
.collect();
let sst_to_context = ssts.iter().map(|sst| (sst.object_id, context_id)).collect();

let sstables = ssts
.into_iter()
.map(|sst| LocalSstableInfo {
table_stats: sst
.table_ids
.iter()
.map(|&table_id| {
(
table_id,
TableStats {
total_compressed_size: 10,
..Default::default()
},
)
})
.collect(),
sst_info: sst,
created_at: u64::MAX,
})
.collect_vec();

async move {
manager
.commit_epoch(CommitEpochInfo {
new_table_watermarks: Default::default(),
sst_to_context,
sstables,
new_table_fragment_infos,
change_log_delta: Default::default(),
tables_to_commit,
})
.await
.unwrap();
}
};

let epoch1 = initial_epoch.next_epoch();
let sst1_epoch1 = SstableInfo {
sst_id: 11,
object_id: 1,
table_ids: vec![existing_table_id.table_id],
file_size: 512 << 20,
sst_size: 512 << 20,
..Default::default()
};

let sst1_epoch2 = SstableInfo {
sst_id: 12,
object_id: 2,
table_ids: vec![existing_table_id.table_id],
file_size: 512 << 20,
sst_size: 512 << 20,
..Default::default()
};

let sst1_epoch3 = SstableInfo {
sst_id: 13,
object_id: 3,
table_ids: vec![existing_table_id.table_id],
file_size: 512 << 20,
sst_size: 512 << 20,
..Default::default()
};

commit_epoch(
epoch1,
vec![
sst1_epoch1.clone(),
sst1_epoch2.clone(),
sst1_epoch3.clone(),
],
vec![NewTableFragmentInfo {
table_ids: HashSet::from_iter([existing_table_id]),
}],
&[existing_table_id],
)
.await;

let cg_id =
get_compaction_group_id_by_table_id(test_env.manager.clone(), existing_table_id.table_id())
.await;

let l0_sub_levels = test_env
.manager
.get_current_version()
.await
.levels
.get(&cg_id)
.unwrap()
.l0
.clone();

assert_eq!(3, l0_sub_levels.sub_levels.len());
assert_eq!(1, l0_sub_levels.sub_levels[0].table_infos.len());
assert_eq!(
sst1_epoch1.object_id,
l0_sub_levels.sub_levels[0].table_infos[0].object_id
);
assert_eq!(1, l0_sub_levels.sub_levels[1].table_infos.len());
assert_eq!(
sst1_epoch2.object_id,
l0_sub_levels.sub_levels[1].table_infos[0].object_id
);
assert_eq!(1, l0_sub_levels.sub_levels[2].table_infos.len());
assert_eq!(
sst1_epoch3.object_id,
l0_sub_levels.sub_levels[2].table_infos[0].object_id
);
}

0 comments on commit 0f7d928

Please sign in to comment.