diff --git a/proto/hummock.proto b/proto/hummock.proto index 6d3d7b0d32d7d..c95912ebd94ef 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -63,6 +63,10 @@ message InputLevel { repeated SstableInfo table_infos = 3; } +message NewL0SubLevel { + repeated SstableInfo inserted_table_infos = 1; +} + message IntraLevelDelta { uint32 level_idx = 1; uint64 l0_sub_level_id = 2; @@ -112,6 +116,7 @@ message GroupDelta { GroupConstruct group_construct = 2; GroupDestroy group_destroy = 3; GroupMerge group_merge = 6; + NewL0SubLevel new_l0_sub_level = 7; } } diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index f678014d440c8..8c6a57c7347ce 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -17,10 +17,8 @@ use std::ops::Bound::{Excluded, Included}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::Ordering; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - object_size_map, summarize_group_deltas, -}; -use risingwave_hummock_sdk::version::HummockVersion; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map; +use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion}; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects}; use risingwave_pb::hummock::{ @@ -156,13 +154,24 @@ impl HummockManager { .hummock_version_deltas .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id))) { - for (group_id, group_deltas) in &version_delta.group_deltas { - let summary = summarize_group_deltas(group_deltas, *group_id); + for group_deltas in version_delta.group_deltas.values() { object_sizes.extend( - summary - .insert_table_infos + group_deltas + .group_deltas .iter() - .map(|t| (t.object_id, t.file_size)) + .flat_map(|delta| { + match delta { + GroupDeltaCommon::IntraLevel(level_delta) => Some(level_delta), + _ => None, + } + .into_iter() + .flat_map(|level_delta| { + level_delta + .inserted_table_infos + .iter() + .map(|t| (t.object_id, t.file_size)) + }) + }) .chain( version_delta .change_log_delta diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index f94c01efbd7ae..d8cbcad27c095 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -153,17 +153,15 @@ impl<'a> HummockVersionTransaction<'a> { .entry(compact_task.compaction_group_id) .or_default() .group_deltas; - let mut removed_table_ids_map: BTreeMap> = BTreeMap::default(); + let mut removed_table_ids_map: BTreeMap> = BTreeMap::default(); for level in &compact_task.input_ssts { let level_idx = level.level_idx; - let mut removed_table_ids = - level.table_infos.iter().map(|sst| sst.sst_id).collect_vec(); removed_table_ids_map .entry(level_idx) .or_default() - .append(&mut removed_table_ids); + .extend(level.table_infos.iter().map(|sst| sst.sst_id)); } for (level_idx, removed_table_ids) in removed_table_ids_map { @@ -181,7 +179,7 @@ impl<'a> HummockVersionTransaction<'a> { let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( compact_task.target_level, compact_task.target_sub_level_id, - vec![], // default + HashSet::new(), // default compact_task.sorted_output_ssts.clone(), compact_task.split_weight_by_vnode, )); diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index 04d5e237a11df..b59490d5beadb 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -20,9 +20,7 @@ use risingwave_hummock_sdk::change_log::ChangeLogDelta; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarks; -use risingwave_hummock_sdk::version::{ - GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta, -}; +use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ CompactionGroupId, FrontendHummockVersionDelta, HummockEpoch, HummockVersionId, }; @@ -154,13 +152,7 @@ impl<'a> HummockVersionTransaction<'a> { .entry(compaction_group_id) .or_default() .group_deltas; - let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( - 0, - 0, // l0_sub_level_id will be generated during apply_version_delta - vec![], // default - inserted_table_infos, - 0, // default - )); + let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos); group_deltas.push(group_delta); } diff --git a/src/prost/build.rs b/src/prost/build.rs index ee04705ef19e5..0e1b2ea5c1db6 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -183,6 +183,7 @@ fn main() -> Result<(), Box> { .type_attribute("hummock.GroupTableChange", "#[derive(Eq)]") .type_attribute("hummock.GroupMerge", "#[derive(Eq)]") .type_attribute("hummock.GroupDelta", "#[derive(Eq)]") + .type_attribute("hummock.NewL0SubLevel", "#[derive(Eq)]") .type_attribute("hummock.LevelHandler.RunningCompactTask", "#[derive(Eq)]") .type_attribute("hummock.LevelHandler", "#[derive(Eq)]") .type_attribute("hummock.TableOption", "#[derive(Eq)]") diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 0ffdd15eca498..b3cdcbe2aead8 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -22,8 +22,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; use risingwave_pb::hummock::{ - CompactionConfig, CompatibilityVersion, GroupConstruct, GroupMerge, PbLevelType, - StateTableInfo, StateTableInfoDelta, + CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; @@ -36,83 +35,11 @@ use crate::level::{Level, Levels, OverlappingLevel}; use crate::sstable_info::SstableInfo; use crate::table_watermark::{ReadTableWatermark, TableWatermarks}; use crate::version::{ - GroupDelta, GroupDeltas, HummockVersion, HummockVersionCommon, HummockVersionDelta, - HummockVersionStateTableInfo, IntraLevelDelta, + GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDelta, + HummockVersionStateTableInfo, IntraLevelDelta, IntraLevelDeltaCommon, }; use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId}; -pub struct GroupDeltasSummary { - pub delete_sst_levels: Vec, - pub delete_sst_ids_set: HashSet, - pub insert_sst_level_id: u32, - pub insert_sub_level_id: u64, - pub insert_table_infos: Vec, - pub group_construct: Option, - pub group_destroy: Option, - pub new_vnode_partition_count: u32, - pub group_merge: Option, -} - -pub fn summarize_group_deltas( - group_deltas: &GroupDeltas, - compaction_group_id: CompactionGroupId, -) -> GroupDeltasSummary { - let mut delete_sst_levels = Vec::with_capacity(group_deltas.group_deltas.len()); - let mut delete_sst_ids_set = HashSet::new(); - let mut insert_sst_level_id = u32::MAX; - let mut insert_sub_level_id = u64::MAX; - let mut insert_table_infos = vec![]; - let mut group_construct = None; - let mut group_destroy = None; - let mut new_vnode_partition_count = 0; - let mut group_merge = None; - - for group_delta in &group_deltas.group_deltas { - match group_delta { - GroupDelta::IntraLevel(intra_level) => { - if !intra_level.removed_table_ids.is_empty() { - delete_sst_levels.push(intra_level.level_idx); - delete_sst_ids_set.extend(intra_level.removed_table_ids.iter().clone()); - } - if !intra_level.inserted_table_infos.is_empty() { - insert_sst_level_id = intra_level.level_idx; - insert_sub_level_id = intra_level.l0_sub_level_id; - insert_table_infos.extend(intra_level.inserted_table_infos.iter().cloned()); - } - new_vnode_partition_count = intra_level.vnode_partition_count; - } - GroupDelta::GroupConstruct(construct_delta) => { - assert!(group_construct.is_none()); - group_construct = Some(construct_delta.clone()); - } - GroupDelta::GroupDestroy(_) => { - assert!(group_destroy.is_none()); - group_destroy = Some(compaction_group_id); - } - GroupDelta::GroupMerge(merge_delta) => { - assert!(group_merge.is_none()); - group_merge = Some(*merge_delta); - group_destroy = Some(merge_delta.right_group_id); - } - } - } - - delete_sst_levels.sort(); - delete_sst_levels.dedup(); - - GroupDeltasSummary { - delete_sst_levels, - delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, - insert_table_infos, - group_construct, - group_destroy, - new_vnode_partition_count, - group_merge, - } -} - #[derive(Clone, Default)] pub struct TableGroupInfo { pub group_id: CompactionGroupId, @@ -587,118 +514,136 @@ impl HummockVersion { // apply to `levels`, which is different compaction groups for (compaction_group_id, group_deltas) in &version_delta.group_deltas { - let summary = summarize_group_deltas(group_deltas, *compaction_group_id); - if let Some(group_construct) = &summary.group_construct { - let mut new_levels = build_initial_compaction_group_levels( - *compaction_group_id, - group_construct.get_group_config().unwrap(), - ); - let parent_group_id = group_construct.parent_group_id; - new_levels.parent_group_id = parent_group_id; - #[expect(deprecated)] - // for backward-compatibility of previous hummock version delta - new_levels - .member_table_ids - .clone_from(&group_construct.table_ids); - self.levels.insert(*compaction_group_id, new_levels); - let member_table_ids = - if group_construct.version >= CompatibilityVersion::NoMemberTableIds as _ { - self.state_table_info - .compaction_group_member_table_ids(*compaction_group_id) - .iter() - .map(|table_id| table_id.table_id) - .collect() - } else { + for group_delta in &group_deltas.group_deltas { + match group_delta { + GroupDeltaCommon::GroupConstruct(group_construct) => { + let mut new_levels = build_initial_compaction_group_levels( + *compaction_group_id, + group_construct.get_group_config().unwrap(), + ); + let parent_group_id = group_construct.parent_group_id; + new_levels.parent_group_id = parent_group_id; #[expect(deprecated)] // for backward-compatibility of previous hummock version delta - BTreeSet::from_iter(group_construct.table_ids.clone()) - }; - - if group_construct.version >= CompatibilityVersion::SplitGroupByTableId as _ { - let split_key = if group_construct.split_key.is_some() { - Some(Bytes::from(group_construct.split_key.clone().unwrap())) - } else { - None - }; - self.init_with_parent_group_v2( - parent_group_id, - *compaction_group_id, - group_construct.get_new_sst_start_id(), - split_key.clone(), - ); - } else { - // for backward-compatibility of previous hummock version delta - self.init_with_parent_group( - parent_group_id, - *compaction_group_id, - member_table_ids, - group_construct.get_new_sst_start_id(), - ); - } - } else if let Some(group_merge) = &summary.group_merge { - tracing::info!( - "group_merge left {:?} right {:?}", - group_merge.left_group_id, - group_merge.right_group_id - ); - self.merge_compaction_group(group_merge.left_group_id, group_merge.right_group_id) - } - let group_destroy = summary.group_destroy; - let levels = self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { - panic!("compaction group {} does not exist", compaction_group_id) - }); - - if is_commit_epoch { - let GroupDeltasSummary { - delete_sst_levels, - delete_sst_ids_set, - .. - } = summary; - - assert!( - delete_sst_levels.is_empty() && delete_sst_ids_set.is_empty() - || group_destroy.is_some(), - "no sst should be deleted when committing an epoch" - ); - let mut next_l0_sub_level_id = levels - .l0 - .sub_levels - .last() - .map(|level| level.sub_level_id + 1) - .unwrap_or(1); - for group_delta in &group_deltas.group_deltas { - if let GroupDelta::IntraLevel(IntraLevelDelta { - level_idx, - inserted_table_infos, - .. - }) = group_delta - { - assert_eq!( - *level_idx, 0, - "we should only add to L0 when we commit an epoch." + new_levels + .member_table_ids + .clone_from(&group_construct.table_ids); + self.levels.insert(*compaction_group_id, new_levels); + let member_table_ids = if group_construct.version + >= CompatibilityVersion::NoMemberTableIds as _ + { + self.state_table_info + .compaction_group_member_table_ids(*compaction_group_id) + .iter() + .map(|table_id| table_id.table_id) + .collect() + } else { + #[expect(deprecated)] + // for backward-compatibility of previous hummock version delta + BTreeSet::from_iter(group_construct.table_ids.clone()) + }; + + if group_construct.version >= CompatibilityVersion::SplitGroupByTableId as _ + { + let split_key = if group_construct.split_key.is_some() { + Some(Bytes::from(group_construct.split_key.clone().unwrap())) + } else { + None + }; + self.init_with_parent_group_v2( + parent_group_id, + *compaction_group_id, + group_construct.get_new_sst_start_id(), + split_key.clone(), + ); + } else { + // for backward-compatibility of previous hummock version delta + self.init_with_parent_group( + parent_group_id, + *compaction_group_id, + member_table_ids, + group_construct.get_new_sst_start_id(), + ); + } + } + GroupDeltaCommon::GroupMerge(group_merge) => { + tracing::info!( + "group_merge left {:?} right {:?}", + group_merge.left_group_id, + group_merge.right_group_id ); - if !inserted_table_infos.is_empty() { - insert_new_sub_level( - &mut levels.l0, - next_l0_sub_level_id, - PbLevelType::Overlapping, - inserted_table_infos.clone(), - None, + self.merge_compaction_group( + group_merge.left_group_id, + group_merge.right_group_id, + ) + } + GroupDeltaCommon::IntraLevel(level_delta) => { + let levels = + self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { + panic!("compaction group {} does not exist", compaction_group_id) + }); + if is_commit_epoch { + assert!( + level_delta.removed_table_ids.is_empty(), + "no sst should be deleted when committing an epoch" + ); + + let IntraLevelDelta { + level_idx, + l0_sub_level_id, + inserted_table_infos, + .. + } = level_delta; + { + assert_eq!( + *level_idx, 0, + "we should only add to L0 when we commit an epoch." + ); + if !inserted_table_infos.is_empty() { + insert_new_sub_level( + &mut levels.l0, + *l0_sub_level_id, + PbLevelType::Overlapping, + inserted_table_infos.clone(), + None, + ); + } + } + } else { + // The delta is caused by compaction. + levels.apply_compact_ssts( + level_delta, + self.state_table_info + .compaction_group_member_table_ids(*compaction_group_id), ); - next_l0_sub_level_id += 1; } } + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => { + let levels = + self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { + panic!("compaction group {} does not exist", compaction_group_id) + }); + assert!(is_commit_epoch); + + let next_l0_sub_level_id = levels + .l0 + .sub_levels + .last() + .map(|level| level.sub_level_id + 1) + .unwrap_or(1); + + insert_new_sub_level( + &mut levels.l0, + next_l0_sub_level_id, + PbLevelType::Overlapping, + inserted_table_infos.clone(), + None, + ); + } + GroupDeltaCommon::GroupDestroy(_) => { + self.levels.remove(compaction_group_id); + } } - } else { - // The delta is caused by compaction. - levels.apply_compact_ssts( - summary, - self.state_table_info - .compaction_group_member_table_ids(*compaction_group_id), - ); - } - if let Some(destroy_group_id) = &group_destroy { - self.levels.remove(destroy_group_id); } } self.id = version_delta.id; @@ -1009,50 +954,50 @@ impl HummockVersionCommon { impl Levels { pub fn apply_compact_ssts( &mut self, - summary: GroupDeltasSummary, + level_delta: &IntraLevelDeltaCommon, member_table_ids: &BTreeSet, ) { - let GroupDeltasSummary { - delete_sst_levels, - delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, - insert_table_infos, - new_vnode_partition_count, + let IntraLevelDeltaCommon { + level_idx, + l0_sub_level_id, + inserted_table_infos: insert_table_infos, + vnode_partition_count, + removed_table_ids: delete_sst_ids_set, .. - } = summary; + } = level_delta; + let new_vnode_partition_count = *vnode_partition_count; - if !self.check_deleted_sst_exist(&delete_sst_levels, delete_sst_ids_set.clone()) { + if !self.check_deleted_sst_exist(&[*level_idx], delete_sst_ids_set.clone()) { warn!( "This VersionDelta may be committed by an expired compact task. Please check it. \n - delete_sst_levels: {:?}\n, - delete_sst_ids_set: {:?}\n, insert_sst_level_id: {}\n, insert_sub_level_id: {}\n, - insert_table_infos: {:?}\n", - delete_sst_levels, - delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, + insert_table_infos: {:?}\n, + delete_sst_ids_set: {:?}\n", + level_idx, + l0_sub_level_id, insert_table_infos .iter() .map(|sst| (sst.sst_id, sst.object_id)) - .collect_vec() + .collect_vec(), + delete_sst_ids_set, ); return; } - for level_idx in &delete_sst_levels { + if !delete_sst_ids_set.is_empty() { if *level_idx == 0 { for level in &mut self.l0.sub_levels { - level_delete_ssts(level, &delete_sst_ids_set); + level_delete_ssts(level, delete_sst_ids_set); } } else { let idx = *level_idx as usize - 1; - level_delete_ssts(&mut self.levels[idx], &delete_sst_ids_set); + level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set); } } if !insert_table_infos.is_empty() { + let insert_sst_level_id = *level_idx; + let insert_sub_level_id = *l0_sub_level_id; if insert_sst_level_id == 0 { let l0 = &mut self.l0; let index = l0 @@ -1093,7 +1038,7 @@ impl Levels { level_insert_ssts(&mut self.levels[idx], insert_table_infos); } } - if delete_sst_levels.iter().any(|level_id| *level_id == 0) { + if *level_idx == 0 && !delete_sst_ids_set.is_empty() { self.l0 .sub_levels .retain(|level| !level.table_infos.is_empty()); @@ -1358,7 +1303,7 @@ fn level_delete_ssts( original_len != operand.table_infos.len() } -fn level_insert_ssts(operand: &mut Level, insert_table_infos: Vec) { +fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec) { operand.total_file_size += insert_table_infos .iter() .map(|sst| sst.sst_size) @@ -1367,7 +1312,9 @@ fn level_insert_ssts(operand: &mut Level, insert_table_infos: Vec) .iter() .map(|sst| sst.uncompressed_file_size) .sum::(); - operand.table_infos.extend(insert_table_infos); + operand + .table_infos + .extend(insert_table_infos.iter().cloned()); operand .table_infos .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range)); @@ -1501,7 +1448,7 @@ pub fn validate_version(version: &HummockVersion) -> Vec { #[cfg(test)] mod tests { - use std::collections::HashMap; + use std::collections::{HashMap, HashSet}; use bytes::Bytes; use risingwave_common::catalog::TableId; @@ -1655,7 +1602,7 @@ mod tests { group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new( 1, 0, - vec![], + HashSet::new(), vec![SstableInfo { object_id: 1, sst_id: 1, diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index dbd927e3d724a..fd796c76783de 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -21,12 +21,12 @@ use std::sync::{Arc, LazyLock}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::INVALID_EPOCH; -use risingwave_pb::hummock::group_delta::PbDeltaType; +use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType}; use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas; use risingwave_pb::hummock::{ CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMerge, - PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbSstableInfo, PbStateTableInfo, - StateTableInfo, StateTableInfoDelta, + PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbNewL0SubLevel, PbSstableInfo, + PbStateTableInfo, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; @@ -785,7 +785,7 @@ where pub struct IntraLevelDeltaCommon { pub level_idx: u32, pub l0_sub_level_id: u64, - pub removed_table_ids: Vec, + pub removed_table_ids: HashSet, pub inserted_table_infos: Vec, pub vnode_partition_count: u32, } @@ -814,7 +814,7 @@ where Self { level_idx: pb_intra_level_delta.level_idx, l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id, - removed_table_ids: pb_intra_level_delta.removed_table_ids, + removed_table_ids: HashSet::from_iter(pb_intra_level_delta.removed_table_ids), inserted_table_infos: pb_intra_level_delta .inserted_table_infos .into_iter() @@ -833,7 +833,7 @@ where Self { level_idx: intra_level_delta.level_idx, l0_sub_level_id: intra_level_delta.l0_sub_level_id, - removed_table_ids: intra_level_delta.removed_table_ids, + removed_table_ids: intra_level_delta.removed_table_ids.into_iter().collect(), inserted_table_infos: intra_level_delta .inserted_table_infos .into_iter() @@ -852,7 +852,11 @@ where Self { level_idx: intra_level_delta.level_idx, l0_sub_level_id: intra_level_delta.l0_sub_level_id, - removed_table_ids: intra_level_delta.removed_table_ids.clone(), + removed_table_ids: intra_level_delta + .removed_table_ids + .iter() + .cloned() + .collect(), inserted_table_infos: intra_level_delta .inserted_table_infos .iter() @@ -871,7 +875,9 @@ where Self { level_idx: pb_intra_level_delta.level_idx, l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id, - removed_table_ids: pb_intra_level_delta.removed_table_ids.clone(), + removed_table_ids: HashSet::from_iter( + pb_intra_level_delta.removed_table_ids.iter().cloned(), + ), inserted_table_infos: pb_intra_level_delta .inserted_table_infos .iter() @@ -886,7 +892,7 @@ impl IntraLevelDelta { pub fn new( level_idx: u32, l0_sub_level_id: u64, - removed_table_ids: Vec, + removed_table_ids: HashSet, inserted_table_infos: Vec, vnode_partition_count: u32, ) -> Self { @@ -902,6 +908,7 @@ impl IntraLevelDelta { #[derive(Debug, PartialEq, Clone)] pub enum GroupDeltaCommon { + NewL0SubLevel(Vec), IntraLevel(IntraLevelDeltaCommon), GroupConstruct(PbGroupConstruct), GroupDestroy(PbGroupDestroy), @@ -928,6 +935,13 @@ where Some(PbDeltaType::GroupMerge(pb_group_merge)) => { GroupDeltaCommon::GroupMerge(pb_group_merge) } + Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel( + pb_new_sub_level + .inserted_table_infos + .into_iter() + .map(T::from) + .collect(), + ), None => panic!("delta_type is not set"), } } @@ -951,6 +965,14 @@ where GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)), }, + GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta { + delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel { + inserted_table_infos: new_sub_level + .into_iter() + .map(PbSstableInfo::from) + .collect(), + })), + }, } } } @@ -973,6 +995,11 @@ where GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)), }, + GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta { + delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel { + inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(), + })), + }, } } } @@ -995,6 +1022,13 @@ where Some(PbDeltaType::GroupMerge(pb_group_merge)) => { GroupDeltaCommon::GroupMerge(*pb_group_merge) } + Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel( + pb_new_sub_level + .inserted_table_infos + .iter() + .map(T::from) + .collect(), + ), None => panic!("delta_type is not set"), } }