From bae490aab2ae895b4a76d1d2a7c5b0cc63169e99 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Thu, 21 Nov 2024 13:50:13 +0800 Subject: [PATCH] refactor(meta): use correct type for the pb counterpart (#19500) --- src/meta/src/hummock/manager/time_travel.rs | 13 ++-- .../compaction_group/hummock_version_ext.rs | 66 ++++++++++--------- src/storage/hummock_sdk/src/sstable_info.rs | 14 ++++ src/storage/hummock_sdk/src/time_travel.rs | 13 ++++ src/storage/hummock_sdk/src/version.rs | 24 +++++-- 5 files changed, 90 insertions(+), 40 deletions(-) diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index 07b8e2f1b072c..9391169f00a23 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -53,7 +53,7 @@ impl HummockManager { .order_by_desc(hummock_time_travel_version::Column::VersionId) .one(&sql_store.conn) .await? - .map(|v| HummockVersion::from_persisted_protobuf(&v.version.to_protobuf())) + .map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf())) else { return Ok(()); }; @@ -102,7 +102,7 @@ impl HummockManager { .order_by_desc(hummock_time_travel_version::Column::VersionId) .one(&txn) .await? - .map(|m| HummockVersion::from_persisted_protobuf(&m.version.to_protobuf())); + .map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf())); let Some(latest_valid_version) = latest_valid_version else { txn.commit().await?; return Ok(()); @@ -152,7 +152,7 @@ impl HummockManager { delta_id_to_delete ))) })?; - let delta_to_delete = HummockVersionDelta::from_persisted_protobuf( + let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf( &delta_to_delete.version_delta.to_protobuf(), ); let new_sst_ids = delta_to_delete.newly_added_sst_ids(); @@ -182,7 +182,9 @@ impl HummockManager { prev_version_id ))) })?; - HummockVersion::from_persisted_protobuf(&prev_version.version.to_protobuf()) + IncompleteHummockVersion::from_persisted_protobuf( + &prev_version.version.to_protobuf(), + ) }; let sst_ids = prev_version.get_sst_ids(); // The SST ids deleted by compaction between the 2 versions. @@ -488,10 +490,13 @@ impl HummockManager { } } +/// The `HummockVersion` is actually `InHummockVersion`. It requires `refill_version`. fn replay_archive( version: PbHummockVersion, deltas: impl Iterator, ) -> HummockVersion { + // The pb version ann pb version delta are actually written by InHummockVersion and InHummockVersionDelta, respectively. + // Using HummockVersion make it easier for `refill_version` later. let mut last_version = HummockVersion::from_persisted_protobuf(&version); for d in deltas { let d = HummockVersionDelta::from_persisted_protobuf(&d); diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 7a1f0851d3d4d..ac9521b0f5272 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -31,12 +31,13 @@ use super::{group_split, StateTableId}; use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon}; use crate::compaction_group::StaticCompactionGroupId; use crate::key_range::KeyRangeCommon; -use crate::level::{Level, Levels, OverlappingLevel}; +use crate::level::{Level, LevelCommon, Levels, OverlappingLevel}; use crate::sstable_info::SstableInfo; use crate::table_watermark::{ReadTableWatermark, TableWatermarks}; use crate::version::{ GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDelta, - HummockVersionStateTableInfo, IntraLevelDelta, IntraLevelDeltaCommon, + HummockVersionStateTableInfo, IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, + SstableIdReader, }; use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId}; #[derive(Debug, Clone, Default)] @@ -64,33 +65,6 @@ impl HummockVersion { .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id)) } - pub fn get_combined_levels(&self) -> impl Iterator + '_ { - self.levels - .values() - .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter())) - } - - pub fn get_object_ids(&self) -> HashSet { - self.get_sst_infos().map(|s| s.object_id).collect() - } - - pub fn get_sst_ids(&self) -> HashSet { - self.get_sst_infos().map(|s| s.sst_id).collect() - } - - pub fn get_sst_infos(&self) -> impl Iterator { - self.get_combined_levels() - .flat_map(|level| level.table_infos.iter()) - .chain(self.table_change_log.values().flat_map(|change_log| { - change_log.0.iter().flat_map(|epoch_change_log| { - epoch_change_log - .old_value - .iter() - .chain(epoch_change_log.new_value.iter()) - }) - })) - } - // only scan the sst infos from levels in the specified compaction group (without table change log) pub fn get_sst_ids_by_group_id( &self, @@ -859,9 +833,7 @@ impl HummockVersion { group_split::merge_levels(left_levels, right_levels); } -} -impl HummockVersionCommon { pub fn init_with_parent_group_v2( &mut self, parent_group_id: CompactionGroupId, @@ -993,6 +965,38 @@ impl HummockVersionCommon { } } +impl HummockVersionCommon +where + T: SstableIdReader + ObjectIdReader, +{ + pub fn get_combined_levels(&self) -> impl Iterator> + '_ { + self.levels + .values() + .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter())) + } + + pub fn get_object_ids(&self) -> HashSet { + self.get_sst_infos().map(|s| s.object_id()).collect() + } + + pub fn get_sst_ids(&self) -> HashSet { + self.get_sst_infos().map(|s| s.sst_id()).collect() + } + + pub fn get_sst_infos(&self) -> impl Iterator { + self.get_combined_levels() + .flat_map(|level| level.table_infos.iter()) + .chain(self.table_change_log.values().flat_map(|change_log| { + change_log.0.iter().flat_map(|epoch_change_log| { + epoch_change_log + .old_value + .iter() + .chain(epoch_change_log.new_value.iter()) + }) + })) + } +} + impl Levels { pub(crate) fn apply_compact_ssts( &mut self, diff --git a/src/storage/hummock_sdk/src/sstable_info.rs b/src/storage/hummock_sdk/src/sstable_info.rs index 22a3cd8f31fc5..10afe52ab9ad9 100644 --- a/src/storage/hummock_sdk/src/sstable_info.rs +++ b/src/storage/hummock_sdk/src/sstable_info.rs @@ -17,6 +17,8 @@ use std::mem::size_of; use risingwave_pb::hummock::{PbBloomFilterType, PbKeyRange, PbSstableInfo}; use crate::key_range::KeyRange; +use crate::version::{ObjectIdReader, SstableIdReader}; +use crate::{HummockSstableId, HummockSstableObjectId}; #[derive(Debug, PartialEq, Clone, Default)] pub struct SstableInfo { @@ -216,3 +218,15 @@ impl SstableInfo { self.key_range = KeyRange::default(); } } + +impl SstableIdReader for SstableInfo { + fn sst_id(&self) -> HummockSstableId { + self.sst_id + } +} + +impl ObjectIdReader for SstableInfo { + fn object_id(&self) -> HummockSstableObjectId { + self.object_id + } +} diff --git a/src/storage/hummock_sdk/src/time_travel.rs b/src/storage/hummock_sdk/src/time_travel.rs index a767b5caa27d7..4f2508a5772b7 100644 --- a/src/storage/hummock_sdk/src/time_travel.rs +++ b/src/storage/hummock_sdk/src/time_travel.rs @@ -23,6 +23,7 @@ use crate::level::Level; use crate::sstable_info::SstableInfo; use crate::version::{ HummockVersion, HummockVersionCommon, HummockVersionDelta, HummockVersionDeltaCommon, + ObjectIdReader, SstableIdReader, }; use crate::{CompactionGroupId, HummockSstableId, HummockSstableObjectId}; @@ -172,6 +173,18 @@ pub struct SstableIdInVersion { object_id: HummockSstableObjectId, } +impl SstableIdReader for SstableIdInVersion { + fn sst_id(&self) -> HummockSstableId { + self.sst_id + } +} + +impl ObjectIdReader for SstableIdInVersion { + fn object_id(&self) -> HummockSstableObjectId { + self.object_id + } +} + impl From<&SstableIdInVersion> for PbSstableInfo { fn from(sst_id: &SstableIdInVersion) -> Self { Self { diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 80276b09ffdff..6266ee84474b3 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -37,7 +37,8 @@ use crate::level::LevelsCommon; use crate::sstable_info::SstableInfo; use crate::table_watermark::TableWatermarks; use crate::{ - CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID, + CompactionGroupId, HummockEpoch, HummockSstableId, HummockSstableObjectId, HummockVersionId, + FIRST_VERSION_ID, }; #[derive(Debug, Clone, PartialEq)] @@ -506,27 +507,38 @@ where } } -impl HummockVersionDelta { +pub trait SstableIdReader { + fn sst_id(&self) -> HummockSstableId; +} + +pub trait ObjectIdReader { + fn object_id(&self) -> HummockSstableObjectId; +} + +impl HummockVersionDeltaCommon +where + T: SstableIdReader + ObjectIdReader, +{ /// Get the newly added object ids from the version delta. /// /// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`, /// but it is possible that the object is moved or split from other compaction groups or levels. pub fn newly_added_object_ids(&self) -> HashSet { self.newly_added_sst_infos(None) - .map(|sst| sst.object_id) + .map(|sst| sst.object_id()) .collect() } pub fn newly_added_sst_ids(&self) -> HashSet { self.newly_added_sst_infos(None) - .map(|sst| sst.sst_id) + .map(|sst| sst.sst_id()) .collect() } pub fn newly_added_sst_infos<'a>( &'a self, select_group: Option<&'a HashSet>, - ) -> impl Iterator + 'a { + ) -> impl Iterator + 'a { self.group_deltas .iter() .filter_map(move |(cg_id, group_deltas)| { @@ -559,7 +571,9 @@ impl HummockVersionDelta { new_log.new_value.iter().chain(new_log.old_value.iter()) })) } +} +impl HummockVersionDelta { #[expect(deprecated)] pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch { self.max_committed_epoch