Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): maintain sub_level id order #18915

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ message HummockVersion {
map<uint32, TableWatermarks> table_watermarks = 5;
map<uint32, TableChangeLog> table_change_logs = 6;
map<uint32, StateTableInfo> state_table_info = 7;
optional uint64 max_sub_level_id = 8;
}

message HummockVersionDelta {
Expand Down
17 changes: 16 additions & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap, VecDeque};
use std::iter;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
Expand All @@ -24,7 +25,7 @@ use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
version_archive_dir, version_checkpoint_path, CompactionGroupId, HummockCompactionTaskId,
HummockContextId, HummockVersionId,
HummockContextId, HummockVersionId, INVALID_MAX_SUB_LEVEL_ID,
};
use risingwave_meta_model_v2::{
compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
Expand Down Expand Up @@ -68,6 +69,7 @@ mod worker;
pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
use compaction::*;
pub use compaction::{check_cg_write_limit, WriteLimitType};
use risingwave_common::util::epoch::Epoch;
pub(crate) use utils::*;

// Update to states are performed as follow:
Expand Down Expand Up @@ -434,6 +436,19 @@ impl HummockManager {
..Default::default()
});

let may_init_max_sub_level_id = if redo_state.max_sub_level_id == INVALID_MAX_SUB_LEVEL_ID {
Epoch::now().0
} else {
redo_state.max_sub_level_id
};
redo_state.max_sub_level_id = redo_state
.levels
.values()
.filter_map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max())
.chain(iter::once(may_init_max_sub_level_id))
.max()
.unwrap();

versioning_guard.current_version = redo_state;
versioning_guard.hummock_version_deltas = hummock_version_deltas;

