Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): maintain sub_level id order #18915

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ message HummockVersion {
map<uint32, TableWatermarks> table_watermarks = 5;
map<uint32, TableChangeLog> table_change_logs = 6;
map<uint32, StateTableInfo> state_table_info = 7;
optional uint64 max_sub_level_id = 8;
}

message HummockVersionDelta {
Expand Down
21 changes: 20 additions & 1 deletion src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_hummock_sdk::version::{
};
use risingwave_hummock_sdk::{
CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId,
FIRST_SUB_LEVEL_ID,
};
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas,
Expand Down Expand Up @@ -149,14 +150,32 @@ impl<'a> HummockVersionTransaction<'a> {

// Append SSTs to a new version.
for (compaction_group_id, inserted_table_infos) in commit_sstables {
let l0_sub_level_id = new_version_delta
.latest_version()
.levels
.get(&compaction_group_id)
.and_then(|levels| {
levels
.l0
.sub_levels
.last()
.map(|level| level.sub_level_id + 1)
})
.unwrap_or(
new_version_delta
.latest_version()
.max_sub_level_id
.map(|id| id + 1)
.unwrap_or(FIRST_SUB_LEVEL_ID),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be backward compatible, should we init the max_sub_level_id on meta startup to be the actual max sub-level id and we use the max_sub_level_id field only for the new sublevel's id?

Copy link
Contributor Author

@zwang28 zwang28 Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be backward compatible, should we init the max_sub_level_id on meta startup

This has been done in src/storage/hummock_sdk/src/version.rs

Copy link
Contributor Author

@zwang28 zwang28 Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Therefore, reaching line 169 indicates that the version is indeed a fresh new one.

Alternatively we can make the in-mem HummockVersion:: max_sub_level_id u64 directly instead of Option<64>, in order to avoid such unwrap_or whenever reading max_sub_level_id.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok for now, the newly introduced fields keep the Option restrictions.

When line 169 is reached, the sub_level will no longer exist in the compaciton group, so it is correct to use FIRST_SUB_LEVEL_ID for initialisation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use the max_sub_level_id field only for the new sublevel's id

True, it's a bug. Thanks.

);
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;
let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
0,
0, // l0_sub_level_id will be generated during apply_version_delta
l0_sub_level_id,
vec![], // default
inserted_table_infos,
0, // default
Expand Down
39 changes: 21 additions & 18 deletions src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::Arc;

use bytes::Bytes;
use itertools::Either::{Left, Right};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VnodeBitmapExt;
Expand Down Expand Up @@ -434,10 +435,10 @@ impl HummockVersion {
continue;
}
match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
Ok(idx) => {
Left(idx) => {
add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
}
Err(idx) => {
Right(idx) => {
insert_new_sub_level(
target_l0,
sub_level.sub_level_id,
Expand Down Expand Up @@ -660,15 +661,10 @@ impl HummockVersion {
|| group_destroy.is_some(),
"no sst should be deleted when committing an epoch"
);
let mut next_l0_sub_level_id = levels
.l0
.sub_levels
.last()
.map(|level| level.sub_level_id + 1)
.unwrap_or(1);
for group_delta in &group_deltas.group_deltas {
if let GroupDelta::IntraLevel(IntraLevelDelta {
level_idx,
l0_sub_level_id,
inserted_table_infos,
..
}) = group_delta
Expand All @@ -680,12 +676,11 @@ impl HummockVersion {
if !inserted_table_infos.is_empty() {
insert_new_sub_level(
&mut levels.l0,
next_l0_sub_level_id,
*l0_sub_level_id,
PbLevelType::Overlapping,
inserted_table_infos.clone(),
None,
);
next_l0_sub_level_id += 1;
}
}
}
Expand Down Expand Up @@ -764,6 +759,14 @@ impl HummockVersion {
&version_delta.state_table_info_delta,
&changed_table_info,
);

// The max_sub_level_id may have been increased, e.g. during commit_epoch or merge_group.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should max with self.max_sub_level_id and otherwise it will be reset to 0 when all data are clean.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks.

// Recalculate it.
self.max_sub_level_id = self
.levels
.values()
.filter_map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max())
.max();
}

pub fn apply_change_log_delta<T: Clone>(
Expand Down Expand Up @@ -883,7 +886,7 @@ impl HummockVersion {
)
});

group_split::merge_levels(left_levels, right_levels);
group_split::merge_levels(left_levels, right_levels, self.max_sub_level_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's better to maintain a per-compaction group max_sub_level_id instead of a global one, and then we don't need to pass the global one here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to either option. @Li0k

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer global uniqueness, for me at the moment this sub level id has no more semantics. So globally unique is more concise.

}
}

Expand Down Expand Up @@ -952,10 +955,10 @@ impl HummockVersionCommon<SstableInfo> {
l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
});
match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
Ok(idx) => {
Left(idx) => {
add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
}
Err(idx) => {
Right(idx) => {
insert_new_sub_level(
target_l0,
sub_level.sub_level_id,
Expand Down Expand Up @@ -2032,7 +2035,7 @@ mod tests {
let mut left_levels = Levels::default();
let right_levels = Levels::default();

group_split::merge_levels(&mut left_levels, right_levels);
group_split::merge_levels(&mut left_levels, right_levels, None);
}

{
Expand All @@ -2046,7 +2049,7 @@ mod tests {
);
let right_levels = right_levels.clone();

group_split::merge_levels(&mut left_levels, right_levels);
group_split::merge_levels(&mut left_levels, right_levels, None);

assert!(left_levels.l0.sub_levels.len() == 3);
assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
Expand All @@ -2071,7 +2074,7 @@ mod tests {
},
);

group_split::merge_levels(&mut left_levels, right_levels);
group_split::merge_levels(&mut left_levels, right_levels, None);

assert!(left_levels.l0.sub_levels.len() == 3);
assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
Expand All @@ -2089,7 +2092,7 @@ mod tests {
let mut left_levels = left_levels.clone();
let right_levels = right_levels.clone();

group_split::merge_levels(&mut left_levels, right_levels);
group_split::merge_levels(&mut left_levels, right_levels, None);

assert!(left_levels.l0.sub_levels.len() == 6);
assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
Expand Down Expand Up @@ -2459,7 +2462,7 @@ mod tests {
..Default::default()
};

merge_levels(cg1, right_levels);
merge_levels(cg1, right_levels, None);
}

{
Expand Down
36 changes: 23 additions & 13 deletions src/storage/hummock_sdk/src/compaction_group/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub mod group_split {
use std::collections::BTreeSet;

use bytes::Bytes;
use itertools::Either;
use itertools::Either::{Left, Right};
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_pb::hummock::PbLevelType;
Expand All @@ -59,7 +61,7 @@ pub mod group_split {
use crate::key_range::KeyRange;
use crate::level::{Level, Levels};
use crate::sstable_info::SstableInfo;
use crate::{can_concat, HummockEpoch, KeyComparator};
use crate::{can_concat, HummockEpoch, KeyComparator, FIRST_SUB_LEVEL_ID};

/// The split will follow the following rules:
/// 1. Ssts with `split_key` will be split into two separate ssts and their `key_range` will be changed `sst_1`: [`sst.key_range.right`, `split_key`) `sst_2`: [`split_key`, `sst.key_range.right`].
Expand Down Expand Up @@ -272,24 +274,32 @@ pub mod group_split {
}

/// Merge the right levels into the left levels.
pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) {
pub fn merge_levels(
left_levels: &mut Levels,
right_levels: Levels,
max_sub_level_id: Option<u64>,
) {
let right_l0 = right_levels.l0;

let mut max_left_sub_level_id = left_levels
let mut next_right_sub_level_id = left_levels
.l0
.sub_levels
.iter()
.map(|sub_level| sub_level.sub_level_id + 1)
.max()
.unwrap_or(0); // If there are no sub levels, the max sub level id is 0.
let need_rewrite_right_sub_level_id = max_left_sub_level_id != 0;
.unwrap_or(
max_sub_level_id
.map(|v| v + 1)
.unwrap_or(FIRST_SUB_LEVEL_ID),
);
let need_rewrite_right_sub_level_id = !left_levels.l0.sub_levels.is_empty();

for mut right_sub_level in right_l0.sub_levels {
// Rewrtie the sub level id of right sub level to avoid conflict with left sub levels. (conflict level type)
// e.g. left sub levels: [0, 1, 2], right sub levels: [0, 1, 2], after rewrite, right sub levels: [3, 4, 5]
if need_rewrite_right_sub_level_id {
right_sub_level.sub_level_id = max_left_sub_level_id;
max_left_sub_level_id += 1;
right_sub_level.sub_level_id = next_right_sub_level_id;
next_right_sub_level_id += 1;
}

insert_new_sub_level(
Expand Down Expand Up @@ -354,26 +364,26 @@ pub mod group_split {
}
}

// When `insert_hint` is `Ok(idx)`, it means that the sub level `idx` in `target_l0`
// will extend these SSTs. When `insert_hint` is `Err(idx)`, it
// When `insert_hint` is `Left(idx)`, it means that the sub level `idx` in `target_l0`
// will extend these SSTs. When `insert_hint` is `Right(idx)`, it
// means that we will add a new sub level `idx` into `target_l0`.
pub fn get_sub_level_insert_hint(
target_levels: &Vec<Level>,
sub_level: &Level,
) -> Result<usize, usize> {
) -> Either<usize, usize> {
for (idx, other) in target_levels.iter().enumerate() {
match other.sub_level_id.cmp(&sub_level.sub_level_id) {
Ordering::Less => {}
Ordering::Equal => {
return Ok(idx);
return Left(idx);
}
Ordering::Greater => {
return Err(idx);
return Right(idx);
}
}
}

Err(target_levels.len())
Right(target_levels.len())
}

/// Split the SSTs in the level according to the split key.
Expand Down
2 changes: 2 additions & 0 deletions src/storage/hummock_sdk/src/frontend_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ impl FrontendHummockVersion {
})
.collect(),
state_table_info: self.state_table_info.to_protobuf(),
// max_sub_level_id is not expected to be used
max_sub_level_id: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl Sub for HummockVersionId {

pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0);
pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1);
pub const FIRST_SUB_LEVEL_ID: u64 = 1;
pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
pub const OBJECT_SUFFIX: &str = "data";
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_sdk/src/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl From<(&HummockVersion, &HashSet<CompactionGroupId>)> for IncompleteHummockV
})
.collect(),
state_table_info: version.state_table_info.clone(),
max_sub_level_id: version.max_sub_level_id,
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ pub struct HummockVersionCommon<T> {
pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
pub table_change_log: HashMap<TableId, TableChangeLogCommon<T>>,
pub state_table_info: HummockVersionStateTableInfo,
/// The maximum `sub_level_id` that has been recorded thus far, regardless of whether the `sub_level_id` currently exists.
pub max_sub_level_id: Option<u64>,
}

pub type HummockVersion = HummockVersionCommon<SstableInfo>;
Expand Down Expand Up @@ -312,6 +314,21 @@ where
state_table_info: HummockVersionStateTableInfo::from_protobuf(
&pb_version.state_table_info,
),
// The next_sub_level_id is expected to be None in Pb from previous kernel version.
// For backward compatibility, try to initialize next_sub_level_id with max(existing sub_level ids).
max_sub_level_id: pb_version.max_sub_level_id.or_else(|| {
pb_version
.levels
.values()
.filter_map(|levels| {
levels
.l0
.as_ref()
.map(|l0| l0.sub_levels.iter().map(|s| s.sub_level_id).max())
})
.flatten()
.max()
}),
}
}
}
Expand Down Expand Up @@ -341,6 +358,7 @@ where
.map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
.collect(),
state_table_info: version.state_table_info.to_protobuf(),
max_sub_level_id: version.max_sub_level_id,
}
}
}
Expand Down Expand Up @@ -371,6 +389,7 @@ where
.map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
.collect(),
state_table_info: version.state_table_info.to_protobuf(),
max_sub_level_id: version.max_sub_level_id,
}
}
}
Expand Down Expand Up @@ -433,6 +452,7 @@ impl HummockVersion {
table_watermarks: HashMap::new(),
table_change_log: HashMap::new(),
state_table_info: HummockVersionStateTableInfo::empty(),
max_sub_level_id: None,
};
for group_id in [
StaticCompactionGroupId::StateDefault as CompactionGroupId,
Expand Down
Loading