diff --git a/proto/hummock.proto b/proto/hummock.proto index c59a181197f9e..ec0e20f6013ce 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -63,6 +63,10 @@ message InputLevel { repeated SstableInfo table_infos = 3; } +message NewL0SubLevel { + repeated SstableInfo inserted_table_infos = 1; +} + message IntraLevelDelta { uint32 level_idx = 1; uint64 l0_sub_level_id = 2; @@ -112,6 +116,7 @@ message GroupDelta { GroupConstruct group_construct = 2; GroupDestroy group_destroy = 3; GroupMerge group_merge = 6; + NewL0SubLevel new_l0_sub_level = 7; } } diff --git a/src/ctl/src/cmd_impl/hummock/validate_version.rs b/src/ctl/src/cmd_impl/hummock/validate_version.rs index b6ab7f111aaac..62e988f42f1cf 100644 --- a/src/ctl/src/cmd_impl/hummock/validate_version.rs +++ b/src/ctl/src/cmd_impl/hummock/validate_version.rs @@ -206,14 +206,22 @@ pub async fn print_version_delta_in_archive( } fn match_delta(delta: &DeltaType, sst_id: HummockSstableObjectId) -> bool { - let DeltaType::IntraLevel(delta) = delta else { - return false; - }; - delta - .inserted_table_infos - .iter() - .any(|sst| sst.sst_id == sst_id) - || delta.removed_table_ids.iter().any(|sst| *sst == sst_id) + match delta { + DeltaType::GroupConstruct(_) | DeltaType::GroupDestroy(_) | DeltaType::GroupMerge(_) => { + false + } + DeltaType::IntraLevel(delta) => { + delta + .inserted_table_infos + .iter() + .any(|sst| sst.sst_id == sst_id) + || delta.removed_table_ids.iter().any(|sst| *sst == sst_id) + } + DeltaType::NewL0SubLevel(delta) => delta + .inserted_table_infos + .iter() + .any(|sst| sst.sst_id == sst_id), + } } fn print_delta(delta: &DeltaType) { diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index f678014d440c8..ea747dbf402e5 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -17,10 +17,8 @@ use std::ops::Bound::{Excluded, Included}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::Ordering; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - object_size_map, summarize_group_deltas, -}; -use risingwave_hummock_sdk::version::HummockVersion; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map; +use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion}; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects}; use risingwave_pb::hummock::{ @@ -156,13 +154,27 @@ impl HummockManager { .hummock_version_deltas .range((Excluded(old_checkpoint_id), Included(new_checkpoint_id))) { - for (group_id, group_deltas) in &version_delta.group_deltas { - let summary = summarize_group_deltas(group_deltas, *group_id); + for group_deltas in version_delta.group_deltas.values() { object_sizes.extend( - summary - .insert_table_infos + group_deltas + .group_deltas .iter() - .map(|t| (t.object_id, t.file_size)) + .flat_map(|delta| { + match delta { + GroupDeltaCommon::IntraLevel(level_delta) => { + Some(level_delta.inserted_table_infos.iter()) + } + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => { + Some(inserted_table_infos.iter()) + } + GroupDeltaCommon::GroupConstruct(_) + | GroupDeltaCommon::GroupDestroy(_) + | GroupDeltaCommon::GroupMerge(_) => None, + } + .into_iter() + .flatten() + .map(|t| (t.object_id, t.file_size)) + }) .chain( version_delta .change_log_delta diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index 975d764c9aed8..eabf23b07c579 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -21,9 +21,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::compact_task::ReportTask; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ - HummockLevelsExt, TableGroupInfo, -}; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::TableGroupInfo; use risingwave_hummock_sdk::compaction_group::{ group_split, StateTableId, StaticCompactionGroupId, }; diff --git a/src/meta/src/hummock/manager/compaction/mod.rs b/src/meta/src/hummock/manager/compaction/mod.rs index 6ea718bc1e02e..df99ced584e0b 100644 --- a/src/meta/src/hummock/manager/compaction/mod.rs +++ b/src/meta/src/hummock/manager/compaction/mod.rs @@ -41,7 +41,6 @@ use rand::seq::SliceRandom; use rand::thread_rng; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask}; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockLevelsExt; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::level::Levels; use risingwave_hummock_sdk::sstable_info::SstableInfo; @@ -153,17 +152,15 @@ impl<'a> HummockVersionTransaction<'a> { .entry(compact_task.compaction_group_id) .or_default() .group_deltas; - let mut removed_table_ids_map: BTreeMap> = BTreeMap::default(); + let mut removed_table_ids_map: BTreeMap> = BTreeMap::default(); for level in &compact_task.input_ssts { let level_idx = level.level_idx; - let mut removed_table_ids = - level.table_infos.iter().map(|sst| sst.sst_id).collect_vec(); removed_table_ids_map .entry(level_idx) .or_default() - .append(&mut removed_table_ids); + .extend(level.table_infos.iter().map(|sst| sst.sst_id)); } for (level_idx, removed_table_ids) in removed_table_ids_map { @@ -181,7 +178,7 @@ impl<'a> HummockVersionTransaction<'a> { let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( compact_task.target_level, compact_task.target_sub_level_id, - vec![], // default + HashSet::new(), // default compact_task.sorted_output_ssts.clone(), compact_task.split_weight_by_vnode, )); diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index d342ec5b7bc61..4e28fa512abbd 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -448,7 +448,7 @@ impl HummockManager { } let written = write_sstable_infos( delta - .newly_added_sst_infos(&select_groups) + .newly_added_sst_infos(Some(&select_groups)) .filter(|s| !skip_sst_ids.contains(&s.sst_id)), txn, ) diff --git a/src/meta/src/hummock/manager/transaction.rs b/src/meta/src/hummock/manager/transaction.rs index a8d3645d29037..57a228f35805f 100644 --- a/src/meta/src/hummock/manager/transaction.rs +++ b/src/meta/src/hummock/manager/transaction.rs @@ -21,9 +21,7 @@ use risingwave_hummock_sdk::change_log::ChangeLogDelta; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::table_watermark::TableWatermarks; -use risingwave_hummock_sdk::version::{ - GroupDelta, HummockVersion, HummockVersionDelta, IntraLevelDelta, -}; +use risingwave_hummock_sdk::version::{GroupDelta, HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{CompactionGroupId, FrontendHummockVersionDelta, HummockVersionId}; use risingwave_pb::hummock::{ CompactionConfig, CompatibilityVersion, GroupConstruct, HummockVersionDeltas, @@ -152,13 +150,7 @@ impl<'a> HummockVersionTransaction<'a> { .entry(compaction_group_id) .or_default() .group_deltas; - let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new( - 0, - 0, // l0_sub_level_id will be generated during apply_version_delta - vec![], // default - inserted_table_infos, - 0, // default - )); + let group_delta = GroupDelta::NewL0SubLevel(inserted_table_infos); group_deltas.push(group_delta); } diff --git a/src/prost/build.rs b/src/prost/build.rs index 194d565e31508..c4744e14c1b60 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -183,6 +183,7 @@ fn main() -> Result<(), Box> { .type_attribute("hummock.GroupTableChange", "#[derive(Eq)]") .type_attribute("hummock.GroupMerge", "#[derive(Eq)]") .type_attribute("hummock.GroupDelta", "#[derive(Eq)]") + .type_attribute("hummock.NewL0SubLevel", "#[derive(Eq)]") .type_attribute("hummock.LevelHandler.RunningCompactTask", "#[derive(Eq)]") .type_attribute("hummock.LevelHandler", "#[derive(Eq)]") .type_attribute("hummock.TableOption", "#[derive(Eq)]") diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index a098efff62a2b..0189e136099b9 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -22,8 +22,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VnodeBitmapExt; use risingwave_pb::hummock::{ - CompactionConfig, CompatibilityVersion, GroupConstruct, GroupMerge, PbLevelType, - StateTableInfo, StateTableInfoDelta, + CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; @@ -36,83 +35,11 @@ use crate::level::{Level, Levels, OverlappingLevel}; use crate::sstable_info::SstableInfo; use crate::table_watermark::{ReadTableWatermark, TableWatermarks}; use crate::version::{ - GroupDelta, GroupDeltas, HummockVersion, HummockVersionCommon, HummockVersionDelta, - HummockVersionStateTableInfo, IntraLevelDelta, + GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDelta, + HummockVersionStateTableInfo, IntraLevelDelta, IntraLevelDeltaCommon, }; use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId}; -pub struct GroupDeltasSummary { - pub delete_sst_levels: Vec, - pub delete_sst_ids_set: HashSet, - pub insert_sst_level_id: u32, - pub insert_sub_level_id: u64, - pub insert_table_infos: Vec, - pub group_construct: Option, - pub group_destroy: Option, - pub new_vnode_partition_count: u32, - pub group_merge: Option, -} - -pub fn summarize_group_deltas( - group_deltas: &GroupDeltas, - compaction_group_id: CompactionGroupId, -) -> GroupDeltasSummary { - let mut delete_sst_levels = Vec::with_capacity(group_deltas.group_deltas.len()); - let mut delete_sst_ids_set = HashSet::new(); - let mut insert_sst_level_id = u32::MAX; - let mut insert_sub_level_id = u64::MAX; - let mut insert_table_infos = vec![]; - let mut group_construct = None; - let mut group_destroy = None; - let mut new_vnode_partition_count = 0; - let mut group_merge = None; - - for group_delta in &group_deltas.group_deltas { - match group_delta { - GroupDelta::IntraLevel(intra_level) => { - if !intra_level.removed_table_ids.is_empty() { - delete_sst_levels.push(intra_level.level_idx); - delete_sst_ids_set.extend(intra_level.removed_table_ids.iter().clone()); - } - if !intra_level.inserted_table_infos.is_empty() { - insert_sst_level_id = intra_level.level_idx; - insert_sub_level_id = intra_level.l0_sub_level_id; - insert_table_infos.extend(intra_level.inserted_table_infos.iter().cloned()); - } - new_vnode_partition_count = intra_level.vnode_partition_count; - } - GroupDelta::GroupConstruct(construct_delta) => { - assert!(group_construct.is_none()); - group_construct = Some(construct_delta.clone()); - } - GroupDelta::GroupDestroy(_) => { - assert!(group_destroy.is_none()); - group_destroy = Some(compaction_group_id); - } - GroupDelta::GroupMerge(merge_delta) => { - assert!(group_merge.is_none()); - group_merge = Some(*merge_delta); - group_destroy = Some(merge_delta.right_group_id); - } - } - } - - delete_sst_levels.sort(); - delete_sst_levels.dedup(); - - GroupDeltasSummary { - delete_sst_levels, - delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, - insert_table_infos, - group_construct, - group_destroy, - new_vnode_partition_count, - group_merge, - } -} - #[derive(Clone, Default)] pub struct TableGroupInfo { pub group_id: CompactionGroupId, @@ -493,11 +420,12 @@ impl HummockVersion { let mut removed_ssts: BTreeMap> = BTreeMap::new(); // Build only if all deltas are intra level deltas. - if !group_deltas - .group_deltas - .iter() - .all(|delta| matches!(delta, GroupDelta::IntraLevel(_))) - { + if !group_deltas.group_deltas.iter().all(|delta| { + matches!( + delta, + GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_) + ) + }) { continue; } @@ -505,24 +433,36 @@ impl HummockVersion { // current `hummock::manager::gen_version_delta` implementation. Better refactor the // struct to reduce conventions. for group_delta in &group_deltas.group_deltas { - if let GroupDelta::IntraLevel(intra_level) = group_delta { - if !intra_level.inserted_table_infos.is_empty() { - info.insert_sst_level = intra_level.level_idx; - info.insert_sst_infos - .extend(intra_level.inserted_table_infos.iter().cloned()); + match group_delta { + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => { + if !inserted_table_infos.is_empty() { + info.insert_sst_level = 0; + info.insert_sst_infos + .extend(inserted_table_infos.iter().cloned()); + } } - if !intra_level.removed_table_ids.is_empty() { - for id in &intra_level.removed_table_ids { - if intra_level.level_idx == 0 { - removed_l0_ssts.insert(*id); - } else { - removed_ssts - .entry(intra_level.level_idx) - .or_default() - .insert(*id); + GroupDeltaCommon::IntraLevel(intra_level) => { + if !intra_level.inserted_table_infos.is_empty() { + info.insert_sst_level = intra_level.level_idx; + info.insert_sst_infos + .extend(intra_level.inserted_table_infos.iter().cloned()); + } + if !intra_level.removed_table_ids.is_empty() { + for id in &intra_level.removed_table_ids { + if intra_level.level_idx == 0 { + removed_l0_ssts.insert(*id); + } else { + removed_ssts + .entry(intra_level.level_idx) + .or_default() + .insert(*id); + } } } } + GroupDeltaCommon::GroupConstruct(_) + | GroupDeltaCommon::GroupDestroy(_) + | GroupDeltaCommon::GroupMerge(_) => {} } } @@ -587,97 +527,129 @@ impl HummockVersion { // apply to `levels`, which is different compaction groups for (compaction_group_id, group_deltas) in &version_delta.group_deltas { - let summary = summarize_group_deltas(group_deltas, *compaction_group_id); - if let Some(group_construct) = &summary.group_construct { - let mut new_levels = build_initial_compaction_group_levels( - *compaction_group_id, - group_construct.get_group_config().unwrap(), - ); - let parent_group_id = group_construct.parent_group_id; - new_levels.parent_group_id = parent_group_id; - #[expect(deprecated)] - // for backward-compatibility of previous hummock version delta - new_levels - .member_table_ids - .clone_from(&group_construct.table_ids); - self.levels.insert(*compaction_group_id, new_levels); - let member_table_ids = - if group_construct.version >= CompatibilityVersion::NoMemberTableIds as _ { - self.state_table_info - .compaction_group_member_table_ids(*compaction_group_id) - .iter() - .map(|table_id| table_id.table_id) - .collect() - } else { + let mut is_applied_l0_compact = false; + for group_delta in &group_deltas.group_deltas { + match group_delta { + GroupDeltaCommon::GroupConstruct(group_construct) => { + let mut new_levels = build_initial_compaction_group_levels( + *compaction_group_id, + group_construct.get_group_config().unwrap(), + ); + let parent_group_id = group_construct.parent_group_id; + new_levels.parent_group_id = parent_group_id; #[expect(deprecated)] // for backward-compatibility of previous hummock version delta - BTreeSet::from_iter(group_construct.table_ids.clone()) - }; - - if group_construct.version >= CompatibilityVersion::SplitGroupByTableId as _ { - let split_key = if group_construct.split_key.is_some() { - Some(Bytes::from(group_construct.split_key.clone().unwrap())) - } else { - None - }; - self.init_with_parent_group_v2( - parent_group_id, - *compaction_group_id, - group_construct.get_new_sst_start_id(), - split_key.clone(), - ); - } else { - // for backward-compatibility of previous hummock version delta - self.init_with_parent_group( - parent_group_id, - *compaction_group_id, - member_table_ids, - group_construct.get_new_sst_start_id(), - ); - } - } else if let Some(group_merge) = &summary.group_merge { - tracing::info!( - "group_merge left {:?} right {:?}", - group_merge.left_group_id, - group_merge.right_group_id - ); - self.merge_compaction_group(group_merge.left_group_id, group_merge.right_group_id) - } - let group_destroy = summary.group_destroy; - let levels = self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { - panic!("compaction group {} does not exist", compaction_group_id) - }); + new_levels + .member_table_ids + .clone_from(&group_construct.table_ids); + self.levels.insert(*compaction_group_id, new_levels); + let member_table_ids = if group_construct.version + >= CompatibilityVersion::NoMemberTableIds as _ + { + self.state_table_info + .compaction_group_member_table_ids(*compaction_group_id) + .iter() + .map(|table_id| table_id.table_id) + .collect() + } else { + #[expect(deprecated)] + // for backward-compatibility of previous hummock version delta + BTreeSet::from_iter(group_construct.table_ids.clone()) + }; + + if group_construct.version >= CompatibilityVersion::SplitGroupByTableId as _ + { + let split_key = if group_construct.split_key.is_some() { + Some(Bytes::from(group_construct.split_key.clone().unwrap())) + } else { + None + }; + self.init_with_parent_group_v2( + parent_group_id, + *compaction_group_id, + group_construct.get_new_sst_start_id(), + split_key.clone(), + ); + } else { + // for backward-compatibility of previous hummock version delta + self.init_with_parent_group( + parent_group_id, + *compaction_group_id, + member_table_ids, + group_construct.get_new_sst_start_id(), + ); + } + } + GroupDeltaCommon::GroupMerge(group_merge) => { + tracing::info!( + "group_merge left {:?} right {:?}", + group_merge.left_group_id, + group_merge.right_group_id + ); + self.merge_compaction_group( + group_merge.left_group_id, + group_merge.right_group_id, + ) + } + GroupDeltaCommon::IntraLevel(level_delta) => { + let levels = + self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { + panic!("compaction group {} does not exist", compaction_group_id) + }); + if is_commit_epoch { + assert!( + level_delta.removed_table_ids.is_empty(), + "no sst should be deleted when committing an epoch" + ); - if is_commit_epoch { - let GroupDeltasSummary { - delete_sst_levels, - delete_sst_ids_set, - .. - } = summary; + let IntraLevelDelta { + level_idx, + l0_sub_level_id, + inserted_table_infos, + .. + } = level_delta; + { + assert_eq!( + *level_idx, 0, + "we should only add to L0 when we commit an epoch." + ); + if !inserted_table_infos.is_empty() { + insert_new_sub_level( + &mut levels.l0, + *l0_sub_level_id, + PbLevelType::Overlapping, + inserted_table_infos.clone(), + None, + ); + } + } + } else { + // The delta is caused by compaction. + levels.apply_compact_ssts( + level_delta, + self.state_table_info + .compaction_group_member_table_ids(*compaction_group_id), + ); + if level_delta.level_idx == 0 { + is_applied_l0_compact = true; + } + } + } + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => { + let levels = + self.levels.get_mut(compaction_group_id).unwrap_or_else(|| { + panic!("compaction group {} does not exist", compaction_group_id) + }); + assert!(is_commit_epoch); - assert!( - delete_sst_levels.is_empty() && delete_sst_ids_set.is_empty() - || group_destroy.is_some(), - "no sst should be deleted when committing an epoch" - ); - let mut next_l0_sub_level_id = levels - .l0 - .sub_levels - .last() - .map(|level| level.sub_level_id + 1) - .unwrap_or(1); - for group_delta in &group_deltas.group_deltas { - if let GroupDelta::IntraLevel(IntraLevelDelta { - level_idx, - inserted_table_infos, - .. - }) = group_delta - { - assert_eq!( - *level_idx, 0, - "we should only add to L0 when we commit an epoch." - ); if !inserted_table_infos.is_empty() { + let next_l0_sub_level_id = levels + .l0 + .sub_levels + .last() + .map(|level| level.sub_level_id + 1) + .unwrap_or(1); + insert_new_sub_level( &mut levels.l0, next_l0_sub_level_id, @@ -685,20 +657,16 @@ impl HummockVersion { inserted_table_infos.clone(), None, ); - next_l0_sub_level_id += 1; } } + GroupDeltaCommon::GroupDestroy(_) => { + self.levels.remove(compaction_group_id); + } } - } else { - // The delta is caused by compaction. - levels.apply_compact_ssts( - summary, - self.state_table_info - .compaction_group_member_table_ids(*compaction_group_id), - ); } - if let Some(destroy_group_id) = &group_destroy { - self.levels.remove(destroy_group_id); + if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id) + { + levels.post_apply_l0_compact(); } } self.id = version_delta.id; @@ -1005,54 +973,53 @@ impl HummockVersionCommon { } } -#[easy_ext::ext(HummockLevelsExt)] impl Levels { - pub fn apply_compact_ssts( + pub(crate) fn apply_compact_ssts( &mut self, - summary: GroupDeltasSummary, + level_delta: &IntraLevelDeltaCommon, member_table_ids: &BTreeSet, ) { - let GroupDeltasSummary { - delete_sst_levels, - delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, - insert_table_infos, - new_vnode_partition_count, + let IntraLevelDeltaCommon { + level_idx, + l0_sub_level_id, + inserted_table_infos: insert_table_infos, + vnode_partition_count, + removed_table_ids: delete_sst_ids_set, .. - } = summary; + } = level_delta; + let new_vnode_partition_count = *vnode_partition_count; - if !self.check_sst_ids_exist(&delete_sst_levels, delete_sst_ids_set.clone()) { + if !self.check_sst_ids_exist(&[*level_idx], delete_sst_ids_set.clone()) { warn!( "This VersionDelta may be committed by an expired compact task. Please check it. \n - delete_sst_levels: {:?}\n, - delete_sst_ids_set: {:?}\n, insert_sst_level_id: {}\n, insert_sub_level_id: {}\n, - insert_table_infos: {:?}\n", - delete_sst_levels, - delete_sst_ids_set, - insert_sst_level_id, - insert_sub_level_id, + insert_table_infos: {:?}\n, + delete_sst_ids_set: {:?}\n", + level_idx, + l0_sub_level_id, insert_table_infos .iter() .map(|sst| (sst.sst_id, sst.object_id)) - .collect_vec() + .collect_vec(), + delete_sst_ids_set, ); return; } - for level_idx in &delete_sst_levels { + if !delete_sst_ids_set.is_empty() { if *level_idx == 0 { for level in &mut self.l0.sub_levels { - level_delete_ssts(level, &delete_sst_ids_set); + level_delete_ssts(level, delete_sst_ids_set); } } else { let idx = *level_idx as usize - 1; - level_delete_ssts(&mut self.levels[idx], &delete_sst_ids_set); + level_delete_ssts(&mut self.levels[idx], delete_sst_ids_set); } } if !insert_table_infos.is_empty() { + let insert_sst_level_id = *level_idx; + let insert_sub_level_id = *l0_sub_level_id; if insert_sst_level_id == 0 { let l0 = &mut self.l0; let index = l0 @@ -1093,7 +1060,10 @@ impl Levels { level_insert_ssts(&mut self.levels[idx], insert_table_infos); } } - if delete_sst_levels.iter().any(|level_id| *level_id == 0) { + } + + pub(crate) fn post_apply_l0_compact(&mut self) { + { self.l0 .sub_levels .retain(|level| !level.table_infos.is_empty()); @@ -1358,7 +1328,7 @@ fn level_delete_ssts( original_len != operand.table_infos.len() } -fn level_insert_ssts(operand: &mut Level, insert_table_infos: Vec) { +fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec) { operand.total_file_size += insert_table_infos .iter() .map(|sst| sst.sst_size) @@ -1367,7 +1337,9 @@ fn level_insert_ssts(operand: &mut Level, insert_table_infos: Vec) .iter() .map(|sst| sst.uncompressed_file_size) .sum::(); - operand.table_infos.extend(insert_table_infos); + operand + .table_infos + .extend(insert_table_infos.iter().cloned()); operand .table_infos .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range)); @@ -1501,7 +1473,7 @@ pub fn validate_version(version: &HummockVersion) -> Vec { #[cfg(test)] mod tests { - use std::collections::HashMap; + use std::collections::{HashMap, HashSet}; use bytes::Bytes; use risingwave_common::catalog::TableId; @@ -1655,7 +1627,7 @@ mod tests { group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new( 1, 0, - vec![], + HashSet::new(), vec![SstableInfo { object_id: 1, sst_id: 1, diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 324e8a91cf4a3..bbc0ae22148c0 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -259,7 +259,7 @@ impl TableWatermarksIndex { } } if self.latest_epoch < committed_epoch { - warn!( + debug!( latest_epoch = self.latest_epoch, committed_epoch, "committed_epoch exceed table watermark latest_epoch" ); diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 64206d9b45b55..b106563cdc7ac 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -21,12 +21,12 @@ use std::sync::{Arc, LazyLock}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::INVALID_EPOCH; -use risingwave_pb::hummock::group_delta::PbDeltaType; +use risingwave_pb::hummock::group_delta::{DeltaType, PbDeltaType}; use risingwave_pb::hummock::hummock_version_delta::PbGroupDeltas; use risingwave_pb::hummock::{ CompactionConfig, PbGroupConstruct, PbGroupDelta, PbGroupDestroy, PbGroupMerge, - PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbSstableInfo, PbStateTableInfo, - StateTableInfo, StateTableInfoDelta, + PbHummockVersion, PbHummockVersionDelta, PbIntraLevelDelta, PbNewL0SubLevel, PbSstableInfo, + PbStateTableInfo, StateTableInfo, StateTableInfoDelta, }; use tracing::warn; @@ -512,76 +512,45 @@ impl HummockVersionDelta { /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`, /// but it is possible that the object is moved or split from other compaction groups or levels. pub fn newly_added_object_ids(&self) -> HashSet { - self.group_deltas - .values() - .flat_map(|group_deltas| { - group_deltas.group_deltas.iter().flat_map(|group_delta| { - static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { - &level_delta.inserted_table_infos - } else { - &EMPTY_VEC - }; - sst_slice.iter().map(|sst| sst.object_id) - }) - }) - .chain(self.change_log_delta.values().flat_map(|delta| { - let new_log = delta.new_log.as_ref().unwrap(); - new_log - .new_value - .iter() - .map(|sst| sst.object_id) - .chain(new_log.old_value.iter().map(|sst| sst.object_id)) - })) + self.newly_added_sst_infos(None) + .map(|sst| sst.object_id) .collect() } pub fn newly_added_sst_ids(&self) -> HashSet { - let ssts_from_group_deltas = self.group_deltas.values().flat_map(|group_deltas| { - group_deltas.group_deltas.iter().flat_map(|group_delta| { - static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { - &level_delta.inserted_table_infos - } else { - &EMPTY_VEC - }; - sst_slice.iter() - }) - }); - - let ssts_from_change_log = self.change_log_delta.values().flat_map(|delta| { - let new_log = delta.new_log.as_ref().unwrap(); - new_log.new_value.iter().chain(new_log.old_value.iter()) - }); - - ssts_from_group_deltas - .chain(ssts_from_change_log) + self.newly_added_sst_infos(None) .map(|sst| sst.sst_id) .collect() } pub fn newly_added_sst_infos<'a>( &'a self, - select_group: &'a HashSet, + select_group: Option<&'a HashSet>, ) -> impl Iterator + 'a { self.group_deltas .iter() - .filter_map(|(cg_id, group_deltas)| { - if select_group.contains(cg_id) { - Some(group_deltas) - } else { + .filter_map(move |(cg_id, group_deltas)| { + if let Some(select_group) = select_group + && !select_group.contains(cg_id) + { None + } else { + Some(group_deltas) } }) .flat_map(|group_deltas| { group_deltas.group_deltas.iter().flat_map(|group_delta| { - static EMPTY_VEC: Vec = Vec::new(); - let sst_slice = if let GroupDelta::IntraLevel(level_delta) = &group_delta { - &level_delta.inserted_table_infos - } else { - &EMPTY_VEC + let sst_slice = match &group_delta { + GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) + | GroupDeltaCommon::IntraLevel(IntraLevelDeltaCommon { + inserted_table_infos, + .. + }) => Some(inserted_table_infos.iter()), + GroupDeltaCommon::GroupConstruct(_) + | GroupDeltaCommon::GroupDestroy(_) + | GroupDeltaCommon::GroupMerge(_) => None, }; - sst_slice.iter() + sst_slice.into_iter().flatten() }) }) .chain(self.change_log_delta.values().flat_map(|delta| { @@ -785,7 +754,7 @@ where pub struct IntraLevelDeltaCommon { pub level_idx: u32, pub l0_sub_level_id: u64, - pub removed_table_ids: Vec, + pub removed_table_ids: HashSet, pub inserted_table_infos: Vec, pub vnode_partition_count: u32, } @@ -814,7 +783,7 @@ where Self { level_idx: pb_intra_level_delta.level_idx, l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id, - removed_table_ids: pb_intra_level_delta.removed_table_ids, + removed_table_ids: HashSet::from_iter(pb_intra_level_delta.removed_table_ids), inserted_table_infos: pb_intra_level_delta .inserted_table_infos .into_iter() @@ -833,7 +802,7 @@ where Self { level_idx: intra_level_delta.level_idx, l0_sub_level_id: intra_level_delta.l0_sub_level_id, - removed_table_ids: intra_level_delta.removed_table_ids, + removed_table_ids: intra_level_delta.removed_table_ids.into_iter().collect(), inserted_table_infos: intra_level_delta .inserted_table_infos .into_iter() @@ -852,7 +821,11 @@ where Self { level_idx: intra_level_delta.level_idx, l0_sub_level_id: intra_level_delta.l0_sub_level_id, - removed_table_ids: intra_level_delta.removed_table_ids.clone(), + removed_table_ids: intra_level_delta + .removed_table_ids + .iter() + .cloned() + .collect(), inserted_table_infos: intra_level_delta .inserted_table_infos .iter() @@ -871,7 +844,9 @@ where Self { level_idx: pb_intra_level_delta.level_idx, l0_sub_level_id: pb_intra_level_delta.l0_sub_level_id, - removed_table_ids: pb_intra_level_delta.removed_table_ids.clone(), + removed_table_ids: HashSet::from_iter( + pb_intra_level_delta.removed_table_ids.iter().cloned(), + ), inserted_table_infos: pb_intra_level_delta .inserted_table_infos .iter() @@ -886,7 +861,7 @@ impl IntraLevelDelta { pub fn new( level_idx: u32, l0_sub_level_id: u64, - removed_table_ids: Vec, + removed_table_ids: HashSet, inserted_table_infos: Vec, vnode_partition_count: u32, ) -> Self { @@ -902,6 +877,7 @@ impl IntraLevelDelta { #[derive(Debug, PartialEq, Clone)] pub enum GroupDeltaCommon { + NewL0SubLevel(Vec), IntraLevel(IntraLevelDeltaCommon), GroupConstruct(PbGroupConstruct), GroupDestroy(PbGroupDestroy), @@ -928,6 +904,13 @@ where Some(PbDeltaType::GroupMerge(pb_group_merge)) => { GroupDeltaCommon::GroupMerge(pb_group_merge) } + Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel( + pb_new_sub_level + .inserted_table_infos + .into_iter() + .map(T::from) + .collect(), + ), None => panic!("delta_type is not set"), } } @@ -951,6 +934,14 @@ where GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMerge(pb_group_merge)), }, + GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta { + delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel { + inserted_table_infos: new_sub_level + .into_iter() + .map(PbSstableInfo::from) + .collect(), + })), + }, } } } @@ -973,6 +964,11 @@ where GroupDeltaCommon::GroupMerge(pb_group_merge) => PbGroupDelta { delta_type: Some(PbDeltaType::GroupMerge(*pb_group_merge)), }, + GroupDeltaCommon::NewL0SubLevel(new_sub_level) => PbGroupDelta { + delta_type: Some(PbDeltaType::NewL0SubLevel(PbNewL0SubLevel { + inserted_table_infos: new_sub_level.iter().map(PbSstableInfo::from).collect(), + })), + }, } } } @@ -995,6 +991,13 @@ where Some(PbDeltaType::GroupMerge(pb_group_merge)) => { GroupDeltaCommon::GroupMerge(*pb_group_merge) } + Some(DeltaType::NewL0SubLevel(pb_new_sub_level)) => GroupDeltaCommon::NewL0SubLevel( + pb_new_sub_level + .inserted_table_infos + .iter() + .map(T::from) + .collect(), + ), None => panic!("delta_type is not set"), } }