Expand Down
7 changes: 6 additions & 1 deletion src/meta/src/hummock/manager/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,19 @@ impl<'a> HummockVersionTransaction<'a> {

// Append SSTs to a new version.
for (compaction_group_id, inserted_table_infos) in commit_sstables {
// There is no necessity to invoke may_bump_max_sub_level_id here, because
// - There's at most one IntraLevel delta for each compaction group in one pre_commit_epoch's delta.
// - No other delta type will be present.
// - may_bump_max_sub_level_id will be invoked once later for this delta in apply_version_delta.
let l0_sub_level_id = new_version_delta.latest_version().max_sub_level_id + 1;
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;
let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
0,
0, // l0_sub_level_id will be generated during apply_version_delta
l0_sub_level_id,
vec![], // default
inserted_table_infos,
0, // default
Expand Down
52 changes: 33 additions & 19 deletions src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::iter;
use std::sync::Arc;

use bytes::Bytes;
use itertools::Either::{Left, Right};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VnodeBitmapExt;
Expand Down Expand Up @@ -434,10 +436,10 @@ impl HummockVersion {
continue;
}
match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
Ok(idx) => {
Left(idx) => {
add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
}
Err(idx) => {
Right(idx) => {
insert_new_sub_level(
target_l0,
sub_level.sub_level_id,
Expand Down Expand Up @@ -660,15 +662,10 @@ impl HummockVersion {
|| group_destroy.is_some(),
"no sst should be deleted when committing an epoch"
);
let mut next_l0_sub_level_id = levels
.l0
.sub_levels
.last()
.map(|level| level.sub_level_id + 1)
.unwrap_or(1);
for group_delta in &group_deltas.group_deltas {
if let GroupDelta::IntraLevel(IntraLevelDelta {
level_idx,
l0_sub_level_id,
inserted_table_infos,
..
}) = group_delta
Expand All @@ -680,12 +677,11 @@ impl HummockVersion {
if !inserted_table_infos.is_empty() {
insert_new_sub_level(
&mut levels.l0,
next_l0_sub_level_id,
*l0_sub_level_id,
PbLevelType::Overlapping,
inserted_table_infos.clone(),
None,
);
next_l0_sub_level_id += 1;
}
}
}
Expand Down Expand Up @@ -764,6 +760,8 @@ impl HummockVersion {
&version_delta.state_table_info_delta,
&changed_table_info,
);

self.may_bump_max_sub_level_id();
}

pub fn apply_change_log_delta<T: Clone>(
Expand Down Expand Up @@ -883,7 +881,22 @@ impl HummockVersion {
)
});

group_split::merge_levels(left_levels, right_levels);
group_split::merge_levels(left_levels, right_levels, self.max_sub_level_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's better to maintain a per-compaction group max_sub_level_id instead of a global one, and then we don't need to pass the global one here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to either option. @Li0k

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer global uniqueness. As far as I can tell, this sub level id currently carries no additional semantics, so a globally unique id is the more concise choice.

// There is no necessity to invoke may_bump_max_sub_level_id here, because
// - There's at most one GroupMerge delta in one delta.
// - No other delta type will be present.
// - may_bump_max_sub_level_id will be invoked once later for this delta in apply_version_delta.
}

fn may_bump_max_sub_level_id(&mut self) {
// The max_sub_level_id may have been increased, recalculate it.
self.max_sub_level_id = self
.levels
.values()
.filter_map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max())
.chain(iter::once(self.max_sub_level_id))
.max()
.unwrap();
}
}

Expand Down Expand Up @@ -952,10 +965,10 @@ impl HummockVersionCommon<SstableInfo> {
l0.uncompressed_file_size -= sst_info.uncompressed_file_size;
});
match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
Ok(idx) => {
Left(idx) => {
add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
}
Err(idx) => {
Right(idx) => {
insert_new_sub_level(
target_l0,
sub_level.sub_level_id,
Expand Down Expand Up @@ -2032,7 +2045,7 @@ mod tests {
let mut left_levels = Levels::default();
let right_levels = Levels::default();

group_split::merge_levels(&mut left_levels, right_levels);
group_split::merge_levels(&mut left_levels, right_levels, 105);
}

{
Expand All @@ -2046,7 +2059,7 @@ mod tests {
);
let right_levels = right_levels.clone();

group_split::merge_levels(&mut left_levels, right_levels);
group_split::merge_levels(&mut left_levels, right_levels, 105);

assert!(left_levels.l0.sub_levels.len() == 3);
assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
Expand All @@ -2071,7 +2084,7 @@ mod tests {
},
);

group_split::merge_levels(&mut left_levels, right_levels);
group_split::merge_levels(&mut left_levels, right_levels, 105);

assert!(left_levels.l0.sub_levels.len() == 3);
assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
Expand All @@ -2089,7 +2102,7 @@ mod tests {
let mut left_levels = left_levels.clone();
let right_levels = right_levels.clone();

group_split::merge_levels(&mut left_levels, right_levels);
group_split::merge_levels(&mut left_levels, right_levels, 105);

assert!(left_levels.l0.sub_levels.len() == 6);
assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
Expand Down Expand Up @@ -2252,6 +2265,7 @@ mod tests {
},
),
)]),
max_sub_level_id: 101,
..Default::default()
};

Expand Down Expand Up @@ -2447,7 +2461,7 @@ mod tests {
right_l0.sub_levels.push(Level {
level_idx: 0,
table_infos: x,
sub_level_id: 101,
sub_level_id: 100,
total_file_size: 100,
level_type: LevelType::Overlapping,
..Default::default()
Expand All @@ -2459,7 +2473,7 @@ mod tests {
..Default::default()
};

merge_levels(cg1, right_levels);
merge_levels(cg1, right_levels, version.max_sub_level_id);
}

{
Expand Down
39 changes: 24 additions & 15 deletions src/storage/hummock_sdk/src/compaction_group/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub mod group_split {
use std::collections::BTreeSet;

use bytes::Bytes;
use itertools::Either;
use itertools::Either::{Left, Right};
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_pb::hummock::PbLevelType;
Expand Down Expand Up @@ -272,24 +274,31 @@ pub mod group_split {
}

/// Merge the right levels into the left levels.
pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) {
pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels, max_sub_level_id: u64) {
let right_l0 = right_levels.l0;

let mut max_left_sub_level_id = left_levels
let mut next_right_sub_level_id = max_sub_level_id + 1;
let max_left_sub_level_id = left_levels
.l0
.sub_levels
.iter()
.map(|sub_level| sub_level.sub_level_id + 1)
.max()
.unwrap_or(0); // If there are no sub levels, the max sub level id is 0.
let need_rewrite_right_sub_level_id = max_left_sub_level_id != 0;
.map(|l| l.sub_level_id)
.max();
assert!(
max_left_sub_level_id
.map(|left| left < next_right_sub_level_id)
.unwrap_or(true),
"max_left_sub_level_id={:?} next_right_sub_level_id={}",
max_left_sub_level_id,
next_right_sub_level_id
);
let need_rewrite_right_sub_level_id = !left_levels.l0.sub_levels.is_empty();

for mut right_sub_level in right_l0.sub_levels {
// Rewrite the sub level id of right sub level to avoid conflict with left sub levels. (conflict level type)
// e.g. left sub levels: [0, 1, 2], right sub levels: [0, 1, 2], after rewrite, right sub levels: [3, 4, 5]
if need_rewrite_right_sub_level_id {
right_sub_level.sub_level_id = max_left_sub_level_id;
max_left_sub_level_id += 1;
right_sub_level.sub_level_id = next_right_sub_level_id;
next_right_sub_level_id += 1;
}

insert_new_sub_level(
Expand Down Expand Up @@ -354,26 +363,26 @@ pub mod group_split {
}
}

// When `insert_hint` is `Ok(idx)`, it means that the sub level `idx` in `target_l0`
// will extend these SSTs. When `insert_hint` is `Err(idx)`, it
// When `insert_hint` is `Left(idx)`, it means that the sub level `idx` in `target_l0`
// will extend these SSTs. When `insert_hint` is `Right(idx)`, it
// means that we will add a new sub level `idx` into `target_l0`.
pub fn get_sub_level_insert_hint(
target_levels: &Vec<Level>,
sub_level: &Level,
) -> Result<usize, usize> {
) -> Either<usize, usize> {
for (idx, other) in target_levels.iter().enumerate() {
match other.sub_level_id.cmp(&sub_level.sub_level_id) {
Ordering::Less => {}
Ordering::Equal => {
return Ok(idx);
return Left(idx);
}
Ordering::Greater => {
return Err(idx);
return Right(idx);
}
}
}

Err(target_levels.len())
Right(target_levels.len())
}

/// Split the SSTs in the level according to the split key.
Expand Down
2 changes: 2 additions & 0 deletions src/storage/hummock_sdk/src/frontend_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ impl FrontendHummockVersion {
})
.collect(),
state_table_info: self.state_table_info.to_protobuf(),
// max_sub_level_id is not expected to be used
max_sub_level_id: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl Sub for HummockVersionId {

pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0);
pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1);
pub const INVALID_MAX_SUB_LEVEL_ID: u64 = 0;
pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56;
pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56;
pub const OBJECT_SUFFIX: &str = "data";
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_sdk/src/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ impl From<(&HummockVersion, &HashSet<CompactionGroupId>)> for IncompleteHummockV
})
.collect(),
state_table_info: version.state_table_info.clone(),
max_sub_level_id: version.max_sub_level_id,
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::sstable_info::SstableInfo;
use crate::table_watermark::TableWatermarks;
use crate::{
CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID,
INVALID_MAX_SUB_LEVEL_ID,
};

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -224,6 +225,8 @@ pub struct HummockVersionCommon<T> {
pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
pub table_change_log: HashMap<TableId, TableChangeLogCommon<T>>,
pub state_table_info: HummockVersionStateTableInfo,
/// The maximum `sub_level_id` that has been recorded thus far, regardless of whether the `sub_level_id` currently exists.
pub max_sub_level_id: u64,
}

pub type HummockVersion = HummockVersionCommon<SstableInfo>;
Expand Down Expand Up @@ -312,6 +315,11 @@ where
state_table_info: HummockVersionStateTableInfo::from_protobuf(
&pb_version.state_table_info,
),
// The max_sub_level_id is expected to be None in Pb written by a previous kernel version.
// For backward compatibility, see load_meta_store_state_impl.
max_sub_level_id: pb_version
.max_sub_level_id
.unwrap_or(INVALID_MAX_SUB_LEVEL_ID),
}
}
}
Expand Down Expand Up @@ -341,6 +349,7 @@ where
.map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
.collect(),
state_table_info: version.state_table_info.to_protobuf(),
max_sub_level_id: Some(version.max_sub_level_id),
}
}
}
Expand Down Expand Up @@ -371,6 +380,7 @@ where
.map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
.collect(),
state_table_info: version.state_table_info.to_protobuf(),
max_sub_level_id: Some(version.max_sub_level_id),
}
}
}
Expand Down Expand Up @@ -433,6 +443,7 @@ impl HummockVersion {
table_watermarks: HashMap::new(),
table_change_log: HashMap::new(),
state_table_info: HummockVersionStateTableInfo::empty(),
max_sub_level_id: INVALID_MAX_SUB_LEVEL_ID,
};
for group_id in [
StaticCompactionGroupId::StateDefault as CompactionGroupId,
Expand Down
Loading