From 8d6c2af774049aceb72ac3363a0426455fc99f43 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 15 Oct 2024 12:39:14 +0800 Subject: [PATCH 01/12] Revert "fix(storage): fix sub level id for time travel (#18886)" This reverts commit 798896fc4eec986193b49a0443fa0452caf41b4f. --- src/meta/src/hummock/manager/transaction.rs | 14 +++++++++++++- .../src/compaction_group/hummock_version_ext.rs | 10 ++-------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 04d5e237a11df..b8e9335a161b6 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -149,6 +149,18 @@ impl<'a> HummockVersionTransaction<'a> { // Append SSTs to a new version. for (compaction_group_id, inserted_table_infos) in commit_sstables { + let l0_sub_level_id = new_version_delta + .latest_version() + .levels + .get(&compaction_group_id) + .and_then(|levels| { + levels + .l0 + .sub_levels + .last() + .map(|level| level.sub_level_id + 1) + }) + .unwrap_or(committed_epoch); let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) @@ -156,7 +168,7 @@ impl<'a> HummockVersionTransaction<'a> { .group_deltas; let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( 0, - 0, // l0_sub_level_id will be generated during apply_version_delta + l0_sub_level_id, vec![], // default inserted_table_infos, 0, // default diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 0ffdd15eca498..338c18aa23690 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -660,15 +660,10 @@ impl HummockVersion { || group_destroy.is_some(), "no sst should be deleted when committing an epoch" ); - let mut next_l0_sub_level_id = levels - .l0 - .sub_levels - .last() - .map(|level| level.sub_level_id + 1) - .unwrap_or(1); for group_delta in &group_deltas.group_deltas { if let GroupDelta::IntraLevel(IntraLevelDelta { level_idx, + l0_sub_level_id, inserted_table_infos, .. }) = group_delta @@ -680,12 +675,11 @@ impl HummockVersion { if !inserted_table_infos.is_empty() { insert_new_sub_level( &mut levels.l0, - next_l0_sub_level_id, + *l0_sub_level_id, PbLevelType::Overlapping, inserted_table_infos.clone(), None, ); - next_l0_sub_level_id += 1; } } } From 9b043493fca6ea8ee7da6b533a62ffbb70aab192 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 15 Oct 2024 12:48:16 +0800 Subject: [PATCH 02/12] minor refactor --- .../src/compaction_group/hummock_version_ext.rs | 9 +++++---- .../hummock_sdk/src/compaction_group/mod.rs | 14 ++++++++------ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 338c18aa23690..2c55911c4fc31 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::sync::Arc; use bytes::Bytes; +use itertools::Either::{Left, Right}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; @@ -434,10 +435,10 @@ impl HummockVersion { continue; } match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) { - Ok(idx) => { + Left(idx) => { add_ssts_to_sub_level(target_l0, idx, insert_table_infos); } - Err(idx) => { + Right(idx) => { insert_new_sub_level( target_l0, sub_level.sub_level_id, @@ -946,10 +947,10 @@ impl HummockVersionCommon { l0.uncompressed_file_size -= sst_info.uncompressed_file_size; }); match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) { - Ok(idx) => { + Left(idx) => { add_ssts_to_sub_level(target_l0, idx, insert_table_infos); } - Err(idx) => { + Right(idx) => { insert_new_sub_level( target_l0, sub_level.sub_level_id, diff --git a/src/storage/hummock_sdk/src/compaction_group/mod.rs b/src/storage/hummock_sdk/src/compaction_group/mod.rs index 0bf3fcea1c4e7..6634d7279c1d6 100644 --- a/src/storage/hummock_sdk/src/compaction_group/mod.rs +++ b/src/storage/hummock_sdk/src/compaction_group/mod.rs @@ -49,6 +49,8 @@ pub mod group_split { use std::collections::BTreeSet; use bytes::Bytes; + use itertools::Either; + use itertools::Either::{Left, Right}; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_pb::hummock::PbLevelType; @@ -354,26 +356,26 @@ pub mod group_split { } } - // When `insert_hint` is `Ok(idx)`, it means that the sub level `idx` in `target_l0` - // will extend these SSTs. When `insert_hint` is `Err(idx)`, it + // When `insert_hint` is `Left(idx)`, it means that the sub level `idx` in `target_l0` + // will extend these SSTs. When `insert_hint` is `Right(idx)`, it // means that we will add a new sub level `idx` into `target_l0`. pub fn get_sub_level_insert_hint( target_levels: &Vec, sub_level: &Level, - ) -> Result { + ) -> Either { for (idx, other) in target_levels.iter().enumerate() { match other.sub_level_id.cmp(&sub_level.sub_level_id) { Ordering::Less => {} Ordering::Equal => { - return Ok(idx); + return Left(idx); } Ordering::Greater => { - return Err(idx); + return Right(idx); } } } - Err(target_levels.len()) + Right(target_levels.len()) } /// Split the SSTs in the level according to the split key. From 0620d946f6b883270aa45a72824570904e32f18a Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 15 Oct 2024 13:29:18 +0800 Subject: [PATCH 03/12] add max_sub_level_id field --- proto/hummock.proto | 1 + src/meta/src/hummock/manager/transaction.rs | 9 +++++++- .../compaction_group/hummock_version_ext.rs | 20 ++++++++++++----- .../hummock_sdk/src/compaction_group/mod.rs | 22 +++++++++++++------ .../hummock_sdk/src/frontend_version.rs | 2 ++ src/storage/hummock_sdk/src/lib.rs | 1 + src/storage/hummock_sdk/src/time_travel.rs | 1 + src/storage/hummock_sdk/src/version.rs | 19 ++++++++++++++++ 8 files changed, 61 insertions(+), 14 deletions(-) diff --git a/proto/hummock.proto b/proto/hummock.proto index 6d3d7b0d32d7d..7bc5bf0971d84 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -189,6 +189,7 @@ message HummockVersion { map table_watermarks = 5; map table_change_logs = 6; map state_table_info = 7; + optional uint64 max_sub_level_id = 8; } message HummockVersionDelta { diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index b8e9335a161b6..b4855c215d3a7 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -25,6 +25,7 @@ use risingwave_hummock_sdk::version::{ }; use risingwave_hummock_sdk::{ CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId, + FIRST_SUB_LEVEL_ID, }; use risingwave_pb::hummock::{ CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas, @@ -160,7 +161,13 @@ impl<'a> HummockVersionTransaction<'a> { .last() .map(|level| level.sub_level_id + 1) }) - .unwrap_or(committed_epoch); + .unwrap_or( + new_version_delta + .latest_version() + .max_sub_level_id + .map(|id| id + 1) + .unwrap_or(FIRST_SUB_LEVEL_ID), + ); let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 2c55911c4fc31..41282cf8116dc 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -759,6 +759,14 @@ impl HummockVersion { &version_delta.state_table_info_delta, &changed_table_info, ); + + // The max_sub_level_id may have been increased, e.g. during commit_epoch or merge_group. + // Recalculate it. + self.max_sub_level_id = self + .levels + .values() + .filter_map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max()) + .max(); } pub fn apply_change_log_delta( @@ -878,7 +886,7 @@ impl HummockVersion { ) }); - group_split::merge_levels(left_levels, right_levels); + group_split::merge_levels(left_levels, right_levels, self.max_sub_level_id); } } @@ -2027,7 +2035,7 @@ mod tests { let mut left_levels = Levels::default(); let right_levels = Levels::default(); - group_split::merge_levels(&mut left_levels, right_levels); + group_split::merge_levels(&mut left_levels, right_levels, None); } { @@ -2041,7 +2049,7 @@ mod tests { ); let right_levels = right_levels.clone(); - group_split::merge_levels(&mut left_levels, right_levels); + group_split::merge_levels(&mut left_levels, right_levels, None); assert!(left_levels.l0.sub_levels.len() == 3); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); @@ -2066,7 +2074,7 @@ mod tests { }, ); - group_split::merge_levels(&mut left_levels, right_levels); + group_split::merge_levels(&mut left_levels, right_levels, None); assert!(left_levels.l0.sub_levels.len() == 3); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); @@ -2084,7 +2092,7 @@ mod tests { let mut left_levels = left_levels.clone(); let right_levels = right_levels.clone(); - group_split::merge_levels(&mut left_levels, right_levels); + group_split::merge_levels(&mut left_levels, right_levels, None); assert!(left_levels.l0.sub_levels.len() == 6); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); @@ -2454,7 +2462,7 @@ mod tests { ..Default::default() }; - merge_levels(cg1, right_levels); + merge_levels(cg1, right_levels, None); } { diff --git a/src/storage/hummock_sdk/src/compaction_group/mod.rs b/src/storage/hummock_sdk/src/compaction_group/mod.rs index 6634d7279c1d6..0fedf1668597d 100644 --- a/src/storage/hummock_sdk/src/compaction_group/mod.rs +++ b/src/storage/hummock_sdk/src/compaction_group/mod.rs @@ -61,7 +61,7 @@ pub mod group_split { use crate::key_range::KeyRange; use crate::level::{Level, Levels}; use crate::sstable_info::SstableInfo; - use crate::{can_concat, HummockEpoch, KeyComparator}; + use crate::{can_concat, HummockEpoch, KeyComparator, FIRST_SUB_LEVEL_ID}; /// The split will follow the following rules: /// 1. Ssts with `split_key` will be split into two separate ssts and their `key_range` will be changed `sst_1`: [`sst.key_range.right`, `split_key`) `sst_2`: [`split_key`, `sst.key_range.right`]. @@ -274,24 +274,32 @@ pub mod group_split { } /// Merge the right levels into the left levels. - pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) { + pub fn merge_levels( + left_levels: &mut Levels, + right_levels: Levels, + max_sub_level_id: Option, + ) { let right_l0 = right_levels.l0; - let mut max_left_sub_level_id = left_levels + let mut next_right_sub_level_id = left_levels .l0 .sub_levels .iter() .map(|sub_level| sub_level.sub_level_id + 1) .max() - .unwrap_or(0); // If there are no sub levels, the max sub level id is 0. - let need_rewrite_right_sub_level_id = max_left_sub_level_id != 0; + .unwrap_or( + max_sub_level_id + .map(|v| v + 1) + .unwrap_or(FIRST_SUB_LEVEL_ID), + ); + let need_rewrite_right_sub_level_id = !left_levels.l0.sub_levels.is_empty(); for mut right_sub_level in right_l0.sub_levels { // Rewrtie the sub level id of right sub level to avoid conflict with left sub levels. (conflict level type) // e.g. left sub levels: [0, 1, 2], right sub levels: [0, 1, 2], after rewrite, right sub levels: [3, 4, 5] if need_rewrite_right_sub_level_id { - right_sub_level.sub_level_id = max_left_sub_level_id; - max_left_sub_level_id += 1; + right_sub_level.sub_level_id = next_right_sub_level_id; + next_right_sub_level_id += 1; } insert_new_sub_level( diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs index 58276c522bd54..5db5789ceb755 100644 --- a/src/storage/hummock_sdk/src/frontend_version.rs +++ b/src/storage/hummock_sdk/src/frontend_version.rs @@ -89,6 +89,8 @@ impl FrontendHummockVersion { }) .collect(), state_table_info: self.state_table_info.to_protobuf(), + // max_sub_level_id is not expected to be used + max_sub_level_id: None, } } diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 11bb64ba5a4f7..a29b2d0fc2e10 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -130,6 +130,7 @@ impl Sub for HummockVersionId { pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0); pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1); +pub const FIRST_SUB_LEVEL_ID: u64 = 1; pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56; pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56; pub const OBJECT_SUFFIX: &str = "data"; diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 3a58a7daa760c..7413472ab010b 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -124,6 +124,7 @@ impl From<(&HummockVersion, &HashSet)> for IncompleteHummockV }) .collect(), state_table_info: version.state_table_info.clone(), + max_sub_level_id: version.max_sub_level_id, } } } diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index dbd927e3d724a..648e182d9f538 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -224,6 +224,7 @@ pub struct HummockVersionCommon { pub table_watermarks: HashMap>, pub table_change_log: HashMap>, pub state_table_info: HummockVersionStateTableInfo, + pub max_sub_level_id: Option, } pub type HummockVersion = HummockVersionCommon; @@ -312,6 +313,21 @@ where state_table_info: HummockVersionStateTableInfo::from_protobuf( &pb_version.state_table_info, ), + // The next_sub_level_id is expected to be None in Pb from previous kernel version. + // For backward compatibility, try to initialize next_sub_level_id with max(existing sub_level ids). + max_sub_level_id: pb_version.max_sub_level_id.or_else(|| { + pb_version + .levels + .values() + .filter_map(|levels| { + levels + .l0 + .as_ref() + .map(|l0| l0.sub_levels.iter().map(|s| s.sub_level_id).max()) + }) + .flatten() + .max() + }), } } } @@ -341,6 +357,7 @@ where .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf())) .collect(), state_table_info: version.state_table_info.to_protobuf(), + max_sub_level_id: version.max_sub_level_id, } } } @@ -371,6 +388,7 @@ where .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf())) .collect(), state_table_info: version.state_table_info.to_protobuf(), + max_sub_level_id: version.max_sub_level_id, } } } @@ -433,6 +451,7 @@ impl HummockVersion { table_watermarks: HashMap::new(), table_change_log: HashMap::new(), state_table_info: HummockVersionStateTableInfo::empty(), + max_sub_level_id: None, }; for group_id in [ StaticCompactionGroupId::StateDefault as CompactionGroupId, From c654974fa1c9c2df7092346bb90834c0a1fa0f57 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 15 Oct 2024 14:14:22 +0800 Subject: [PATCH 04/12] add comment --- src/storage/hummock_sdk/src/version.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 648e182d9f538..a269bf1c75718 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -224,6 +224,7 @@ pub struct HummockVersionCommon { pub table_watermarks: HashMap>, pub table_change_log: HashMap>, pub state_table_info: HummockVersionStateTableInfo, + /// The maximum sub_level_id that has been recorded thus far, regardless of whether the sub_level_id currently exists. pub max_sub_level_id: Option, } From b8bdf81c327487eb0e06abf309e3f1493a21a3d0 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 15 Oct 2024 14:39:31 +0800 Subject: [PATCH 05/12] fmt --- src/storage/hummock_sdk/src/version.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index a269bf1c75718..bb2661bb8182f 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -224,7 +224,7 @@ pub struct HummockVersionCommon { pub table_watermarks: HashMap>, pub table_change_log: HashMap>, pub state_table_info: HummockVersionStateTableInfo, - /// The maximum sub_level_id that has been recorded thus far, regardless of whether the sub_level_id currently exists. + /// The maximum `sub_level_id` that has been recorded thus far, regardless of whether the `sub_level_id` currently exists. pub max_sub_level_id: Option, } From d5ecedd229bda6fedb83bc8274f843f9481ce05c Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 15 Oct 2024 16:05:31 +0800 Subject: [PATCH 06/12] bugfix --- .../hummock_sdk/src/compaction_group/hummock_version_ext.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 41282cf8116dc..c73f446eef7bb 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -15,6 +15,7 @@ use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::iter; use std::sync::Arc; use bytes::Bytes; @@ -765,7 +766,9 @@ impl HummockVersion { self.max_sub_level_id = self .levels .values() - .filter_map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max()) + .map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max()) + .chain(iter::once(self.max_sub_level_id)) + .flatten() .max(); } From 24e46b8cb24c699c254af55a32f2cd3a1b70bfe7 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 15 Oct 2024 16:53:13 +0800 Subject: [PATCH 07/12] bugfix --- src/meta/src/hummock/manager/transaction.rs | 19 +++---------------- .../hummock_sdk/src/compaction_group/mod.rs | 14 +++----------- 2 files changed, 6 insertions(+), 27 deletions(-) diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index b4855c215d3a7..167812a241d9b 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -152,22 +152,9 @@ impl<'a> HummockVersionTransaction<'a> { for (compaction_group_id, inserted_table_infos) in commit_sstables { let l0_sub_level_id = new_version_delta .latest_version() - .levels - .get(&compaction_group_id) - .and_then(|levels| { - levels - .l0 - .sub_levels - .last() - .map(|level| level.sub_level_id + 1) - }) - .unwrap_or( - new_version_delta - .latest_version() - .max_sub_level_id - .map(|id| id + 1) - .unwrap_or(FIRST_SUB_LEVEL_ID), - ); + .max_sub_level_id + .map(|id| id + 1) + .unwrap_or(FIRST_SUB_LEVEL_ID); let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) diff --git a/src/storage/hummock_sdk/src/compaction_group/mod.rs b/src/storage/hummock_sdk/src/compaction_group/mod.rs index 0fedf1668597d..93a0f524505c5 100644 --- a/src/storage/hummock_sdk/src/compaction_group/mod.rs +++ b/src/storage/hummock_sdk/src/compaction_group/mod.rs @@ -281,17 +281,9 @@ pub mod group_split { ) { let right_l0 = right_levels.l0; - let mut next_right_sub_level_id = left_levels - .l0 - .sub_levels - .iter() - .map(|sub_level| sub_level.sub_level_id + 1) - .max() - .unwrap_or( - max_sub_level_id - .map(|v| v + 1) - .unwrap_or(FIRST_SUB_LEVEL_ID), - ); + let mut next_right_sub_level_id = max_sub_level_id + .map(|v| v + 1) + .unwrap_or(FIRST_SUB_LEVEL_ID); let need_rewrite_right_sub_level_id = !left_levels.l0.sub_levels.is_empty(); for mut right_sub_level in right_l0.sub_levels { From 151c84de275ce07de54b6a056a93ad809823b023 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 15 Oct 2024 17:23:16 +0800 Subject: [PATCH 08/12] bugfix --- src/meta/src/hummock/manager/transaction.rs | 3 +++ .../compaction_group/hummock_version_ext.rs | 23 +++++++++++-------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 167812a241d9b..726a061e0214a 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -150,6 +150,9 @@ impl<'a> HummockVersionTransaction<'a> { // Append SSTs to a new version. for (compaction_group_id, inserted_table_infos) in commit_sstables { + // No need to call may_bump_max_sub_level_id, because + // - There's at most one IntraLevel for each compaction group in one delta. + // - The pre_apply is called immediately, which calls may_bump_max_sub_level_id once. let l0_sub_level_id = new_version_delta .latest_version() .max_sub_level_id diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index c73f446eef7bb..a7b5729ae6a13 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -761,15 +761,7 @@ impl HummockVersion { &changed_table_info, ); - // The max_sub_level_id may have been increased, e.g. during commit_epoch or merge_group. - // Recalculate it. - self.max_sub_level_id = self - .levels - .values() - .map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max()) - .chain(iter::once(self.max_sub_level_id)) - .flatten() - .max(); + self.may_bump_max_sub_level_id(); } pub fn apply_change_log_delta( @@ -890,6 +882,19 @@ impl HummockVersion { }); group_split::merge_levels(left_levels, right_levels, self.max_sub_level_id); + // Need to call may_bump_max_sub_level_id, because multiple merge may be called in one delta. + self.may_bump_max_sub_level_id(); + } + + fn may_bump_max_sub_level_id(&mut self) { + // The max_sub_level_id may have been increased, recalculate it. + self.max_sub_level_id = self + .levels + .values() + .map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max()) + .chain(iter::once(self.max_sub_level_id)) + .flatten() + .max(); } } From 74d130ea6ef34e315345167ee513e509db15fc88 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 15 Oct 2024 17:39:38 +0800 Subject: [PATCH 09/12] fix test --- .../hummock_sdk/src/compaction_group/hummock_version_ext.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index a7b5729ae6a13..8e086503bda9a 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -2263,6 +2263,7 @@ mod tests { }, ), )]), + max_sub_level_id: Some(101), ..Default::default() }; @@ -2458,7 +2459,7 @@ mod tests { right_l0.sub_levels.push(Level { level_idx: 0, table_infos: x, - sub_level_id: 101, + sub_level_id: 100, total_file_size: 100, level_type: LevelType::Overlapping, ..Default::default() @@ -2470,7 +2471,7 @@ mod tests { ..Default::default() }; - merge_levels(cg1, right_levels, None); + merge_levels(cg1, right_levels, version.max_sub_level_id); } { From d0ba96cf5738ee58f4206ee445d8a7865d2773a4 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 15 Oct 2024 18:35:30 +0800 Subject: [PATCH 10/12] refactor --- src/meta/src/hummock/manager/transaction.rs | 7 ++++--- .../src/compaction_group/hummock_version_ext.rs | 6 ++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 726a061e0214a..0470103f5ed49 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -150,9 +150,10 @@ impl<'a> HummockVersionTransaction<'a> { // Append SSTs to a new version. for (compaction_group_id, inserted_table_infos) in commit_sstables { - // No need to call may_bump_max_sub_level_id, because - // - There's at most one IntraLevel for each compaction group in one delta. - // - The pre_apply is called immediately, which calls may_bump_max_sub_level_id once. + // There is no necessity to invoke may_bump_max_sub_level_id here, because + // - There's at most one IntraLevel delta for each compaction group in one pre_commit_epoch's delta. + // - No other delta type will be present. + // - may_bump_max_sub_level_id will be invoked once later for this delta in apply_version_delta. let l0_sub_level_id = new_version_delta .latest_version() .max_sub_level_id diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 8e086503bda9a..0469b56f48382 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -882,8 +882,10 @@ impl HummockVersion { }); group_split::merge_levels(left_levels, right_levels, self.max_sub_level_id); - // Need to call may_bump_max_sub_level_id, because multiple merge may be called in one delta. - self.may_bump_max_sub_level_id(); + // There is no necessity to invoke may_bump_max_sub_level_id here, because + // - There's at most one GroupMerge delta in one delta. + // - No other delta type will be present. + // - may_bump_max_sub_level_id will be invoked once later for this delta in apply_version_delta. } fn may_bump_max_sub_level_id(&mut self) { From d8ead5ac37c6313eda667b7005361518e421793b Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 15 Oct 2024 18:39:33 +0800 Subject: [PATCH 11/12] fix tests --- .../hummock_sdk/src/compaction_group/hummock_version_ext.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 0469b56f48382..984643974f687 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -2084,7 +2084,7 @@ mod tests { }, ); - group_split::merge_levels(&mut left_levels, right_levels, None); + group_split::merge_levels(&mut left_levels, right_levels, Some(105)); assert!(left_levels.l0.sub_levels.len() == 3); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); @@ -2102,7 +2102,7 @@ mod tests { let mut left_levels = left_levels.clone(); let right_levels = right_levels.clone(); - group_split::merge_levels(&mut left_levels, right_levels, None); + group_split::merge_levels(&mut left_levels, right_levels, Some(105)); assert!(left_levels.l0.sub_levels.len() == 6); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); From 09d5b6b721c009d40aca0fb43546b02daa4dfd54 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Thu, 17 Oct 2024 20:00:04 +0800 Subject: [PATCH 12/12] refactor --- src/meta/src/hummock/manager/mod.rs | 17 +++++++++++- src/meta/src/hummock/manager/transaction.rs | 7 +---- .../compaction_group/hummock_version_ext.rs | 16 +++++------ .../hummock_sdk/src/compaction_group/mod.rs | 27 ++++++++++++------- src/storage/hummock_sdk/src/lib.rs | 2 +- src/storage/hummock_sdk/src/version.rs | 27 +++++++------------ 6 files changed, 52 insertions(+), 44 deletions(-) diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 504de130df075..04ab2ee58f650 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::iter; use std::ops::{Deref, DerefMut}; use std::sync::atomic::AtomicBool; use std::sync::Arc; @@ -24,7 +25,7 @@ use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ version_archive_dir, version_checkpoint_path, CompactionGroupId, HummockCompactionTaskId, - HummockContextId, HummockVersionId, + HummockContextId, HummockVersionId, INVALID_MAX_SUB_LEVEL_ID, }; use risingwave_meta_model_v2::{ compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta, @@ -68,6 +69,7 @@ mod worker; pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo}; use compaction::*; pub use compaction::{check_cg_write_limit, WriteLimitType}; +use risingwave_common::util::epoch::Epoch; pub(crate) use utils::*; // Update to states are performed as follow: @@ -434,6 +436,19 @@ impl HummockManager { ..Default::default() }); + let may_init_max_sub_level_id = if redo_state.max_sub_level_id == INVALID_MAX_SUB_LEVEL_ID { + Epoch::now().0 + } else { + redo_state.max_sub_level_id + }; + redo_state.max_sub_level_id = redo_state + .levels + .values() + .filter_map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max()) + .chain(iter::once(may_init_max_sub_level_id)) + .max() + .unwrap(); + versioning_guard.current_version = redo_state; versioning_guard.hummock_version_deltas = hummock_version_deltas; diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 0470103f5ed49..8812d600b69f1 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -25,7 +25,6 @@ use risingwave_hummock_sdk::version::{ }; use risingwave_hummock_sdk::{ CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId, - FIRST_SUB_LEVEL_ID, }; use risingwave_pb::hummock::{ CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas, @@ -154,11 +153,7 @@ impl<'a> HummockVersionTransaction<'a> { // - There's at most one IntraLevel delta for each compaction group in one pre_commit_epoch's delta. // - No other delta type will be present. // - may_bump_max_sub_level_id will be invoked once later for this delta in apply_version_delta. - let l0_sub_level_id = new_version_delta - .latest_version() - .max_sub_level_id - .map(|id| id + 1) - .unwrap_or(FIRST_SUB_LEVEL_ID); + let l0_sub_level_id = new_version_delta.latest_version().max_sub_level_id + 1; let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 984643974f687..2180d0cd0a039 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -893,10 +893,10 @@ impl HummockVersion { self.max_sub_level_id = self .levels .values() - .map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max()) + .filter_map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max()) .chain(iter::once(self.max_sub_level_id)) - .flatten() - .max(); + .max() + .unwrap(); } } @@ -2045,7 +2045,7 @@ mod tests { let mut left_levels = Levels::default(); let right_levels = Levels::default(); - group_split::merge_levels(&mut left_levels, right_levels, None); + group_split::merge_levels(&mut left_levels, right_levels, 105); } { @@ -2059,7 +2059,7 @@ mod tests { ); let right_levels = right_levels.clone(); - group_split::merge_levels(&mut left_levels, right_levels, None); + group_split::merge_levels(&mut left_levels, right_levels, 105); assert!(left_levels.l0.sub_levels.len() == 3); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); @@ -2084,7 +2084,7 @@ mod tests { }, ); - group_split::merge_levels(&mut left_levels, right_levels, Some(105)); + group_split::merge_levels(&mut left_levels, right_levels, 105); assert!(left_levels.l0.sub_levels.len() == 3); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); @@ -2102,7 +2102,7 @@ mod tests { let mut left_levels = left_levels.clone(); let right_levels = right_levels.clone(); - group_split::merge_levels(&mut left_levels, right_levels, Some(105)); + group_split::merge_levels(&mut left_levels, right_levels, 105); assert!(left_levels.l0.sub_levels.len() == 6); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); @@ -2265,7 +2265,7 @@ mod tests { }, ), )]), - max_sub_level_id: Some(101), + max_sub_level_id: 101, ..Default::default() }; diff --git a/src/storage/hummock_sdk/src/compaction_group/mod.rs b/src/storage/hummock_sdk/src/compaction_group/mod.rs index 93a0f524505c5..4b6637a957d80 100644 --- a/src/storage/hummock_sdk/src/compaction_group/mod.rs +++ b/src/storage/hummock_sdk/src/compaction_group/mod.rs @@ -61,7 +61,7 @@ pub mod group_split { use crate::key_range::KeyRange; use crate::level::{Level, Levels}; use crate::sstable_info::SstableInfo; - use crate::{can_concat, HummockEpoch, KeyComparator, FIRST_SUB_LEVEL_ID}; + use crate::{can_concat, HummockEpoch, KeyComparator}; /// The split will follow the following rules: /// 1. Ssts with `split_key` will be split into two separate ssts and their `key_range` will be changed `sst_1`: [`sst.key_range.right`, `split_key`) `sst_2`: [`split_key`, `sst.key_range.right`]. @@ -274,16 +274,23 @@ pub mod group_split { } /// Merge the right levels into the left levels. - pub fn merge_levels( - left_levels: &mut Levels, - right_levels: Levels, - max_sub_level_id: Option, - ) { + pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels, max_sub_level_id: u64) { let right_l0 = right_levels.l0; - - let mut next_right_sub_level_id = max_sub_level_id - .map(|v| v + 1) - .unwrap_or(FIRST_SUB_LEVEL_ID); + let mut next_right_sub_level_id = max_sub_level_id + 1; + let max_left_sub_level_id = left_levels + .l0 + .sub_levels + .iter() + .map(|l| l.sub_level_id) + .max(); + assert!( + max_left_sub_level_id + .map(|left| left < next_right_sub_level_id) + .unwrap_or(true), + "max_left_sub_level_id={:?} next_right_sub_level_id={}", + max_left_sub_level_id, + next_right_sub_level_id + ); let need_rewrite_right_sub_level_id = !left_levels.l0.sub_levels.is_empty(); for mut right_sub_level in right_l0.sub_levels { diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index a29b2d0fc2e10..ce7ccef9675d0 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -130,7 +130,7 @@ impl Sub for HummockVersionId { pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0); pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1); -pub const FIRST_SUB_LEVEL_ID: u64 = 1; +pub const INVALID_MAX_SUB_LEVEL_ID: u64 = 0; pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56; pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56; pub const OBJECT_SUFFIX: &str = "data"; diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index bb2661bb8182f..a9115c4c475ec 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -38,6 +38,7 @@ use crate::sstable_info::SstableInfo; use crate::table_watermark::TableWatermarks; use crate::{ CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID, + INVALID_MAX_SUB_LEVEL_ID, }; #[derive(Debug, Clone, PartialEq)] @@ -225,7 +226,7 @@ pub struct HummockVersionCommon { pub table_change_log: HashMap>, pub state_table_info: HummockVersionStateTableInfo, /// The maximum `sub_level_id` that has been recorded thus far, regardless of whether the `sub_level_id` currently exists. - pub max_sub_level_id: Option, + pub max_sub_level_id: u64, } pub type HummockVersion = HummockVersionCommon; @@ -315,20 +316,10 @@ where &pb_version.state_table_info, ), // The next_sub_level_id is expected to be None in Pb from previous kernel version. - // For backward compatibility, try to initialize next_sub_level_id with max(existing sub_level ids). - max_sub_level_id: pb_version.max_sub_level_id.or_else(|| { - pb_version - .levels - .values() - .filter_map(|levels| { - levels - .l0 - .as_ref() - .map(|l0| l0.sub_levels.iter().map(|s| s.sub_level_id).max()) - }) - .flatten() - .max() - }), + // For backward compatibility, see load_meta_store_state_impl. + max_sub_level_id: pb_version + .max_sub_level_id + .unwrap_or(INVALID_MAX_SUB_LEVEL_ID), } } } @@ -358,7 +349,7 @@ where .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf())) .collect(), state_table_info: version.state_table_info.to_protobuf(), - max_sub_level_id: version.max_sub_level_id, + max_sub_level_id: Some(version.max_sub_level_id), } } } @@ -389,7 +380,7 @@ where .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf())) .collect(), state_table_info: version.state_table_info.to_protobuf(), - max_sub_level_id: version.max_sub_level_id, + max_sub_level_id: Some(version.max_sub_level_id), } } } @@ -452,7 +443,7 @@ impl HummockVersion { table_watermarks: HashMap::new(), table_change_log: HashMap::new(), state_table_info: HummockVersionStateTableInfo::empty(), - max_sub_level_id: None, + max_sub_level_id: INVALID_MAX_SUB_LEVEL_ID, }; for group_id in [ StaticCompactionGroupId::StateDefault as CompactionGroupId,