Skip to content

Commit

Permalink
refactor(storage): make HummockVersion a generic struct (#18478)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Sep 12, 2024
1 parent a9b1de3 commit 4afcd4d
Show file tree
Hide file tree
Showing 5 changed files with 488 additions and 476 deletions.
111 changes: 71 additions & 40 deletions src/storage/hummock_sdk/src/change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,42 @@ use std::collections::HashMap;

use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
use risingwave_pb::hummock::{PbEpochNewChangeLog, PbTableChangeLog};
use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo, PbTableChangeLog};
use tracing::warn;

use crate::sstable_info::SstableInfo;

#[derive(Debug, Clone, PartialEq)]
pub struct TableChangeLog(pub Vec<EpochNewChangeLog>);
pub struct TableChangeLogCommon<T>(pub Vec<EpochNewChangeLogCommon<T>>);

pub type TableChangeLog = TableChangeLogCommon<SstableInfo>;

#[derive(Debug, Clone, PartialEq)]
pub struct EpochNewChangeLog {
pub new_value: Vec<SstableInfo>,
pub old_value: Vec<SstableInfo>,
pub struct EpochNewChangeLogCommon<T> {
pub new_value: Vec<T>,
pub old_value: Vec<T>,
pub epochs: Vec<u64>,
}

impl From<&EpochNewChangeLog> for PbEpochNewChangeLog {
fn from(val: &EpochNewChangeLog) -> Self {
pub type EpochNewChangeLog = EpochNewChangeLogCommon<SstableInfo>;

impl<T> From<&EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
where
PbSstableInfo: for<'a> From<&'a T>,
{
fn from(val: &EpochNewChangeLogCommon<T>) -> Self {
Self {
new_value: val.new_value.iter().map(|a| a.clone().into()).collect(),
old_value: val.old_value.iter().map(|a| a.clone().into()).collect(),
new_value: val.new_value.iter().map(|a| a.into()).collect(),
old_value: val.old_value.iter().map(|a| a.into()).collect(),
epochs: val.epochs.clone(),
}
}
}

impl From<&PbEpochNewChangeLog> for EpochNewChangeLog {
impl<T> From<&PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
where
T: for<'a> From<&'a PbSstableInfo>,
{
fn from(value: &PbEpochNewChangeLog) -> Self {
Self {
new_value: value.new_value.iter().map(|a| a.into()).collect(),
Expand All @@ -51,30 +61,28 @@ impl From<&PbEpochNewChangeLog> for EpochNewChangeLog {
}
}

impl From<EpochNewChangeLog> for PbEpochNewChangeLog {
fn from(val: EpochNewChangeLog) -> Self {
impl<T> From<EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
where
PbSstableInfo: From<T>,
{
fn from(val: EpochNewChangeLogCommon<T>) -> Self {
Self {
new_value: val
.new_value
.into_iter()
.map(|a| a.clone().into())
.collect(),
old_value: val
.old_value
.into_iter()
.map(|a| a.clone().into())
.collect(),
epochs: val.epochs.clone(),
new_value: val.new_value.into_iter().map(|a| a.into()).collect(),
old_value: val.old_value.into_iter().map(|a| a.into()).collect(),
epochs: val.epochs,
}
}
}

impl From<PbEpochNewChangeLog> for EpochNewChangeLog {
impl<T> From<PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
where
T: From<PbSstableInfo>,
{
fn from(value: PbEpochNewChangeLog) -> Self {
Self {
new_value: value.new_value.into_iter().map(|a| a.into()).collect(),
old_value: value.old_value.into_iter().map(|a| a.into()).collect(),
epochs: value.epochs.clone(),
epochs: value.epochs,
}
}
}
Expand Down Expand Up @@ -117,15 +125,23 @@ impl TableChangeLog {
}
}

impl TableChangeLog {
impl<T> TableChangeLogCommon<T>
where
PbSstableInfo: for<'a> From<&'a T>,
{
pub fn to_protobuf(&self) -> PbTableChangeLog {
PbTableChangeLog {
change_logs: self.0.iter().map(|a| a.into()).collect(),
}
}
}

impl<T> TableChangeLogCommon<T>
where
T: for<'a> From<&'a PbSstableInfo>,
{
pub fn from_protobuf(val: &PbTableChangeLog) -> Self {
Self(val.change_logs.clone().iter().map(|a| a.into()).collect())
Self(val.change_logs.iter().map(|a| a.into()).collect())
}
}

Expand Down Expand Up @@ -173,21 +189,29 @@ pub fn build_table_change_log_delta<'a>(
}

#[derive(Debug, PartialEq, Clone)]
pub struct ChangeLogDelta {
pub struct ChangeLogDeltaCommon<T> {
pub truncate_epoch: u64,
pub new_log: Option<EpochNewChangeLog>,
pub new_log: Option<EpochNewChangeLogCommon<T>>,
}

impl From<&ChangeLogDelta> for PbChangeLogDelta {
fn from(val: &ChangeLogDelta) -> Self {
pub type ChangeLogDelta = ChangeLogDeltaCommon<SstableInfo>;

impl<T> From<&ChangeLogDeltaCommon<T>> for PbChangeLogDelta
where
PbSstableInfo: for<'a> From<&'a T>,
{
fn from(val: &ChangeLogDeltaCommon<T>) -> Self {
Self {
truncate_epoch: val.truncate_epoch,
new_log: val.new_log.as_ref().map(|a| a.into()),
}
}
}

impl From<&PbChangeLogDelta> for ChangeLogDelta {
impl<T> From<&PbChangeLogDelta> for ChangeLogDeltaCommon<T>
where
T: for<'a> From<&'a PbSstableInfo>,
{
fn from(val: &PbChangeLogDelta) -> Self {
Self {
truncate_epoch: val.truncate_epoch,
Expand All @@ -196,16 +220,22 @@ impl From<&PbChangeLogDelta> for ChangeLogDelta {
}
}

impl From<ChangeLogDelta> for PbChangeLogDelta {
fn from(val: ChangeLogDelta) -> Self {
impl<T> From<ChangeLogDeltaCommon<T>> for PbChangeLogDelta
where
PbSstableInfo: From<T>,
{
fn from(val: ChangeLogDeltaCommon<T>) -> Self {
Self {
truncate_epoch: val.truncate_epoch,
new_log: val.new_log.map(|a| a.into()),
}
}
}

impl From<PbChangeLogDelta> for ChangeLogDelta {
impl<T> From<PbChangeLogDelta> for ChangeLogDeltaCommon<T>
where
T: From<PbSstableInfo>,
{
fn from(val: PbChangeLogDelta) -> Self {
Self {
truncate_epoch: val.truncate_epoch,
Expand All @@ -218,11 +248,12 @@ impl From<PbChangeLogDelta> for ChangeLogDelta {
mod tests {
use itertools::Itertools;

use crate::change_log::{EpochNewChangeLog, TableChangeLog};
use crate::change_log::{EpochNewChangeLog, TableChangeLogCommon};
use crate::sstable_info::SstableInfo;

#[test]
fn test_filter_epoch() {
let table_change_log = TableChangeLog(vec![
let table_change_log = TableChangeLogCommon::<SstableInfo>(vec![
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
Expand Down Expand Up @@ -262,7 +293,7 @@ mod tests {

#[test]
fn test_truncate() {
let mut table_change_log = TableChangeLog(vec![
let mut table_change_log = TableChangeLogCommon::<SstableInfo>(vec![
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
Expand All @@ -288,7 +319,7 @@ mod tests {
table_change_log.truncate(1);
assert_eq!(
table_change_log,
TableChangeLog(vec![
TableChangeLogCommon::<SstableInfo>(vec![
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
Expand All @@ -310,7 +341,7 @@ mod tests {
table_change_log.truncate(3);
assert_eq!(
table_change_log,
TableChangeLog(vec![
TableChangeLogCommon::<SstableInfo>(vec![
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
Expand Down
Loading

0 comments on commit 4afcd4d

Please sign in to comment.