From 4afcd4d92b613fdc01a27ad71091683bbdf31a97 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Thu, 12 Sep 2024 15:14:11 +0800 Subject: [PATCH] refactor(storage): make HummockVersion a generic struct (#18478) --- src/storage/hummock_sdk/src/change_log.rs | 111 ++++--- .../compaction_group/hummock_version_ext.rs | 182 ++++++------ src/storage/hummock_sdk/src/level.rs | 154 ++++++---- src/storage/hummock_sdk/src/time_travel.rs | 245 +++------------- src/storage/hummock_sdk/src/version.rs | 272 ++++++++++++------ 5 files changed, 488 insertions(+), 476 deletions(-) diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 433309acab930..c231b0eb6b7b5 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -16,32 +16,42 @@ use std::collections::HashMap; use risingwave_common::catalog::TableId; use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta; -use risingwave_pb::hummock::{PbEpochNewChangeLog, PbTableChangeLog}; +use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo, PbTableChangeLog}; use tracing::warn; use crate::sstable_info::SstableInfo; #[derive(Debug, Clone, PartialEq)] -pub struct TableChangeLog(pub Vec); +pub struct TableChangeLogCommon(pub Vec>); + +pub type TableChangeLog = TableChangeLogCommon; #[derive(Debug, Clone, PartialEq)] -pub struct EpochNewChangeLog { - pub new_value: Vec, - pub old_value: Vec, +pub struct EpochNewChangeLogCommon { + pub new_value: Vec, + pub old_value: Vec, pub epochs: Vec, } -impl From<&EpochNewChangeLog> for PbEpochNewChangeLog { - fn from(val: &EpochNewChangeLog) -> Self { +pub type EpochNewChangeLog = EpochNewChangeLogCommon; + +impl From<&EpochNewChangeLogCommon> for PbEpochNewChangeLog +where + PbSstableInfo: for<'a> From<&'a T>, +{ + fn from(val: &EpochNewChangeLogCommon) -> Self { Self { - new_value: val.new_value.iter().map(|a| a.clone().into()).collect(), - old_value: val.old_value.iter().map(|a| a.clone().into()).collect(), + new_value: val.new_value.iter().map(|a| a.into()).collect(), + old_value: val.old_value.iter().map(|a| a.into()).collect(), epochs: val.epochs.clone(), } } } -impl From<&PbEpochNewChangeLog> for EpochNewChangeLog { +impl From<&PbEpochNewChangeLog> for EpochNewChangeLogCommon +where + T: for<'a> From<&'a PbSstableInfo>, +{ fn from(value: &PbEpochNewChangeLog) -> Self { Self { new_value: value.new_value.iter().map(|a| a.into()).collect(), @@ -51,30 +61,28 @@ impl From<&PbEpochNewChangeLog> for EpochNewChangeLog { } } -impl From for PbEpochNewChangeLog { - fn from(val: EpochNewChangeLog) -> Self { +impl From> for PbEpochNewChangeLog +where + PbSstableInfo: From, +{ + fn from(val: EpochNewChangeLogCommon) -> Self { Self { - new_value: val - .new_value - .into_iter() - .map(|a| a.clone().into()) - .collect(), - old_value: val - .old_value - .into_iter() - .map(|a| a.clone().into()) - .collect(), - epochs: val.epochs.clone(), + new_value: val.new_value.into_iter().map(|a| a.into()).collect(), + old_value: val.old_value.into_iter().map(|a| a.into()).collect(), + epochs: val.epochs, } } } -impl From for EpochNewChangeLog { +impl From for EpochNewChangeLogCommon +where + T: From, +{ fn from(value: PbEpochNewChangeLog) -> Self { Self { new_value: value.new_value.into_iter().map(|a| a.into()).collect(), old_value: value.old_value.into_iter().map(|a| a.into()).collect(), - epochs: value.epochs.clone(), + epochs: value.epochs, } } } @@ -117,15 +125,23 @@ impl TableChangeLog { } } -impl TableChangeLog { +impl TableChangeLogCommon +where + PbSstableInfo: for<'a> From<&'a T>, +{ pub fn to_protobuf(&self) -> PbTableChangeLog { PbTableChangeLog { change_logs: self.0.iter().map(|a| a.into()).collect(), } } +} +impl TableChangeLogCommon +where + T: for<'a> From<&'a PbSstableInfo>, +{ pub fn from_protobuf(val: &PbTableChangeLog) -> Self { - Self(val.change_logs.clone().iter().map(|a| a.into()).collect()) + Self(val.change_logs.iter().map(|a| a.into()).collect()) } } @@ -173,13 +189,18 @@ pub fn build_table_change_log_delta<'a>( } #[derive(Debug, PartialEq, Clone)] -pub struct ChangeLogDelta { +pub struct ChangeLogDeltaCommon { pub truncate_epoch: u64, - pub new_log: Option, + pub new_log: Option>, } -impl From<&ChangeLogDelta> for PbChangeLogDelta { - fn from(val: &ChangeLogDelta) -> Self { +pub type ChangeLogDelta = ChangeLogDeltaCommon; + +impl From<&ChangeLogDeltaCommon> for PbChangeLogDelta +where + PbSstableInfo: for<'a> From<&'a T>, +{ + fn from(val: &ChangeLogDeltaCommon) -> Self { Self { truncate_epoch: val.truncate_epoch, new_log: val.new_log.as_ref().map(|a| a.into()), @@ -187,7 +208,10 @@ impl From<&ChangeLogDelta> for PbChangeLogDelta { } } -impl From<&PbChangeLogDelta> for ChangeLogDelta { +impl From<&PbChangeLogDelta> for ChangeLogDeltaCommon +where + T: for<'a> From<&'a PbSstableInfo>, +{ fn from(val: &PbChangeLogDelta) -> Self { Self { truncate_epoch: val.truncate_epoch, @@ -196,8 +220,11 @@ impl From<&PbChangeLogDelta> for ChangeLogDelta { } } -impl From for PbChangeLogDelta { - fn from(val: ChangeLogDelta) -> Self { +impl From> for PbChangeLogDelta +where + PbSstableInfo: From, +{ + fn from(val: ChangeLogDeltaCommon) -> Self { Self { truncate_epoch: val.truncate_epoch, new_log: val.new_log.map(|a| a.into()), @@ -205,7 +232,10 @@ impl From for PbChangeLogDelta { } } -impl From for ChangeLogDelta { +impl From for ChangeLogDeltaCommon +where + T: From, +{ fn from(val: PbChangeLogDelta) -> Self { Self { truncate_epoch: val.truncate_epoch, @@ -218,11 +248,12 @@ impl From for ChangeLogDelta { mod tests { use itertools::Itertools; - use crate::change_log::{EpochNewChangeLog, TableChangeLog}; + use crate::change_log::{EpochNewChangeLog, TableChangeLogCommon}; + use crate::sstable_info::SstableInfo; #[test] fn test_filter_epoch() { - let table_change_log = TableChangeLog(vec![ + let table_change_log = TableChangeLogCommon::(vec![ EpochNewChangeLog { new_value: vec![], old_value: vec![], @@ -262,7 +293,7 @@ mod tests { #[test] fn test_truncate() { - let mut table_change_log = TableChangeLog(vec![ + let mut table_change_log = TableChangeLogCommon::(vec![ EpochNewChangeLog { new_value: vec![], old_value: vec![], @@ -288,7 +319,7 @@ mod tests { table_change_log.truncate(1); assert_eq!( table_change_log, - TableChangeLog(vec![ + TableChangeLogCommon::(vec![ EpochNewChangeLog { new_value: vec![], old_value: vec![], @@ -310,7 +341,7 @@ mod tests { table_change_log.truncate(3); assert_eq!( table_change_log, - TableChangeLog(vec![ + TableChangeLogCommon::(vec![ EpochNewChangeLog { new_value: vec![], old_value: vec![], diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 682cb107f3395..376626e844242 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -29,7 +29,7 @@ use tracing::warn; use super::group_split::get_sub_level_insert_hint; use super::{group_split, StateTableId}; -use crate::change_log::TableChangeLog; +use crate::change_log::TableChangeLogCommon; use crate::compaction_group::StaticCompactionGroupId; use crate::key_range::KeyRangeCommon; use crate::level::{Level, Levels, OverlappingLevel}; @@ -795,7 +795,7 @@ impl HummockVersion { change_log.0.push(new_change_log.clone()); } Entry::Vacant(entry) => { - entry.insert(TableChangeLog(vec![new_change_log.clone()])); + entry.insert(TableChangeLogCommon(vec![new_change_log.clone()])); } }; } @@ -1458,20 +1458,22 @@ mod tests { #[test] fn test_get_sst_object_ids() { - let mut version = HummockVersion::default(); - version.id = HummockVersionId::new(0); - version.levels = HashMap::from_iter([( - 0, - Levels { - levels: vec![], - l0: OverlappingLevel { - sub_levels: vec![], - total_file_size: 0, - uncompressed_file_size: 0, + let mut version = HummockVersion { + id: HummockVersionId::new(0), + levels: HashMap::from_iter([( + 0, + Levels { + levels: vec![], + l0: OverlappingLevel { + sub_levels: vec![], + total_file_size: 0, + uncompressed_file_size: 0, + }, + ..Default::default() }, - ..Default::default() - }, - )]); + )]), + ..Default::default() + }; assert_eq!(version.get_object_ids().len(), 0); // Add to sub level @@ -1505,68 +1507,72 @@ mod tests { #[test] fn test_apply_version_delta() { - let mut version = HummockVersion::default(); - version.id = HummockVersionId::new(0); - version.levels = HashMap::from_iter([ - ( - 0, - build_initial_compaction_group_levels( + let mut version = HummockVersion { + id: HummockVersionId::new(0), + levels: HashMap::from_iter([ + ( 0, - &CompactionConfig { - max_level: 6, - ..Default::default() - }, + build_initial_compaction_group_levels( + 0, + &CompactionConfig { + max_level: 6, + ..Default::default() + }, + ), ), - ), - ( - 1, - build_initial_compaction_group_levels( + ( 1, - &CompactionConfig { - max_level: 6, - ..Default::default() - }, - ), - ), - ]); - let mut version_delta = HummockVersionDelta::default(); - version_delta.id = HummockVersionId::new(1); - version_delta.group_deltas = HashMap::from_iter([ - ( - 2, - GroupDeltas { - group_deltas: vec![GroupDelta::GroupConstruct(GroupConstruct { - group_config: Some(CompactionConfig { + build_initial_compaction_group_levels( + 1, + &CompactionConfig { max_level: 6, ..Default::default() - }), - ..Default::default() - })], - }, - ), - ( - 0, - GroupDeltas { - group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})], - }, - ), - ( - 1, - GroupDeltas { - group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new( - 1, - 0, - vec![], - vec![SstableInfo { - object_id: 1, - sst_id: 1, + }, + ), + ), + ]), + ..Default::default() + }; + let version_delta = HummockVersionDelta { + id: HummockVersionId::new(1), + group_deltas: HashMap::from_iter([ + ( + 2, + GroupDeltas { + group_deltas: vec![GroupDelta::GroupConstruct(GroupConstruct { + group_config: Some(CompactionConfig { + max_level: 6, + ..Default::default() + }), ..Default::default() - }], - 0, - ))], - }, - ), - ]); + })], + }, + ), + ( + 0, + GroupDeltas { + group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})], + }, + ), + ( + 1, + GroupDeltas { + group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new( + 1, + 0, + vec![], + vec![SstableInfo { + object_id: 1, + sst_id: 1, + ..Default::default() + }], + 0, + ))], + }, + ), + ]), + ..Default::default() + }; let version_delta = version_delta; version.apply_version_delta(&version_delta); @@ -1587,24 +1593,26 @@ mod tests { }], ..Default::default() }; - assert_eq!(version, { - let mut version = HummockVersion::default(); - version.id = HummockVersionId::new(1); - version.levels = HashMap::from_iter([ - ( - 2, - build_initial_compaction_group_levels( + assert_eq!( + version, + HummockVersion { + id: HummockVersionId::new(1), + levels: HashMap::from_iter([ + ( 2, - &CompactionConfig { - max_level: 6, - ..Default::default() - }, + build_initial_compaction_group_levels( + 2, + &CompactionConfig { + max_level: 6, + ..Default::default() + }, + ), ), - ), - (1, cg1), - ]); - version - }); + (1, cg1), + ]), + ..Default::default() + } + ); } fn gen_sst_info(object_id: u64, table_ids: Vec, left: Bytes, right: Bytes) -> SstableInfo { diff --git a/src/storage/hummock_sdk/src/level.rs b/src/storage/hummock_sdk/src/level.rs index c7db09e69e76d..762b5abd25ac9 100644 --- a/src/storage/hummock_sdk/src/level.rs +++ b/src/storage/hummock_sdk/src/level.rs @@ -23,19 +23,24 @@ use risingwave_pb::hummock::{ use crate::sstable_info::SstableInfo; #[derive(Debug, Clone, PartialEq, Default)] -pub struct OverlappingLevel { - pub sub_levels: Vec, +pub struct OverlappingLevelCommon { + pub sub_levels: Vec>, pub total_file_size: u64, pub uncompressed_file_size: u64, } -impl From<&PbOverlappingLevel> for OverlappingLevel { +pub type OverlappingLevel = OverlappingLevelCommon; + +impl From<&PbOverlappingLevel> for OverlappingLevelCommon +where + for<'a> LevelCommon: From<&'a PbLevel>, +{ fn from(pb_overlapping_level: &PbOverlappingLevel) -> Self { Self { sub_levels: pb_overlapping_level .sub_levels .iter() - .map(Level::from) + .map(LevelCommon::from) .collect_vec(), total_file_size: pb_overlapping_level.total_file_size, uncompressed_file_size: pb_overlapping_level.uncompressed_file_size, @@ -43,13 +48,16 @@ impl From<&PbOverlappingLevel> for OverlappingLevel { } } -impl From<&OverlappingLevel> for PbOverlappingLevel { - fn from(overlapping_level: &OverlappingLevel) -> Self { +impl From<&OverlappingLevelCommon> for PbOverlappingLevel +where + for<'a> &'a LevelCommon: Into, +{ + fn from(overlapping_level: &OverlappingLevelCommon) -> Self { Self { sub_levels: overlapping_level .sub_levels .iter() - .map(|pb_level| pb_level.into()) + .map(|level| level.into()) .collect_vec(), total_file_size: overlapping_level.total_file_size, uncompressed_file_size: overlapping_level.uncompressed_file_size, @@ -57,8 +65,11 @@ impl From<&OverlappingLevel> for PbOverlappingLevel { } } -impl From for PbOverlappingLevel { - fn from(overlapping_level: OverlappingLevel) -> Self { +impl From> for PbOverlappingLevel +where + LevelCommon: Into, +{ + fn from(overlapping_level: OverlappingLevelCommon) -> Self { Self { sub_levels: overlapping_level .sub_levels @@ -71,13 +82,16 @@ impl From for PbOverlappingLevel { } } -impl From for OverlappingLevel { +impl From for OverlappingLevelCommon +where + LevelCommon: From, +{ fn from(pb_overlapping_level: PbOverlappingLevel) -> Self { Self { sub_levels: pb_overlapping_level .sub_levels .into_iter() - .map(Level::from) + .map(LevelCommon::from) .collect_vec(), total_file_size: pb_overlapping_level.total_file_size, uncompressed_file_size: pb_overlapping_level.uncompressed_file_size, @@ -97,26 +111,27 @@ impl OverlappingLevel { } #[derive(Debug, Clone, PartialEq, Default)] -pub struct Level { +pub struct LevelCommon { pub level_idx: u32, pub level_type: PbLevelType, - pub table_infos: Vec, + pub table_infos: Vec, pub total_file_size: u64, pub sub_level_id: u64, pub uncompressed_file_size: u64, pub vnode_partition_count: u32, } -impl From<&PbLevel> for Level { +pub type Level = LevelCommon; + +impl From<&PbLevel> for LevelCommon +where + T: for<'a> From<&'a PbSstableInfo>, +{ fn from(pb_level: &PbLevel) -> Self { Self { level_idx: pb_level.level_idx, level_type: PbLevelType::try_from(pb_level.level_type).unwrap(), - table_infos: pb_level - .table_infos - .iter() - .map(SstableInfo::from) - .collect_vec(), + table_infos: pb_level.table_infos.iter().map(Into::into).collect_vec(), total_file_size: pb_level.total_file_size, sub_level_id: pb_level.sub_level_id, uncompressed_file_size: pb_level.uncompressed_file_size, @@ -125,16 +140,15 @@ impl From<&PbLevel> for Level { } } -impl From<&Level> for PbLevel { - fn from(level: &Level) -> Self { +impl From<&LevelCommon> for PbLevel +where + PbSstableInfo: for<'a> From<&'a T>, +{ + fn from(level: &LevelCommon) -> Self { Self { level_idx: level.level_idx, level_type: level.level_type.into(), - table_infos: level - .table_infos - .iter() - .map(PbSstableInfo::from) - .collect_vec(), + table_infos: level.table_infos.iter().map(Into::into).collect_vec(), total_file_size: level.total_file_size, sub_level_id: level.sub_level_id, uncompressed_file_size: level.uncompressed_file_size, @@ -143,16 +157,15 @@ impl From<&Level> for PbLevel { } } -impl From for PbLevel { - fn from(level: Level) -> Self { +impl From> for PbLevel +where + PbSstableInfo: From, +{ + fn from(level: LevelCommon) -> Self { Self { level_idx: level.level_idx, level_type: level.level_type.into(), - table_infos: level - .table_infos - .into_iter() - .map(PbSstableInfo::from) - .collect_vec(), + table_infos: level.table_infos.into_iter().map(Into::into).collect_vec(), total_file_size: level.total_file_size, sub_level_id: level.sub_level_id, uncompressed_file_size: level.uncompressed_file_size, @@ -161,7 +174,10 @@ impl From for PbLevel { } } -impl From for Level { +impl From for LevelCommon +where + T: From, +{ fn from(pb_level: PbLevel) -> Self { Self { level_idx: pb_level.level_idx, @@ -169,7 +185,7 @@ impl From for Level { table_infos: pb_level .table_infos .into_iter() - .map(SstableInfo::from) + .map(Into::into) .collect_vec(), total_file_size: pb_level.total_file_size, sub_level_id: pb_level.sub_level_id, @@ -196,9 +212,9 @@ impl Level { } #[derive(Debug, Clone, PartialEq, Default)] -pub struct Levels { - pub levels: Vec, - pub l0: OverlappingLevel, +pub struct LevelsCommon { + pub levels: Vec>, + pub l0: OverlappingLevelCommon, pub group_id: u64, pub parent_group_id: u64, @@ -206,6 +222,8 @@ pub struct Levels { pub member_table_ids: Vec, } +pub type Levels = LevelsCommon; + impl Levels { pub fn level0(&self) -> &OverlappingLevel { &self.l0 @@ -236,15 +254,25 @@ impl Levels { } } -impl Levels { - pub fn from_protobuf(pb_levels: &PbLevels) -> Self { - Self::from(pb_levels) - } - +impl LevelsCommon +where + PbLevels: for<'a> From<&'a LevelsCommon>, +{ pub fn to_protobuf(&self) -> PbLevels { self.into() } +} + +impl LevelsCommon +where + LevelsCommon: for<'a> From<&'a PbLevels>, +{ + pub fn from_protobuf(pb_levels: &PbLevels) -> LevelsCommon { + LevelsCommon::::from(pb_levels) + } +} +impl Levels { pub fn estimated_encode_len(&self) -> usize { let mut basic = self .levels @@ -260,12 +288,15 @@ impl Levels { } } -impl From<&PbLevels> for Levels { +impl From<&PbLevels> for LevelsCommon +where + T: for<'a> From<&'a PbSstableInfo>, +{ #[expect(deprecated)] fn from(pb_levels: &PbLevels) -> Self { Self { - l0: OverlappingLevel::from(pb_levels.l0.as_ref().unwrap()), - levels: pb_levels.levels.iter().map(Level::from).collect_vec(), + l0: OverlappingLevelCommon::from(pb_levels.l0.as_ref().unwrap()), + levels: pb_levels.levels.iter().map(Into::into).collect_vec(), group_id: pb_levels.group_id, parent_group_id: pb_levels.parent_group_id, member_table_ids: pb_levels.member_table_ids.clone(), @@ -273,9 +304,12 @@ impl From<&PbLevels> for Levels { } } -impl From<&Levels> for PbLevels { +impl From<&LevelsCommon> for PbLevels +where + PbSstableInfo: for<'a> From<&'a T>, +{ #[expect(deprecated)] - fn from(levels: &Levels) -> Self { + fn from(levels: &LevelsCommon) -> Self { Self { l0: Some((&levels.l0).into()), levels: levels.levels.iter().map(PbLevel::from).collect_vec(), @@ -286,28 +320,38 @@ impl From<&Levels> for PbLevels { } } -impl From for Levels { +impl From for LevelsCommon +where + T: From, +{ #[expect(deprecated)] fn from(pb_levels: PbLevels) -> Self { Self { - l0: OverlappingLevel::from(pb_levels.l0.as_ref().unwrap()), - levels: pb_levels.levels.into_iter().map(Level::from).collect_vec(), + l0: OverlappingLevelCommon::from(pb_levels.l0.unwrap()), + levels: pb_levels + .levels + .into_iter() + .map(LevelCommon::from) + .collect_vec(), group_id: pb_levels.group_id, parent_group_id: pb_levels.parent_group_id, - member_table_ids: pb_levels.member_table_ids.clone(), + member_table_ids: pb_levels.member_table_ids, } } } -impl From for PbLevels { - fn from(levels: Levels) -> Self { +impl From> for PbLevels +where + PbSstableInfo: From, +{ + fn from(levels: LevelsCommon) -> Self { #[expect(deprecated)] Self { l0: Some(levels.l0.into()), levels: levels.levels.into_iter().map(PbLevel::from).collect_vec(), group_id: levels.group_id, parent_group_id: levels.parent_group_id, - member_table_ids: levels.member_table_ids.clone(), + member_table_ids: levels.member_table_ids, } } } diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index 380d75340df27..e828c94a4d781 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -13,87 +13,20 @@ // limitations under the License. use std::collections::{HashMap, HashSet}; -use std::sync::Arc; -use risingwave_common::catalog::TableId; -use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas; -use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta, PbStateTableInfoDelta}; +use risingwave_pb::hummock::hummock_version::PbLevels; +use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas}; +use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo}; -use crate::change_log::{ChangeLogDelta, EpochNewChangeLog, TableChangeLog}; -use crate::level::{Level, Levels, OverlappingLevel}; +use crate::change_log::{TableChangeLog, TableChangeLogCommon}; +use crate::level::Level; use crate::sstable_info::SstableInfo; -use crate::table_watermark::TableWatermarks; use crate::version::{ - GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo, - IntraLevelDelta, + HummockVersion, HummockVersionCommon, HummockVersionDelta, HummockVersionDeltaCommon, }; -use crate::{CompactionGroupId, HummockSstableId, HummockVersionId}; +use crate::{CompactionGroupId, HummockSstableId}; -/// [`IncompleteHummockVersion`] is incomplete because `SSTableInfo` only has the `sst_id` set in the following fields: -/// - `PbLevels` -/// - `TableChangeLog` -#[derive(Debug, Clone, PartialEq)] -pub struct IncompleteHummockVersion { - pub id: HummockVersionId, - pub levels: HashMap, - max_committed_epoch: u64, - safe_epoch: u64, - pub table_watermarks: HashMap>, - pub table_change_log: HashMap, - pub state_table_info: HummockVersionStateTableInfo, -} - -/// Clone from an `SstableInfo`, but only set the `sst_id` for the target, leaving other fields as default. -/// The goal is to reduce the size of pb object generated afterward. -fn stripped_sstable_info(origin: &SstableInfo) -> SstableInfo { - SstableInfo { - object_id: Default::default(), - sst_id: origin.sst_id, - key_range: Default::default(), - file_size: Default::default(), - table_ids: Default::default(), - meta_offset: Default::default(), - stale_key_count: Default::default(), - total_key_count: Default::default(), - min_epoch: Default::default(), - max_epoch: Default::default(), - uncompressed_file_size: Default::default(), - range_tombstone_count: Default::default(), - bloom_filter_kind: Default::default(), - sst_size: Default::default(), - } -} - -fn stripped_epoch_new_change_log(origin: &EpochNewChangeLog) -> EpochNewChangeLog { - EpochNewChangeLog { - old_value: origin.old_value.iter().map(stripped_sstable_info).collect(), - new_value: origin.new_value.iter().map(stripped_sstable_info).collect(), - epochs: origin.epochs.clone(), - } -} - -fn stripped_change_log_delta(origin: &ChangeLogDelta) -> ChangeLogDelta { - ChangeLogDelta { - new_log: origin.new_log.as_ref().map(stripped_epoch_new_change_log), - truncate_epoch: origin.truncate_epoch, - } -} - -fn stripped_level(origin: &Level) -> Level { - Level { - level_idx: origin.level_idx, - level_type: origin.level_type, - table_infos: origin - .table_infos - .iter() - .map(stripped_sstable_info) - .collect(), - total_file_size: origin.total_file_size, - sub_level_id: origin.sub_level_id, - uncompressed_file_size: origin.uncompressed_file_size, - vnode_partition_count: origin.vnode_partition_count, - } -} +pub type IncompleteHummockVersion = HummockVersionCommon; pub fn refill_version( version: &mut HummockVersion, @@ -146,55 +79,6 @@ fn refill_sstable_info( .clone(); } -fn stripped_l0(origin: &OverlappingLevel) -> OverlappingLevel { - OverlappingLevel { - sub_levels: origin.sub_levels.iter().map(stripped_level).collect(), - total_file_size: origin.total_file_size, - uncompressed_file_size: origin.uncompressed_file_size, - } -} - -#[allow(deprecated)] -fn stripped_levels(origin: &Levels) -> Levels { - Levels { - levels: origin.levels.iter().map(stripped_level).collect(), - l0: stripped_l0(&origin.l0), - group_id: origin.group_id, - parent_group_id: origin.parent_group_id, - member_table_ids: Default::default(), - } -} - -fn stripped_intra_level_delta(origin: &IntraLevelDelta) -> IntraLevelDelta { - IntraLevelDelta { - level_idx: origin.level_idx, - l0_sub_level_id: origin.l0_sub_level_id, - removed_table_ids: origin.removed_table_ids.clone(), - inserted_table_infos: origin - .inserted_table_infos - .iter() - .map(stripped_sstable_info) - .collect(), - vnode_partition_count: origin.vnode_partition_count, - } -} - -fn stripped_group_delta(origin: &GroupDelta) -> GroupDelta { - match origin { - GroupDelta::IntraLevel(l) => GroupDelta::IntraLevel(stripped_intra_level_delta(l)), - _ => panic!("time travel expects DeltaType::IntraLevel only"), - } -} - -fn stripped_group_deltas(origin: &GroupDeltas) -> GroupDeltas { - let group_deltas = origin - .group_deltas - .iter() - .map(stripped_group_delta) - .collect(); - GroupDeltas { group_deltas } -} - /// `SStableInfo` will be stripped. impl From<(&HummockVersion, &HashSet)> for IncompleteHummockVersion { fn from(p: (&HummockVersion, &HashSet)) -> Self { @@ -206,7 +90,10 @@ impl From<(&HummockVersion, &HashSet)> for IncompleteHummockV .iter() .filter_map(|(group_id, levels)| { if select_group.contains(group_id) { - Some((*group_id as CompactionGroupId, stripped_levels(levels))) + Some(( + *group_id as CompactionGroupId, + PbLevels::from(levels).into(), + )) } else { None } @@ -215,7 +102,7 @@ impl From<(&HummockVersion, &HashSet)> for IncompleteHummockV max_committed_epoch: version.visible_table_committed_epoch(), safe_epoch: version.visible_table_safe_epoch(), table_watermarks: version.table_watermarks.clone(), - // TODO: optimization: strip table change log + // TODO: optimization: strip table change log based on select_group table_change_log: version .table_change_log .iter() @@ -223,9 +110,9 @@ impl From<(&HummockVersion, &HashSet)> for IncompleteHummockV let incomplete_table_change_log = change_log .0 .iter() - .map(stripped_epoch_new_change_log) + .map(|e| PbEpochNewChangeLog::from(e).into()) .collect(); - (*table_id, TableChangeLog(incomplete_table_change_log)) + (*table_id, TableChangeLogCommon(incomplete_table_change_log)) }) .collect(), state_table_info: version.state_table_info.clone(), @@ -233,49 +120,10 @@ impl From<(&HummockVersion, &HashSet)> for IncompleteHummockV } } -impl IncompleteHummockVersion { - /// Resulted `SStableInfo` is incompelte. - pub fn to_protobuf(&self) -> PbHummockVersion { - PbHummockVersion { - id: self.id.0, - levels: self - .levels - .iter() - .map(|(group_id, levels)| (*group_id as _, levels.to_protobuf())) - .collect(), - max_committed_epoch: self.max_committed_epoch, - safe_epoch: self.safe_epoch, - table_watermarks: self - .table_watermarks - .iter() - .map(|(table_id, watermark)| (table_id.table_id, watermark.to_protobuf())) - .collect(), - table_change_logs: self - .table_change_log - .iter() - .map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf())) - .collect(), - state_table_info: self.state_table_info.to_protobuf(), - } - } -} - /// [`IncompleteHummockVersionDelta`] is incomplete because `SSTableInfo` only has the `sst_id` set in the following fields: /// - `PbGroupDeltas` /// - `ChangeLogDelta` -#[derive(Debug, PartialEq, Clone)] -pub struct IncompleteHummockVersionDelta { - pub id: HummockVersionId, - pub prev_id: HummockVersionId, - pub group_deltas: HashMap, - pub max_committed_epoch: u64, - pub safe_epoch: u64, - pub trivial_move: bool, - pub new_table_watermarks: HashMap, - pub removed_table_ids: HashSet, - pub change_log_delta: HashMap, - pub state_table_info_delta: HashMap, -} +pub type IncompleteHummockVersionDelta = HummockVersionDeltaCommon; /// `SStableInfo` will be stripped. impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHummockVersionDelta { @@ -289,7 +137,7 @@ impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHum .iter() .filter_map(|(cg_id, deltas)| { if select_group.contains(cg_id) { - Some((*cg_id, stripped_group_deltas(deltas).to_protobuf())) + Some((*cg_id, PbGroupDeltas::from(deltas).into())) } else { None } @@ -300,47 +148,42 @@ impl From<(&HummockVersionDelta, &HashSet)> for IncompleteHum trivial_move: delta.trivial_move, new_table_watermarks: delta.new_table_watermarks.clone(), removed_table_ids: delta.removed_table_ids.clone(), - // TODO: optimization: strip table change log + // TODO: optimization: strip table change log based on select_group change_log_delta: delta .change_log_delta .iter() - .map(|(table_id, log_delta)| (*table_id, stripped_change_log_delta(log_delta))) + .map(|(table_id, log_delta)| (*table_id, PbChangeLogDelta::from(log_delta).into())) .collect(), state_table_info_delta: delta.state_table_info_delta.clone(), } } } -impl IncompleteHummockVersionDelta { - /// Resulted `SStableInfo` is incompelte. - pub fn to_protobuf(&self) -> PbHummockVersionDelta { - PbHummockVersionDelta { - id: self.id.0, - prev_id: self.prev_id.0, - group_deltas: self.group_deltas.clone(), - max_committed_epoch: self.max_committed_epoch, - safe_epoch: self.safe_epoch, - trivial_move: self.trivial_move, - new_table_watermarks: self - .new_table_watermarks - .iter() - .map(|(table_id, watermarks)| (table_id.table_id, watermarks.to_protobuf())) - .collect(), - removed_table_ids: self - .removed_table_ids - .iter() - .map(|table_id| table_id.table_id) - .collect(), - change_log_delta: self - .change_log_delta - .iter() - .map(|(table_id, log_delta)| (table_id.table_id, log_delta.into())) - .collect(), - state_table_info_delta: self - .state_table_info_delta - .iter() - .map(|(table_id, delta)| (table_id.table_id, *delta)) - .collect(), +pub struct SstableIdInVersion(HummockSstableId); + +impl From<&SstableIdInVersion> for PbSstableInfo { + fn from(sst_id: &SstableIdInVersion) -> Self { + Self { + sst_id: sst_id.0, + ..Default::default() } } } + +impl From for PbSstableInfo { + fn from(sst_id: SstableIdInVersion) -> Self { + (&sst_id).into() + } +} + +impl From<&PbSstableInfo> for SstableIdInVersion { + fn from(value: &PbSstableInfo) -> Self { + SstableIdInVersion(value.sst_id) + } +} + +impl From for SstableIdInVersion { + fn from(value: PbSstableInfo) -> Self { + (&value).into() + } +} diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 1c8cfd1e310b4..4aecfcde0cf48 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -26,14 +26,14 @@ use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas; use risingwave_pb::hummock::{ CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMerge, PbGroupMetaChange, PbGroupTableChange, PbHummockVersion, PbHummockVersionDelta, - PbIntraLevelDelta, PbStateTableInfo, StateTableInfo, StateTableInfoDelta, + PbIntraLevelDelta, PbSstableInfo, PbStateTableInfo, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; -use crate::change_log::{ChangeLogDelta, TableChangeLog}; +use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon}; use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels; use crate::compaction_group::StaticCompactionGroupId; -use crate::level::Levels; +use crate::level::LevelsCommon; use crate::sstable_info::SstableInfo; use crate::table_watermark::TableWatermarks; use crate::{CompactionGroupId, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID}; @@ -209,33 +209,39 @@ impl HummockVersionStateTableInfo { } #[derive(Debug, Clone, PartialEq)] -pub struct HummockVersion { +pub struct HummockVersionCommon { pub id: HummockVersionId, - pub levels: HashMap, - max_committed_epoch: u64, - safe_epoch: u64, + pub levels: HashMap>, + pub(crate) max_committed_epoch: u64, + pub(crate) safe_epoch: u64, pub table_watermarks: HashMap>, - pub table_change_log: HashMap, + pub table_change_log: HashMap>, pub state_table_info: HummockVersionStateTableInfo, } +pub type HummockVersion = HummockVersionCommon; + impl Default for HummockVersion { fn default() -> Self { HummockVersion::from(&PbHummockVersion::default()) } } -impl HummockVersion { +impl HummockVersionCommon +where + T: for<'a> From<&'a PbSstableInfo>, + PbSstableInfo: for<'a> From<&'a T>, +{ /// Convert the `PbHummockVersion` received from rpc to `HummockVersion`. No need to /// maintain backward compatibility. pub fn from_rpc_protobuf(pb_version: &PbHummockVersion) -> Self { - HummockVersion::from(pb_version) + pb_version.into() } /// Convert the `PbHummockVersion` deserialized from persisted state to `HummockVersion`. /// We should maintain backward compatibility. pub fn from_persisted_protobuf(pb_version: &PbHummockVersion) -> Self { - HummockVersion::from(pb_version) + pb_version.into() } pub fn to_protobuf(&self) -> PbHummockVersion { @@ -260,14 +266,19 @@ impl HummockVersion { } } -impl From<&PbHummockVersion> for HummockVersion { +impl From<&PbHummockVersion> for HummockVersionCommon +where + T: for<'a> From<&'a PbSstableInfo>, +{ fn from(pb_version: &PbHummockVersion) -> Self { Self { id: HummockVersionId(pb_version.id), levels: pb_version .levels .iter() - .map(|(group_id, levels)| (*group_id as CompactionGroupId, Levels::from(levels))) + .map(|(group_id, levels)| { + (*group_id as CompactionGroupId, LevelsCommon::from(levels)) + }) .collect(), max_committed_epoch: pb_version.max_committed_epoch, safe_epoch: pb_version.safe_epoch, @@ -287,7 +298,7 @@ impl From<&PbHummockVersion> for HummockVersion { .map(|(table_id, change_log)| { ( TableId::new(*table_id), - TableChangeLog::from_protobuf(change_log), + TableChangeLogCommon::from_protobuf(change_log), ) }) .collect(), @@ -298,8 +309,11 @@ impl From<&PbHummockVersion> for HummockVersion { } } -impl From<&HummockVersion> for PbHummockVersion { - fn from(version: &HummockVersion) -> Self { +impl From<&HummockVersionCommon> for PbHummockVersion +where + PbSstableInfo: for<'a> From<&'a T>, +{ + fn from(version: &HummockVersionCommon) -> Self { Self { id: version.id.0, levels: version @@ -324,8 +338,12 @@ impl From<&HummockVersion> for PbHummockVersion { } } -impl From for PbHummockVersion { - fn from(version: HummockVersion) -> Self { +impl From> for PbHummockVersion +where + PbSstableInfo: From, + PbSstableInfo: for<'a> From<&'a T>, +{ + fn from(version: HummockVersionCommon) -> Self { Self { id: version.id.0, levels: version @@ -453,36 +471,42 @@ impl HummockVersion { } #[derive(Debug, PartialEq, Clone)] -pub struct HummockVersionDelta { +pub struct HummockVersionDeltaCommon { pub id: HummockVersionId, pub prev_id: HummockVersionId, - pub group_deltas: HashMap, - max_committed_epoch: u64, - safe_epoch: u64, + pub group_deltas: HashMap>, + pub(crate) max_committed_epoch: u64, + pub(crate) safe_epoch: u64, pub trivial_move: bool, pub new_table_watermarks: HashMap, pub removed_table_ids: HashSet, - pub change_log_delta: HashMap, + pub change_log_delta: HashMap>, pub state_table_info_delta: HashMap, } +pub type HummockVersionDelta = HummockVersionDeltaCommon; + impl Default for HummockVersionDelta { fn default() -> Self { HummockVersionDelta::from(&PbHummockVersionDelta::default()) } } -impl HummockVersionDelta { +impl HummockVersionDeltaCommon +where + T: for<'a> From<&'a PbSstableInfo>, + PbSstableInfo: for<'a> From<&'a T>, +{ /// Convert the `PbHummockVersionDelta` deserialized from persisted state to `HummockVersionDelta`. /// We should maintain backward compatibility. pub fn from_persisted_protobuf(delta: &PbHummockVersionDelta) -> Self { - Self::from(delta) + delta.into() } /// Convert the `PbHummockVersionDelta` received from rpc to `HummockVersionDelta`. No need to /// maintain backward compatibility. pub fn from_rpc_protobuf(delta: &PbHummockVersionDelta) -> Self { - Self::from(delta) + delta.into() } pub fn to_protobuf(&self) -> PbHummockVersionDelta { @@ -592,7 +616,10 @@ impl HummockVersionDelta { } } -impl From<&PbHummockVersionDelta> for HummockVersionDelta { +impl From<&PbHummockVersionDelta> for HummockVersionDeltaCommon +where + T: for<'a> From<&'a PbSstableInfo>, +{ fn from(pb_version_delta: &PbHummockVersionDelta) -> Self { Self { id: HummockVersionId(pb_version_delta.id), @@ -601,7 +628,10 @@ impl From<&PbHummockVersionDelta> for HummockVersionDelta { .group_deltas .iter() .map(|(group_id, deltas)| { - (*group_id as CompactionGroupId, GroupDeltas::from(deltas)) + ( + *group_id as CompactionGroupId, + GroupDeltasCommon::from(deltas), + ) }) .collect(), max_committed_epoch: pb_version_delta.max_committed_epoch, @@ -625,8 +655,8 @@ impl From<&PbHummockVersionDelta> for HummockVersionDelta { .map(|(table_id, log_delta)| { ( TableId::new(*table_id), - ChangeLogDelta { - new_log: log_delta.new_log.clone().map(Into::into), + ChangeLogDeltaCommon { + new_log: log_delta.new_log.as_ref().map(Into::into), truncate_epoch: log_delta.truncate_epoch, }, ) @@ -642,8 +672,11 @@ impl From<&PbHummockVersionDelta> for HummockVersionDelta { } } -impl From<&HummockVersionDelta> for PbHummockVersionDelta { - fn from(version_delta: &HummockVersionDelta) -> Self { +impl From<&HummockVersionDeltaCommon> for PbHummockVersionDelta +where + PbSstableInfo: for<'a> From<&'a T>, +{ + fn from(version_delta: &HummockVersionDeltaCommon) -> Self { Self { id: version_delta.id.0, prev_id: version_delta.prev_id.0, @@ -679,8 +712,11 @@ impl From<&HummockVersionDelta> for PbHummockVersionDelta { } } -impl From for PbHummockVersionDelta { - fn from(version_delta: HummockVersionDelta) -> Self { +impl From> for PbHummockVersionDelta +where + PbSstableInfo: From, +{ + fn from(version_delta: HummockVersionDeltaCommon) -> Self { Self { id: version_delta.id.0, prev_id: version_delta.prev_id.0, @@ -716,7 +752,10 @@ impl From for PbHummockVersionDelta { } } -impl From for HummockVersionDelta { +impl From for HummockVersionDeltaCommon +where + T: From, +{ fn from(pb_version_delta: PbHummockVersionDelta) -> Self { Self { id: HummockVersionId(pb_version_delta.id), @@ -745,7 +784,7 @@ impl From for HummockVersionDelta { .map(|(table_id, log_delta)| { ( TableId::new(*table_id), - ChangeLogDelta { + ChangeLogDeltaCommon { new_log: log_delta.new_log.clone().map(Into::into), truncate_epoch: log_delta.truncate_epoch, }, @@ -762,14 +801,16 @@ impl From for HummockVersionDelta { } #[derive(Debug, PartialEq, Clone)] -pub struct IntraLevelDelta { +pub struct IntraLevelDeltaCommon { pub level_idx: u32, pub l0_sub_level_id: u64, pub removed_table_ids: Vec, - pub inserted_table_infos: Vec, + pub inserted_table_infos: Vec, pub vnode_partition_count: u32, } +pub type IntraLevelDelta = IntraLevelDeltaCommon; + impl IntraLevelDelta { pub fn estimated_encode_len(&self) -> usize { size_of::() @@ -784,40 +825,49 @@ impl IntraLevelDelta { } } -impl From for IntraLevelDelta { +impl From for IntraLevelDeltaCommon +where + T: From, +{ fn from(pb_intra_level_delta: PbIntraLevelDelta) -> Self { Self { level_idx: pb_intra_level_delta.level_idx, l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id, - removed_table_ids: pb_intra_level_delta.removed_table_ids.clone(), + removed_table_ids: pb_intra_level_delta.removed_table_ids, inserted_table_infos: pb_intra_level_delta .inserted_table_infos .into_iter() - .map(SstableInfo::from) + .map(Into::into) .collect_vec(), vnode_partition_count: pb_intra_level_delta.vnode_partition_count, } } } -impl From for PbIntraLevelDelta { - fn from(intra_level_delta: IntraLevelDelta) -> Self { +impl From> for PbIntraLevelDelta +where + PbSstableInfo: From, +{ + fn from(intra_level_delta: IntraLevelDeltaCommon) -> Self { Self { level_idx: intra_level_delta.level_idx, l0_sub_level_id: intra_level_delta.l0_sub_level_id, - removed_table_ids: intra_level_delta.removed_table_ids.clone(), + removed_table_ids: intra_level_delta.removed_table_ids, inserted_table_infos: intra_level_delta .inserted_table_infos .into_iter() - .map(|sst| sst.into()) + .map(Into::into) .collect_vec(), vnode_partition_count: intra_level_delta.vnode_partition_count, } } } -impl From<&IntraLevelDelta> for PbIntraLevelDelta { - fn from(intra_level_delta: &IntraLevelDelta) -> Self { +impl From<&IntraLevelDeltaCommon> for PbIntraLevelDelta +where + PbSstableInfo: for<'a> From<&'a T>, +{ + fn from(intra_level_delta: &IntraLevelDeltaCommon) -> Self { Self { level_idx: intra_level_delta.level_idx, l0_sub_level_id: intra_level_delta.l0_sub_level_id, @@ -825,14 +875,17 @@ impl From<&IntraLevelDelta> for PbIntraLevelDelta { inserted_table_infos: intra_level_delta .inserted_table_infos .iter() - .map(|sst| sst.into()) + .map(Into::into) .collect_vec(), vnode_partition_count: intra_level_delta.vnode_partition_count, } } } -impl From<&PbIntraLevelDelta> for IntraLevelDelta { +impl From<&PbIntraLevelDelta> for IntraLevelDeltaCommon +where + T: for<'a> From<&'a PbSstableInfo>, +{ fn from(pb_intra_level_delta: &PbIntraLevelDelta) -> Self { Self { level_idx: pb_intra_level_delta.level_idx, @@ -841,7 +894,7 @@ impl From<&PbIntraLevelDelta> for IntraLevelDelta { inserted_table_infos: pb_intra_level_delta .inserted_table_infos .iter() - .map(SstableInfo::from) + .map(Into::into) .collect_vec(), vnode_partition_count: pb_intra_level_delta.vnode_partition_count, } @@ -867,8 +920,8 @@ impl IntraLevelDelta { } #[derive(Debug, PartialEq, Clone)] -pub enum GroupDelta { - IntraLevel(IntraLevelDelta), +pub enum GroupDeltaCommon { + IntraLevel(IntraLevelDeltaCommon), GroupConstruct(PbGroupConstruct), GroupDestroy(PbGroupDestroy), GroupMetaChange(PbGroupMetaChange), @@ -879,100 +932,116 @@ pub enum GroupDelta { GroupMerge(PbGroupMerge), } -impl From for GroupDelta { +pub type GroupDelta = GroupDeltaCommon; + +impl From for GroupDeltaCommon +where + T: From, +{ fn from(pb_group_delta: PbGroupDelta) -> Self { match pb_group_delta.delta_type { Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => { - GroupDelta::IntraLevel(IntraLevelDelta::from(pb_intra_level_delta)) + GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta)) } Some(PbDeltaType::GroupConstruct(pb_group_construct)) => { - GroupDelta::GroupConstruct(pb_group_construct) + GroupDeltaCommon::GroupConstruct(pb_group_construct) } Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => { - GroupDelta::GroupDestroy(pb_group_destroy) + GroupDeltaCommon::GroupDestroy(pb_group_destroy) } Some(PbDeltaType::GroupMetaChange(pb_group_meta_change)) => { - GroupDelta::GroupMetaChange(pb_group_meta_change) + GroupDeltaCommon::GroupMetaChange(pb_group_meta_change) } Some(PbDeltaType::GroupTableChange(pb_group_table_change)) => { - GroupDelta::GroupTableChange(pb_group_table_change) + GroupDeltaCommon::GroupTableChange(pb_group_table_change) + } + Some(PbDeltaType::GroupMerge(pb_group_merge)) => { + GroupDeltaCommon::GroupMerge(pb_group_merge) } - Some(PbDeltaType::GroupMerge(pb_group_merge)) => GroupDelta::GroupMerge(pb_group_merge), None => panic!("delta_type is not set"), } } } -impl From for PbGroupDelta { - fn from(group_delta: GroupDelta) -> Self { +impl From> for PbGroupDelta +where + PbSstableInfo: From, +{ + fn from(group_delta: GroupDeltaCommon) -> Self { match group_delta { - GroupDelta::IntraLevel(intra_level_delta) => PbGroupDelta { + GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta { delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())), }, - GroupDelta::GroupConstruct(pb_group_construct) => PbGroupDelta { + GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupConstruct(pb_group_construct)), }, - GroupDelta::GroupDestroy(pb_group_destroy) => PbGroupDelta { + GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupDestroy(pb_group_destroy)), }, - GroupDelta::GroupMetaChange(pb_group_meta_change) => PbGroupDelta { + GroupDeltaCommon::GroupMetaChange(pb_group_meta_change) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMetaChange(pb_group_meta_change)), }, - GroupDelta::GroupTableChange(pb_group_table_change) => PbGroupDelta { + GroupDeltaCommon::GroupTableChange(pb_group_table_change) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupTableChange(pb_group_table_change)), }, - GroupDelta::GroupMerge(pb_group_merge) => PbGroupDelta { + GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)), }, } } } -impl From<&GroupDelta> for PbGroupDelta { - fn from(group_delta: &GroupDelta) -> Self { +impl From<&GroupDeltaCommon> for PbGroupDelta +where + PbSstableInfo: for<'a> From<&'a T>, +{ + fn from(group_delta: &GroupDeltaCommon) -> Self { match group_delta { - GroupDelta::IntraLevel(intra_level_delta) => PbGroupDelta { + GroupDeltaCommon::IntraLevel(intra_level_delta) => PbGroupDelta { delta_type: Some(PbDeltaType::IntraLevel(intra_level_delta.into())), }, - GroupDelta::GroupConstruct(pb_group_construct) => PbGroupDelta { + GroupDeltaCommon::GroupConstruct(pb_group_construct) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupConstruct(pb_group_construct.clone())), }, - GroupDelta::GroupDestroy(pb_group_destroy) => PbGroupDelta { + GroupDeltaCommon::GroupDestroy(pb_group_destroy) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupDestroy(*pb_group_destroy)), }, - GroupDelta::GroupMetaChange(pb_group_meta_change) => PbGroupDelta { + GroupDeltaCommon::GroupMetaChange(pb_group_meta_change) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMetaChange(pb_group_meta_change.clone())), }, - GroupDelta::GroupTableChange(pb_group_table_change) => PbGroupDelta { + GroupDeltaCommon::GroupTableChange(pb_group_table_change) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupTableChange(pb_group_table_change.clone())), }, - GroupDelta::GroupMerge(pb_group_merge) => PbGroupDelta { + GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)), }, } } } -impl From<&PbGroupDelta> for GroupDelta { +impl From<&PbGroupDelta> for GroupDeltaCommon +where + T: for<'a> From<&'a PbSstableInfo>, +{ fn from(pb_group_delta: &PbGroupDelta) -> Self { match &pb_group_delta.delta_type { Some(PbDeltaType::IntraLevel(pb_intra_level_delta)) => { - GroupDelta::IntraLevel(IntraLevelDelta::from(pb_intra_level_delta)) + GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon::from(pb_intra_level_delta)) } Some(PbDeltaType::GroupConstruct(pb_group_construct)) => { - GroupDelta::GroupConstruct(pb_group_construct.clone()) + GroupDeltaCommon::GroupConstruct(pb_group_construct.clone()) } Some(PbDeltaType::GroupDestroy(pb_group_destroy)) => { - GroupDelta::GroupDestroy(*pb_group_destroy) + GroupDeltaCommon::GroupDestroy(*pb_group_destroy) } Some(PbDeltaType::GroupMetaChange(pb_group_meta_change)) => { - GroupDelta::GroupMetaChange(pb_group_meta_change.clone()) + GroupDeltaCommon::GroupMetaChange(pb_group_meta_change.clone()) } Some(PbDeltaType::GroupTableChange(pb_group_table_change)) => { - GroupDelta::GroupTableChange(pb_group_table_change.clone()) + GroupDeltaCommon::GroupTableChange(pb_group_table_change.clone()) } Some(PbDeltaType::GroupMerge(pb_group_merge)) => { - GroupDelta::GroupMerge(*pb_group_merge) + GroupDeltaCommon::GroupMerge(*pb_group_merge) } None => panic!("delta_type is not set"), } @@ -980,24 +1049,32 @@ impl From<&PbGroupDelta> for GroupDelta { } #[derive(Debug, PartialEq, Clone, Default)] -pub struct GroupDeltas { - pub group_deltas: Vec, +pub struct GroupDeltasCommon { + pub group_deltas: Vec>, } -impl From for GroupDeltas { +pub type GroupDeltas = GroupDeltasCommon; + +impl From for GroupDeltasCommon +where + T: From, +{ fn from(pb_group_deltas: PbGroupDeltas) -> Self { Self { group_deltas: pb_group_deltas .group_deltas .into_iter() - .map(GroupDelta::from) + .map(GroupDeltaCommon::from) .collect_vec(), } } } -impl From for PbGroupDeltas { - fn from(group_deltas: GroupDeltas) -> Self { +impl From> for PbGroupDeltas +where + PbSstableInfo: From, +{ + fn from(group_deltas: GroupDeltasCommon) -> Self { Self { group_deltas: group_deltas .group_deltas @@ -1008,8 +1085,11 @@ impl From for PbGroupDeltas { } } -impl From<&GroupDeltas> for PbGroupDeltas { - fn from(group_deltas: &GroupDeltas) -> Self { +impl From<&GroupDeltasCommon> for PbGroupDeltas +where + PbSstableInfo: for<'a> From<&'a T>, +{ + fn from(group_deltas: &GroupDeltasCommon) -> Self { Self { group_deltas: group_deltas .group_deltas @@ -1020,19 +1100,25 @@ impl From<&GroupDeltas> for PbGroupDeltas { } } -impl From<&PbGroupDeltas> for GroupDeltas { +impl From<&PbGroupDeltas> for GroupDeltasCommon +where + T: for<'a> From<&'a PbSstableInfo>, +{ fn from(pb_group_deltas: &PbGroupDeltas) -> Self { Self { group_deltas: pb_group_deltas .group_deltas .iter() - .map(GroupDelta::from) + .map(GroupDeltaCommon::from) .collect_vec(), } } } -impl GroupDeltas { +impl GroupDeltasCommon +where + PbSstableInfo: for<'a> From<&'a T>, +{ pub fn to_protobuf(&self) -> PbGroupDeltas { self.into() }