Skip to content

Commit

Permalink
feat(meta): commit epoch in separate group delta
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Oct 14, 2024
1 parent a348b8a commit f0467ca
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 240 deletions.
5 changes: 5 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ message InputLevel {
repeated SstableInfo table_infos = 3;
}

message NewL0SubLevel {
repeated SstableInfo inserted_table_infos = 1;
}

message IntraLevelDelta {
uint32 level_idx = 1;
uint64 l0_sub_level_id = 2;
Expand Down Expand Up @@ -112,6 +116,7 @@ message GroupDelta {
GroupConstruct group_construct = 2;
GroupDestroy group_destroy = 3;
GroupMerge group_merge = 6;
NewL0SubLevel new_l0_sub_level = 7;
}
}

Expand Down
27 changes: 18 additions & 9 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ use std::ops::Bound::{Excluded, Included};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::Ordering;

use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
object_size_map, summarize_group_deltas,
};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion};
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
use risingwave_pb::hummock::{
Expand Down Expand Up @@ -156,13 +154,24 @@ impl HummockManager {
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
{
for (group_id, group_deltas) in &version_delta.group_deltas {
let summary = summarize_group_deltas(group_deltas, *group_id);
for group_deltas in version_delta.group_deltas.values() {
object_sizes.extend(
summary
.insert_table_infos
group_deltas
.group_deltas
.iter()
.map(|t| (t.object_id, t.file_size))
.flat_map(|delta| {
match delta {
GroupDeltaCommon::IntraLevel(level_delta) => Some(level_delta),
_ => None,
}
.into_iter()
.flat_map(|level_delta| {
level_delta
.inserted_table_infos
.iter()
.map(|t| (t.object_id, t.file_size))
})
})
.chain(
version_delta
.change_log_delta
Expand Down
8 changes: 3 additions & 5 deletions src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,17 +153,15 @@ impl<'a> HummockVersionTransaction<'a> {
.entry(compact_task.compaction_group_id)
.or_default()
.group_deltas;
let mut removed_table_ids_map: BTreeMap<u32, Vec<u64>> = BTreeMap::default();
let mut removed_table_ids_map: BTreeMap<u32, HashSet<u64>> = BTreeMap::default();

for level in &compact_task.input_ssts {
let level_idx = level.level_idx;
let mut removed_table_ids =
level.table_infos.iter().map(|sst| sst.sst_id).collect_vec();

removed_table_ids_map
.entry(level_idx)
.or_default()
.append(&mut removed_table_ids);
.extend(level.table_infos.iter().map(|sst| sst.sst_id));
}

for (level_idx, removed_table_ids) in removed_table_ids_map {
Expand All @@ -181,7 +179,7 @@ impl<'a> HummockVersionTransaction<'a> {
let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
compact_task.target_level,
compact_task.target_sub_level_id,
vec![], // default
HashSet::new(), // default
compact_task.sorted_output_ssts.clone(),
compact_task.split_weight_by_vnode,
));
Expand Down
12 changes: 2 additions & 10 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::{
GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta,
};
use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId,
};
Expand Down Expand Up @@ -154,13 +152,7 @@ impl<'a> HummockVersionTransaction<'a> {
.entry(compaction_group_id)
.or_default()
.group_deltas;
let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
0,
0, // l0_sub_level_id will be generated during apply_version_delta
vec![], // default
inserted_table_infos,
0, // default
));
let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos);

group_deltas.push(group_delta);
}
Expand Down
1 change: 1 addition & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute("hummock.GroupTableChange", "#[derive(Eq)]")
.type_attribute("hummock.GroupMerge", "#[derive(Eq)]")
.type_attribute("hummock.GroupDelta", "#[derive(Eq)]")
.type_attribute("hummock.NewL0SubLevel", "#[derive(Eq)]")
.type_attribute("hummock.LevelHandler.RunningCompactTask", "#[derive(Eq)]")
.type_attribute("hummock.LevelHandler", "#[derive(Eq)]")
.type_attribute("hummock.TableOption", "#[derive(Eq)]")
Expand Down
Loading

0 comments on commit f0467ca

Please sign in to comment.