diff --git a/proto/hummock.proto b/proto/hummock.proto index 6d3d7b0d32d7d..7bc5bf0971d84 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -189,6 +189,7 @@ message HummockVersion { map table_watermarks = 5; map table_change_logs = 6; map state_table_info = 7; + optional uint64 max_sub_level_id = 8; } message HummockVersionDelta { diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 504de130df075..04ab2ee58f650 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::iter; use std::ops::{Deref, DerefMut}; use std::sync::atomic::AtomicBool; use std::sync::Arc; @@ -24,7 +25,7 @@ use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ version_archive_dir, version_checkpoint_path, CompactionGroupId, HummockCompactionTaskId, - HummockContextId, HummockVersionId, + HummockContextId, HummockVersionId, INVALID_MAX_SUB_LEVEL_ID, }; use risingwave_meta_model_v2::{ compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta, @@ -68,6 +69,7 @@ mod worker; pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo}; use compaction::*; pub use compaction::{check_cg_write_limit, WriteLimitType}; +use risingwave_common::util::epoch::Epoch; pub(crate) use utils::*; // Update to states are performed as follow: @@ -434,6 +436,19 @@ impl HummockManager { ..Default::default() }); + let may_init_max_sub_level_id = if redo_state.max_sub_level_id == INVALID_MAX_SUB_LEVEL_ID { + Epoch::now().0 + } else { + redo_state.max_sub_level_id + }; + redo_state.max_sub_level_id = redo_state + .levels + .values() + .filter_map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max()) + .chain(iter::once(may_init_max_sub_level_id)) + .max() + .unwrap(); + 
versioning_guard.current_version = redo_state; versioning_guard.hummock_version_deltas = hummock_version_deltas; diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 04d5e237a11df..8812d600b69f1 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -149,6 +149,11 @@ impl<'a> HummockVersionTransaction<'a> { // Append SSTs to a new version. for (compaction_group_id, inserted_table_infos) in commit_sstables { + // There is no necessity to invoke may_bump_max_sub_level_id here, because + // - There's at most one IntraLevel delta for each compaction group in one pre_commit_epoch's delta. + // - No other delta type will be present. + // - may_bump_max_sub_level_id will be invoked once later for this delta in apply_version_delta. + let l0_sub_level_id = new_version_delta.latest_version().max_sub_level_id + 1; let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) @@ -156,7 +161,7 @@ impl<'a> HummockVersionTransaction<'a> { .group_deltas; let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( 0, - 0, // l0_sub_level_id will be generated during apply_version_delta + l0_sub_level_id, vec![], // default inserted_table_infos, 0, // default diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 0ffdd15eca498..2180d0cd0a039 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -15,9 +15,11 @@ use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::iter; use std::sync::Arc; use bytes::Bytes; +use itertools::Either::{Left, Right}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; @@ -434,10 
+436,10 @@ impl HummockVersion { continue; } match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) { - Ok(idx) => { + Left(idx) => { add_ssts_to_sub_level(target_l0, idx, insert_table_infos); } - Err(idx) => { + Right(idx) => { insert_new_sub_level( target_l0, sub_level.sub_level_id, @@ -660,15 +662,10 @@ impl HummockVersion { || group_destroy.is_some(), "no sst should be deleted when committing an epoch" ); - let mut next_l0_sub_level_id = levels - .l0 - .sub_levels - .last() - .map(|level| level.sub_level_id + 1) - .unwrap_or(1); for group_delta in &group_deltas.group_deltas { if let GroupDelta::IntraLevel(IntraLevelDelta { level_idx, + l0_sub_level_id, inserted_table_infos, .. }) = group_delta @@ -680,12 +677,11 @@ impl HummockVersion { if !inserted_table_infos.is_empty() { insert_new_sub_level( &mut levels.l0, - next_l0_sub_level_id, + *l0_sub_level_id, PbLevelType::Overlapping, inserted_table_infos.clone(), None, ); - next_l0_sub_level_id += 1; } } } @@ -764,6 +760,8 @@ impl HummockVersion { &version_delta.state_table_info_delta, &changed_table_info, ); + + self.may_bump_max_sub_level_id(); } pub fn apply_change_log_delta( @@ -883,7 +881,22 @@ impl HummockVersion { ) }); - group_split::merge_levels(left_levels, right_levels); + group_split::merge_levels(left_levels, right_levels, self.max_sub_level_id); + // There is no necessity to invoke may_bump_max_sub_level_id here, because + // - There's at most one GroupMerge delta in one delta. + // - No other delta type will be present. + // - may_bump_max_sub_level_id will be invoked once later for this delta in apply_version_delta. + } + + fn may_bump_max_sub_level_id(&mut self) { + // The max_sub_level_id may have been increased, recalculate it. 
+ self.max_sub_level_id = self + .levels + .values() + .filter_map(|levels| levels.l0.sub_levels.iter().map(|s| s.sub_level_id).max()) + .chain(iter::once(self.max_sub_level_id)) + .max() + .unwrap(); } } @@ -952,10 +965,10 @@ impl HummockVersionCommon { l0.uncompressed_file_size -= sst_info.uncompressed_file_size; }); match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) { - Ok(idx) => { + Left(idx) => { add_ssts_to_sub_level(target_l0, idx, insert_table_infos); } - Err(idx) => { + Right(idx) => { insert_new_sub_level( target_l0, sub_level.sub_level_id, @@ -2032,7 +2045,7 @@ mod tests { let mut left_levels = Levels::default(); let right_levels = Levels::default(); - group_split::merge_levels(&mut left_levels, right_levels); + group_split::merge_levels(&mut left_levels, right_levels, 105); } { @@ -2046,7 +2059,7 @@ mod tests { ); let right_levels = right_levels.clone(); - group_split::merge_levels(&mut left_levels, right_levels); + group_split::merge_levels(&mut left_levels, right_levels, 105); assert!(left_levels.l0.sub_levels.len() == 3); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); @@ -2071,7 +2084,7 @@ mod tests { }, ); - group_split::merge_levels(&mut left_levels, right_levels); + group_split::merge_levels(&mut left_levels, right_levels, 105); assert!(left_levels.l0.sub_levels.len() == 3); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); @@ -2089,7 +2102,7 @@ mod tests { let mut left_levels = left_levels.clone(); let right_levels = right_levels.clone(); - group_split::merge_levels(&mut left_levels, right_levels); + group_split::merge_levels(&mut left_levels, right_levels, 105); assert!(left_levels.l0.sub_levels.len() == 6); assert!(left_levels.l0.sub_levels[0].sub_level_id == 101); @@ -2252,6 +2265,7 @@ mod tests { }, ), )]), + max_sub_level_id: 101, ..Default::default() }; @@ -2447,7 +2461,7 @@ mod tests { right_l0.sub_levels.push(Level { level_idx: 0, table_infos: x, - sub_level_id: 101, + sub_level_id: 100, 
total_file_size: 100, level_type: LevelType::Overlapping, ..Default::default() @@ -2459,7 +2473,7 @@ mod tests { ..Default::default() }; - merge_levels(cg1, right_levels); + merge_levels(cg1, right_levels, version.max_sub_level_id); } { diff --git a/src/storage/hummock_sdk/src/compaction_group/mod.rs b/src/storage/hummock_sdk/src/compaction_group/mod.rs index 0bf3fcea1c4e7..4b6637a957d80 100644 --- a/src/storage/hummock_sdk/src/compaction_group/mod.rs +++ b/src/storage/hummock_sdk/src/compaction_group/mod.rs @@ -49,6 +49,8 @@ pub mod group_split { use std::collections::BTreeSet; use bytes::Bytes; + use itertools::Either; + use itertools::Either::{Left, Right}; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_pb::hummock::PbLevelType; @@ -272,24 +274,31 @@ pub mod group_split { } /// Merge the right levels into the left levels. - pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) { + pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels, max_sub_level_id: u64) { let right_l0 = right_levels.l0; - - let mut max_left_sub_level_id = left_levels + let mut next_right_sub_level_id = max_sub_level_id + 1; + let max_left_sub_level_id = left_levels .l0 .sub_levels .iter() - .map(|sub_level| sub_level.sub_level_id + 1) - .max() - .unwrap_or(0); // If there are no sub levels, the max sub level id is 0. - let need_rewrite_right_sub_level_id = max_left_sub_level_id != 0; + .map(|l| l.sub_level_id) + .max(); + assert!( + max_left_sub_level_id + .map(|left| left < next_right_sub_level_id) + .unwrap_or(true), + "max_left_sub_level_id={:?} next_right_sub_level_id={}", + max_left_sub_level_id, + next_right_sub_level_id + ); + let need_rewrite_right_sub_level_id = !left_levels.l0.sub_levels.is_empty(); for mut right_sub_level in right_l0.sub_levels { // Rewrtie the sub level id of right sub level to avoid conflict with left sub levels. (conflict level type) // e.g. 
left sub levels: [0, 1, 2], right sub levels: [0, 1, 2], after rewrite, right sub levels: [3, 4, 5] if need_rewrite_right_sub_level_id { - right_sub_level.sub_level_id = max_left_sub_level_id; - max_left_sub_level_id += 1; + right_sub_level.sub_level_id = next_right_sub_level_id; + next_right_sub_level_id += 1; } insert_new_sub_level( @@ -354,26 +363,26 @@ pub mod group_split { } } - // When `insert_hint` is `Ok(idx)`, it means that the sub level `idx` in `target_l0` - // will extend these SSTs. When `insert_hint` is `Err(idx)`, it + // When `insert_hint` is `Left(idx)`, it means that the sub level `idx` in `target_l0` + // will extend these SSTs. When `insert_hint` is `Right(idx)`, it // means that we will add a new sub level `idx` into `target_l0`. pub fn get_sub_level_insert_hint( target_levels: &Vec, sub_level: &Level, - ) -> Result { + ) -> Either { for (idx, other) in target_levels.iter().enumerate() { match other.sub_level_id.cmp(&sub_level.sub_level_id) { Ordering::Less => {} Ordering::Equal => { - return Ok(idx); + return Left(idx); } Ordering::Greater => { - return Err(idx); + return Right(idx); } } } - Err(target_levels.len()) + Right(target_levels.len()) } /// Split the SSTs in the level according to the split key. 
diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs index 58276c522bd54..5db5789ceb755 100644 --- a/src/storage/hummock_sdk/src/frontend_version.rs +++ b/src/storage/hummock_sdk/src/frontend_version.rs @@ -89,6 +89,8 @@ impl FrontendHummockVersion { }) .collect(), state_table_info: self.state_table_info.to_protobuf(), + // max_sub_level_id is not expected to be used + max_sub_level_id: None, } } diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 11bb64ba5a4f7..ce7ccef9675d0 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -130,6 +130,7 @@ impl Sub for HummockVersionId { pub const INVALID_VERSION_ID: HummockVersionId = HummockVersionId(0); pub const FIRST_VERSION_ID: HummockVersionId = HummockVersionId(1); +pub const INVALID_MAX_SUB_LEVEL_ID: u64 = 0; pub const SPLIT_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 1u64 << 56; pub const SINGLE_TABLE_COMPACTION_GROUP_ID_HEAD: u64 = 2u64 << 56; pub const OBJECT_SUFFIX: &str = "data"; diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 3a58a7daa760c..7413472ab010b 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -124,6 +124,7 @@ impl From<(&HummockVersion, &HashSet)> for IncompleteHummockV }) .collect(), state_table_info: version.state_table_info.clone(), + max_sub_level_id: version.max_sub_level_id, } } } diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index dbd927e3d724a..a9115c4c475ec 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -38,6 +38,7 @@ use crate::sstable_info::SstableInfo; use crate::table_watermark::TableWatermarks; use crate::{ CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID, + INVALID_MAX_SUB_LEVEL_ID, }; #[derive(Debug, 
Clone, PartialEq)] @@ -224,6 +225,8 @@ pub struct HummockVersionCommon { pub table_watermarks: HashMap>, pub table_change_log: HashMap>, pub state_table_info: HummockVersionStateTableInfo, + /// The maximum `sub_level_id` that has been recorded thus far, regardless of whether the `sub_level_id` currently exists. + pub max_sub_level_id: u64, } pub type HummockVersion = HummockVersionCommon; @@ -312,6 +315,11 @@ where state_table_info: HummockVersionStateTableInfo::from_protobuf( &pb_version.state_table_info, ), + // The max_sub_level_id is expected to be None in Pb from previous kernel version. + // For backward compatibility, see load_meta_store_state_impl. + max_sub_level_id: pb_version + .max_sub_level_id + .unwrap_or(INVALID_MAX_SUB_LEVEL_ID), } } } @@ -341,6 +349,7 @@ where .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf())) .collect(), state_table_info: version.state_table_info.to_protobuf(), + max_sub_level_id: Some(version.max_sub_level_id), } } } @@ -371,6 +380,7 @@ where .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf())) .collect(), state_table_info: version.state_table_info.to_protobuf(), + max_sub_level_id: Some(version.max_sub_level_id), } } } @@ -433,6 +443,7 @@ impl HummockVersion { table_watermarks: HashMap::new(), table_change_log: HashMap::new(), state_table_info: HummockVersionStateTableInfo::empty(), + max_sub_level_id: INVALID_MAX_SUB_LEVEL_ID, }; for group_id in [ StaticCompactionGroupId::StateDefault as CompactionGroupId,