Skip to content

Commit

Permalink
feat(storage): implement iter_log for hummock (#15666)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Apr 18, 2024
1 parent 3daa160 commit 27e1b0e
Show file tree
Hide file tree
Showing 12 changed files with 950 additions and 19 deletions.
13 changes: 13 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ message TableWatermarks {
bool is_ascending = 2;
}

message EpochNewChangeLog {
repeated SstableInfo old_value = 1;
repeated SstableInfo new_value = 2;
// Epochs should be sorted in ascending order, which means earlier epoch at the front
repeated uint64 epochs = 3;
}

message TableChangeLog {
// Epochs should be sorted in ascending order, which means earlier epoch at the front.
repeated EpochNewChangeLog change_logs = 1;
}

message HummockVersion {
message Levels {
repeated Level levels = 1;
Expand All @@ -153,6 +165,7 @@ message HummockVersion {
// Reads against such an epoch will fail.
uint64 safe_epoch = 4;
map<uint32, TableWatermarks> table_watermarks = 5;
map<uint32, TableChangeLog> table_change_logs = 6;
}

message HummockVersionDelta {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ pub(super) fn create_init_version(default_compaction_config: CompactionConfig) -
max_committed_epoch: INVALID_EPOCH,
safe_epoch: INVALID_EPOCH,
table_watermarks: HashMap::new(),
table_change_log: HashMap::new(),
};
for group_id in [
StaticCompactionGroupId::StateDefault as CompactionGroupId,
Expand Down Expand Up @@ -569,6 +570,7 @@ mod tests {
max_committed_epoch: 0,
safe_epoch: 0,
table_watermarks: HashMap::new(),
table_change_log: HashMap::new(),
};
for cg in 1..3 {
version.levels.insert(
Expand Down
113 changes: 113 additions & 0 deletions src/storage/hummock_sdk/src/change_log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::hummock::{PbEpochNewChangeLog, PbTableChangeLog, SstableInfo};

#[derive(Debug, Clone, PartialEq)]
pub struct EpochNewChangeLog {
pub new_value: Vec<SstableInfo>,
pub old_value: Vec<SstableInfo>,
pub epochs: Vec<u64>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct TableChangeLog(pub Vec<EpochNewChangeLog>);

impl TableChangeLog {
pub fn filter_epoch(&self, (min_epoch, max_epoch): (u64, u64)) -> &[EpochNewChangeLog] {
let start = self.0.partition_point(|epoch_change_log| {
epoch_change_log.epochs.last().expect("non-empty") < &min_epoch
});
let end = self.0.partition_point(|epoch_change_log| {
epoch_change_log.epochs.first().expect("non-empty") <= &max_epoch
});
&self.0[start..end]
}
}

impl TableChangeLog {
pub fn to_protobuf(&self) -> PbTableChangeLog {
PbTableChangeLog {
change_logs: self
.0
.iter()
.map(|epoch_new_log| PbEpochNewChangeLog {
epochs: epoch_new_log.epochs.clone(),
new_value: epoch_new_log.new_value.clone(),
old_value: epoch_new_log.old_value.clone(),
})
.collect(),
}
}

pub fn from_protobuf(val: &PbTableChangeLog) -> Self {
Self(
val.change_logs
.iter()
.map(|epoch_new_log| EpochNewChangeLog {
epochs: epoch_new_log.epochs.clone(),
new_value: epoch_new_log.new_value.clone(),
old_value: epoch_new_log.old_value.clone(),
})
.collect(),
)
}
}

#[cfg(test)]
mod tests {
use itertools::Itertools;

use crate::change_log::{EpochNewChangeLog, TableChangeLog};

#[test]
fn test_filter_epoch() {
let table_change_log = TableChangeLog(vec![
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
epochs: vec![2],
},
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
epochs: vec![3, 4],
},
EpochNewChangeLog {
new_value: vec![],
old_value: vec![],
epochs: vec![5],
},
]);

let epochs = [1, 2, 3, 4, 5, 6];
for i in 0..epochs.len() {
for j in i..epochs.len() {
let min_epoch = epochs[i];
let max_epoch = epochs[j];
let expected = table_change_log
.0
.iter()
.filter(|log| {
&min_epoch <= log.epochs.last().unwrap()
&& log.epochs.first().unwrap() <= &max_epoch
})
.cloned()
.collect_vec();
let actual = table_change_log.filter_epoch((min_epoch, max_epoch));
assert_eq!(&expected, actual, "{:?}", (min_epoch, max_epoch));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,7 @@ mod tests {
max_committed_epoch: 0,
safe_epoch: 0,
table_watermarks: HashMap::new(),
table_change_log: HashMap::new(),
};
assert_eq!(version.get_object_ids().len(), 0);

Expand Down Expand Up @@ -1378,6 +1379,7 @@ mod tests {
max_committed_epoch: 0,
safe_epoch: 0,
table_watermarks: HashMap::new(),
table_change_log: HashMap::new(),
};
let version_delta = HummockVersionDelta {
id: 1,
Expand Down Expand Up @@ -1461,6 +1463,7 @@ mod tests {
max_committed_epoch: 0,
safe_epoch: 0,
table_watermarks: HashMap::new(),
table_change_log: HashMap::new(),
}
);
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::compaction_group::StaticCompactionGroupId;
use crate::key_range::KeyRangeCommon;
use crate::table_stats::{to_prost_table_stats_map, PbTableStatsMap, TableStatsMap};

pub mod change_log;
pub mod compact;
pub mod compaction_group;
pub mod key;
Expand Down
17 changes: 17 additions & 0 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_pb::hummock::hummock_version::PbLevels;
use risingwave_pb::hummock::hummock_version_delta::GroupDeltas as PbGroupDeltas;
use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};

use crate::change_log::TableChangeLog;
use crate::table_watermark::TableWatermarks;
use crate::{CompactionGroupId, HummockSstableObjectId};

Expand All @@ -33,6 +34,7 @@ pub struct HummockVersion {
pub max_committed_epoch: u64,
pub safe_epoch: u64,
pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
pub table_change_log: HashMap<TableId, TableChangeLog>,
}

impl Default for HummockVersion {
Expand Down Expand Up @@ -74,6 +76,16 @@ impl HummockVersion {
)
})
.collect(),
table_change_log: pb_version
.table_change_logs
.iter()
.map(|(table_id, change_log)| {
(
TableId::new(*table_id),
TableChangeLog::from_protobuf(change_log),
)
})
.collect(),
}
}

Expand All @@ -92,6 +104,11 @@ impl HummockVersion {
.iter()
.map(|(table_id, watermark)| (table_id.table_id, watermark.to_protobuf()))
.collect(),
table_change_logs: self
.table_change_log
.iter()
.map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
.collect(),
}
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/event_handler/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,7 @@ mod tests {
max_committed_epoch: epoch,
safe_epoch: 0,
table_watermarks: HashMap::new(),
table_change_log: HashMap::new(),
}
}

Expand Down
Loading

0 comments on commit 27e1b0e

Please sign in to comment.