Skip to content

Commit

Permalink
feat(meta): commit epoch in separate group delta (#18893)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Oct 18, 2024
1 parent d6277fb commit c0d6af1
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 317 deletions.
5 changes: 5 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ message InputLevel {
repeated SstableInfo table_infos = 3;
}

message NewL0SubLevel {
repeated SstableInfo inserted_table_infos = 1;
}

message IntraLevelDelta {
uint32 level_idx = 1;
uint64 l0_sub_level_id = 2;
Expand Down Expand Up @@ -112,6 +116,7 @@ message GroupDelta {
GroupConstruct group_construct = 2;
GroupDestroy group_destroy = 3;
GroupMerge group_merge = 6;
NewL0SubLevel new_l0_sub_level = 7;
}
}

Expand Down
24 changes: 16 additions & 8 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,22 @@ pub async fn print_version_delta_in_archive(
}

fn match_delta(delta: &DeltaType, sst_id: HummockSstableObjectId) -> bool {
let DeltaType::IntraLevel(delta) = delta else {
return false;
};
delta
.inserted_table_infos
.iter()
.any(|sst| sst.sst_id == sst_id)
|| delta.removed_table_ids.iter().any(|sst| *sst == sst_id)
match delta {
DeltaType::GroupConstruct(_) | DeltaType::GroupDestroy(_) | DeltaType::GroupMerge(_) => {
false
}
DeltaType::IntraLevel(delta) => {
delta
.inserted_table_infos
.iter()
.any(|sst| sst.sst_id == sst_id)
|| delta.removed_table_ids.iter().any(|sst| *sst == sst_id)
}
DeltaType::NewL0SubLevel(delta) => delta
.inserted_table_infos
.iter()
.any(|sst| sst.sst_id == sst_id),
}
}

fn print_delta(delta: &DeltaType) {
Expand Down
30 changes: 21 additions & 9 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ use std::ops::Bound::{Excluded, Included};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::Ordering;

use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
object_size_map, summarize_group_deltas,
};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion};
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
use risingwave_pb::hummock::{
Expand Down Expand Up @@ -156,13 +154,27 @@ impl HummockManager {
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
{
for (group_id, group_deltas) in &version_delta.group_deltas {
let summary = summarize_group_deltas(group_deltas, *group_id);
for group_deltas in version_delta.group_deltas.values() {
object_sizes.extend(
summary
.insert_table_infos
group_deltas
.group_deltas
.iter()
.map(|t| (t.object_id, t.file_size))
.flat_map(|delta| {
match delta {
GroupDeltaCommon::IntraLevel(level_delta) => {
Some(level_delta.inserted_table_infos.iter())
}
GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
Some(inserted_table_infos.iter())
}
GroupDeltaCommon::GroupConstruct(_)
| GroupDeltaCommon::GroupDestroy(_)
| GroupDeltaCommon::GroupMerge(_) => None,
}
.into_iter()
.flatten()
.map(|t| (t.object_id, t.file_size))
})
.chain(
version_delta
.change_log_delta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::compact_task::ReportTask;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
HummockLevelsExt, TableGroupInfo,
};
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::TableGroupInfo;
use risingwave_hummock_sdk::compaction_group::{
group_split, StateTableId, StaticCompactionGroupId,
};
Expand Down
9 changes: 3 additions & 6 deletions src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use rand::seq::SliceRandom;
use rand::thread_rng;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::level::Levels;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
Expand Down Expand Up @@ -153,17 +152,15 @@ impl<'a> HummockVersionTransaction<'a> {
.entry(compact_task.compaction_group_id)
.or_default()
.group_deltas;
let mut removed_table_ids_map: BTreeMap<u32, Vec<u64>> = BTreeMap::default();
let mut removed_table_ids_map: BTreeMap<u32, HashSet<u64>> = BTreeMap::default();

for level in &compact_task.input_ssts {
let level_idx = level.level_idx;
let mut removed_table_ids =
level.table_infos.iter().map(|sst| sst.sst_id).collect_vec();

removed_table_ids_map
.entry(level_idx)
.or_default()
.append(&mut removed_table_ids);
.extend(level.table_infos.iter().map(|sst| sst.sst_id));
}

for (level_idx, removed_table_ids) in removed_table_ids_map {
Expand All @@ -181,7 +178,7 @@ impl<'a> HummockVersionTransaction<'a> {
let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
compact_task.target_level,
compact_task.target_sub_level_id,
vec![], // default
HashSet::new(), // default
compact_task.sorted_output_ssts.clone(),
compact_task.split_weight_by_vnode,
));
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ impl HummockManager {
}
let written = write_sstable_infos(
delta
.newly_added_sst_infos(&select_groups)
.newly_added_sst_infos(Some(&select_groups))
.filter(|s| !skip_sst_ids.contains(&s.sst_id)),
txn,
)
Expand Down
12 changes: 2 additions & 10 deletions src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::{
GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta,
};
use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId};
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas,
Expand Down Expand Up @@ -152,13 +150,7 @@ impl<'a> HummockVersionTransaction<'a> {
.entry(compaction_group_id)
.or_default()
.group_deltas;
let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
0,
0, // l0_sub_level_id will be generated during apply_version_delta
vec![], // default
inserted_table_infos,
0, // default
));
let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos);

group_deltas.push(group_delta);
}
Expand Down
1 change: 1 addition & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute("hummock.GroupTableChange", "#[derive(Eq)]")
.type_attribute("hummock.GroupMerge", "#[derive(Eq)]")
.type_attribute("hummock.GroupDelta", "#[derive(Eq)]")
.type_attribute("hummock.NewL0SubLevel", "#[derive(Eq)]")
.type_attribute("hummock.LevelHandler.RunningCompactTask", "#[derive(Eq)]")
.type_attribute("hummock.LevelHandler", "#[derive(Eq)]")
.type_attribute("hummock.TableOption", "#[derive(Eq)]")
Expand Down
Loading

0 comments on commit c0d6af1

Please sign in to comment.