Skip to content

Commit

Permalink
refactor(meta): use correct type for the pb counterpart (#19500)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Nov 21, 2024
1 parent ea3e909 commit bae490a
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 40 deletions.
13 changes: 9 additions & 4 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl HummockManager {
.order_by_desc(hummock_time_travel_version::Column::VersionId)
.one(&sql_store.conn)
.await?
.map(|v| HummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
.map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
else {
return Ok(());
};
Expand Down Expand Up @@ -102,7 +102,7 @@ impl HummockManager {
.order_by_desc(hummock_time_travel_version::Column::VersionId)
.one(&txn)
.await?
.map(|m| HummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
.map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
let Some(latest_valid_version) = latest_valid_version else {
txn.commit().await?;
return Ok(());
Expand Down Expand Up @@ -152,7 +152,7 @@ impl HummockManager {
delta_id_to_delete
)))
})?;
let delta_to_delete = HummockVersionDelta::from_persisted_protobuf(
let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
&delta_to_delete.version_delta.to_protobuf(),
);
let new_sst_ids = delta_to_delete.newly_added_sst_ids();
Expand Down Expand Up @@ -182,7 +182,9 @@ impl HummockManager {
prev_version_id
)))
})?;
HummockVersion::from_persisted_protobuf(&prev_version.version.to_protobuf())
IncompleteHummockVersion::from_persisted_protobuf(
&prev_version.version.to_protobuf(),
)
};
let sst_ids = prev_version.get_sst_ids();
// The SST ids deleted by compaction between the 2 versions.
Expand Down Expand Up @@ -488,10 +490,13 @@ impl HummockManager {
}
}

/// The `HummockVersion` is actually `InHummockVersion`. It requires `refill_version`.
fn replay_archive(
version: PbHummockVersion,
deltas: impl Iterator<Item = PbHummockVersionDelta>,
) -> HummockVersion {
// The pb version ann pb version delta are actually written by InHummockVersion and InHummockVersionDelta, respectively.
// Using HummockVersion make it easier for `refill_version` later.
let mut last_version = HummockVersion::from_persisted_protobuf(&version);
for d in deltas {
let d = HummockVersionDelta::from_persisted_protobuf(&d);
Expand Down
66 changes: 35 additions & 31 deletions src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ use super::{group_split, StateTableId};
use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
use crate::compaction_group::StaticCompactionGroupId;
use crate::key_range::KeyRangeCommon;
use crate::level::{Level, Levels, OverlappingLevel};
use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
use crate::sstable_info::SstableInfo;
use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
use crate::version::{
GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDelta,
HummockVersionStateTableInfo, IntraLevelDelta, IntraLevelDeltaCommon,
HummockVersionStateTableInfo, IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader,
SstableIdReader,
};
use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId};
#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -64,33 +65,6 @@ impl HummockVersion {
.unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
}

pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ Level> + '_ {
self.levels
.values()
.flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
}

pub fn get_object_ids(&self) -> HashSet<HummockSstableObjectId> {
self.get_sst_infos().map(|s| s.object_id).collect()
}

pub fn get_sst_ids(&self) -> HashSet<HummockSstableObjectId> {
self.get_sst_infos().map(|s| s.sst_id).collect()
}

pub fn get_sst_infos(&self) -> impl Iterator<Item = &SstableInfo> {
self.get_combined_levels()
.flat_map(|level| level.table_infos.iter())
.chain(self.table_change_log.values().flat_map(|change_log| {
change_log.0.iter().flat_map(|epoch_change_log| {
epoch_change_log
.old_value
.iter()
.chain(epoch_change_log.new_value.iter())
})
}))
}

// only scan the sst infos from levels in the specified compaction group (without table change log)
pub fn get_sst_ids_by_group_id(
&self,
Expand Down Expand Up @@ -859,9 +833,7 @@ impl HummockVersion {

group_split::merge_levels(left_levels, right_levels);
}
}

impl HummockVersionCommon<SstableInfo> {
pub fn init_with_parent_group_v2(
&mut self,
parent_group_id: CompactionGroupId,
Expand Down Expand Up @@ -993,6 +965,38 @@ impl HummockVersionCommon<SstableInfo> {
}
}

impl<T> HummockVersionCommon<T>
where
T: SstableIdReader + ObjectIdReader,
{
pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
self.levels
.values()
.flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
}

pub fn get_object_ids(&self) -> HashSet<HummockSstableObjectId> {
self.get_sst_infos().map(|s| s.object_id()).collect()
}

pub fn get_sst_ids(&self) -> HashSet<HummockSstableObjectId> {
self.get_sst_infos().map(|s| s.sst_id()).collect()
}

pub fn get_sst_infos(&self) -> impl Iterator<Item = &T> {
self.get_combined_levels()
.flat_map(|level| level.table_infos.iter())
.chain(self.table_change_log.values().flat_map(|change_log| {
change_log.0.iter().flat_map(|epoch_change_log| {
epoch_change_log
.old_value
.iter()
.chain(epoch_change_log.new_value.iter())
})
}))
}
}

