Skip to content

Commit

Permalink
feat(storage): reduce group deltas for l0 task (#12085)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Sep 7, 2023
1 parent b2dc4eb commit 4bb7c66
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 26 deletions.
53 changes: 33 additions & 20 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2772,34 +2772,47 @@ fn gen_version_delta<'a>(
.or_default()
.group_deltas;
let mut gc_object_ids = vec![];
let mut removed_table_ids_map: BTreeMap<u32, Vec<u64>> = BTreeMap::default();

for level in &compact_task.input_ssts {
let level_idx = level.level_idx;
let mut removed_table_ids = level
.table_infos
.iter()
.map(|sst| {
let object_id = sst.get_object_id();
let sst_id = sst.get_sst_id();
if !trivial_move
&& drop_sst(
branched_ssts,
compact_task.compaction_group_id,
object_id,
sst_id,
)
{
gc_object_ids.push(object_id);
}
sst_id
})
.collect_vec();

removed_table_ids_map
.entry(level_idx)
.or_default()
.append(&mut removed_table_ids);
}

for (level_idx, removed_table_ids) in removed_table_ids_map {
let group_delta = GroupDelta {
delta_type: Some(DeltaType::IntraLevel(IntraLevelDelta {
level_idx: level.level_idx,
removed_table_ids: level
.table_infos
.iter()
.map(|sst| {
let object_id = sst.get_object_id();
let sst_id = sst.get_sst_id();
if !trivial_move
&& drop_sst(
branched_ssts,
compact_task.compaction_group_id,
object_id,
sst_id,
)
{
gc_object_ids.push(object_id);
}
sst_id
})
.collect_vec(),
level_idx,
removed_table_ids,
..Default::default()
})),
};
group_deltas.push(group_delta);
}

let group_delta = GroupDelta {
delta_type: Some(DeltaType::IntraLevel(IntraLevelDelta {
level_idx: compact_task.target_level,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,29 +348,31 @@ impl HummockVersionUpdateExt for HummockVersion {
}
}
}
for (z, level) in parent_levels.levels.iter_mut().enumerate() {
for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
let insert_table_infos = split_sst_info_for_level(
&member_table_ids,
allow_trivial_split,
level,
&mut split_id_vers,
&mut new_sst_id,
);
cur_levels.levels[z].total_file_size += insert_table_infos
cur_levels.levels[idx].total_file_size += insert_table_infos
.iter()
.map(|sst| sst.file_size)
.sum::<u64>();
cur_levels.levels[z].uncompressed_file_size += insert_table_infos
cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
.iter()
.map(|sst| sst.uncompressed_file_size)
.sum::<u64>();
cur_levels.levels[z].table_infos.extend(insert_table_infos);
cur_levels.levels[z].table_infos.sort_by(|sst1, sst2| {
cur_levels.levels[idx]
.table_infos
.extend(insert_table_infos);
cur_levels.levels[idx].table_infos.sort_by(|sst1, sst2| {
let a = sst1.key_range.as_ref().unwrap();
let b = sst2.key_range.as_ref().unwrap();
a.compare(b)
});
assert!(can_concat(&cur_levels.levels[z].table_infos));
assert!(can_concat(&cur_levels.levels[idx].table_infos));
level
.table_infos
.drain_filter(|sst_info| sst_info.table_ids.is_empty())
Expand Down

0 comments on commit 4bb7c66

Please sign in to comment.