impl Levels {
pub(crate) fn apply_compact_ssts(
&mut self,
Expand Down
14 changes: 14 additions & 0 deletions src/storage/hummock_sdk/src/sstable_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::mem::size_of;
use risingwave_pb::hummock::{PbBloomFilterType, PbKeyRange, PbSstableInfo};

use crate::key_range::KeyRange;
use crate::version::{ObjectIdReader, SstableIdReader};
use crate::{HummockSstableId, HummockSstableObjectId};

#[derive(Debug, PartialEq, Clone, Default)]
pub struct SstableInfo {
Expand Down Expand Up @@ -216,3 +218,15 @@ impl SstableInfo {
self.key_range = KeyRange::default();
}
}

impl SstableIdReader for SstableInfo {
fn sst_id(&self) -> HummockSstableId {
self.sst_id
}
}

impl ObjectIdReader for SstableInfo {
fn object_id(&self) -> HummockSstableObjectId {
self.object_id
}
}
13 changes: 13 additions & 0 deletions src/storage/hummock_sdk/src/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::level::Level;
use crate::sstable_info::SstableInfo;
use crate::version::{
HummockVersion, HummockVersionCommon, HummockVersionDelta, HummockVersionDeltaCommon,
ObjectIdReader, SstableIdReader,
};
use crate::{CompactionGroupId, HummockSstableId, HummockSstableObjectId};

Expand Down Expand Up @@ -172,6 +173,18 @@ pub struct SstableIdInVersion {
object_id: HummockSstableObjectId,
}

impl SstableIdReader for SstableIdInVersion {
fn sst_id(&self) -> HummockSstableId {
self.sst_id
}
}

impl ObjectIdReader for SstableIdInVersion {
fn object_id(&self) -> HummockSstableObjectId {
self.object_id
}
}

impl From<&SstableIdInVersion> for PbSstableInfo {
fn from(sst_id: &SstableIdInVersion) -> Self {
Self {
Expand Down
24 changes: 19 additions & 5 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ use crate::level::LevelsCommon;
use crate::sstable_info::SstableInfo;
use crate::table_watermark::TableWatermarks;
use crate::{
CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, FIRST_VERSION_ID,
CompactionGroupId, HummockEpoch, HummockSstableId, HummockSstableObjectId, HummockVersionId,
FIRST_VERSION_ID,
};

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -506,27 +507,38 @@ where
}
}

impl HummockVersionDelta {
pub trait SstableIdReader {
fn sst_id(&self) -> HummockSstableId;
}

pub trait ObjectIdReader {
fn object_id(&self) -> HummockSstableObjectId;
}

impl<T> HummockVersionDeltaCommon<T>
where
T: SstableIdReader + ObjectIdReader,
{
/// Get the newly added object ids from the version delta.
///
/// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`,
/// but it is possible that the object is moved or split from other compaction groups or levels.
pub fn newly_added_object_ids(&self) -> HashSet<HummockSstableObjectId> {
self.newly_added_sst_infos(None)
.map(|sst| sst.object_id)
.map(|sst| sst.object_id())
.collect()
}

pub fn newly_added_sst_ids(&self) -> HashSet<HummockSstableObjectId> {
self.newly_added_sst_infos(None)
.map(|sst| sst.sst_id)
.map(|sst| sst.sst_id())
.collect()
}

pub fn newly_added_sst_infos<'a>(
&'a self,
select_group: Option<&'a HashSet<CompactionGroupId>>,
) -> impl Iterator<Item = &'a SstableInfo> + 'a {
) -> impl Iterator<Item = &'a T> + 'a {
self.group_deltas
.iter()
.filter_map(move |(cg_id, group_deltas)| {
Expand Down Expand Up @@ -559,7 +571,9 @@ impl HummockVersionDelta {
new_log.new_value.iter().chain(new_log.old_value.iter())
}))
}
}

impl HummockVersionDelta {
#[expect(deprecated)]
pub fn max_committed_epoch_for_migration(&self) -> HummockEpoch {
self.max_committed_epoch
Expand Down

0 comments on commit bae490a

Please sign in to comment.