Skip to content

Commit

Permalink
add a test
Browse files Browse the repository at this point in the history
Signed-off-by: tabokie <[email protected]>
  • Loading branch information
tabokie committed Dec 2, 2022
1 parent a27c895 commit a32665f
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 13 deletions.
23 changes: 22 additions & 1 deletion src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ mod tests {
use crate::env::ObfuscatedFileSystem;
use crate::file_pipe_log::FileNameExt;
use crate::pipe_log::Version;
use crate::test_util::{generate_entries, PanicGuard};
use crate::test_util::{catch_unwind_silent, generate_entries, PanicGuard};
use crate::util::ReadableSize;
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
Expand Down Expand Up @@ -704,6 +704,27 @@ mod tests {
.unwrap();
}

#[test]
fn test_batch_with_save_point() {
let dir = tempfile::Builder::new()
.prefix("test_batch_with_save_point")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
..Default::default()
};
let engine =
RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
.unwrap();
let mut batch = LogBatch::default();
batch.add_command(1, Command::Clean);
batch.set_save_point();
engine.write(&mut batch, false).unwrap();
assert!(catch_unwind_silent(|| batch.rollback_to_save_point()).is_err());
assert!(catch_unwind_silent(|| batch.pop_save_point()).is_err());
}

#[test]
fn test_get_entry() {
let normal_batch_size = 10;
Expand Down
78 changes: 66 additions & 12 deletions src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const CMD_COMPACT: u8 = 0x02;

const DEFAULT_LOG_ITEM_BATCH_CAP: usize = 64;
const MAX_LOG_BATCH_BUFFER_CAP: usize = 8 * 1024 * 1024;
// 2GiB, The maximum content length accepted by lz4 compression.
const MAX_LOG_ENTRIES_SIZE_PER_BATCH: usize = i32::MAX as usize;
// LZ4_MAX_INPUT_SIZE = 0x7E000000
const MAX_LOG_ENTRIES_SIZE_PER_BATCH: usize = 0x7E000000;

/// `MessageExt` trait allows for probing log index from a specific type of
/// protobuf messages.
Expand Down Expand Up @@ -421,15 +421,15 @@ impl LogItemBatch {

#[inline]
pub fn pop_save_point(&mut self) {
self.save_points.pop();
self.save_points.pop().unwrap();
}

#[inline]
pub fn rollback(&mut self) {
let (a, b, c) = self.save_points.last().unwrap();
self.items.truncate(*a);
self.item_size = *b;
self.entries_size = *c;
pub fn rollback_to_save_point(&mut self) {
let (a, b, c) = self.save_points.pop().unwrap();
self.items.truncate(a);
self.item_size = b;
self.entries_size = c;
}

pub(crate) fn finish_populate(&mut self, compression_type: CompressionType) {
Expand Down Expand Up @@ -675,15 +675,16 @@ impl LogBatch {
#[inline]
pub fn pop_save_point(&mut self) {
assert!(self.buf_state == BufState::Open);
self.item_batch.pop_save_point();
self.save_points.pop();
}

/// No-op if there's no save point to rollback to.
#[inline]
pub fn rollback(&mut self) {
pub fn rollback_to_save_point(&mut self) {
assert!(self.buf_state == BufState::Open);
self.item_batch.rollback();
self.buf.truncate(*self.save_points.last().unwrap());
self.item_batch.rollback_to_save_point();
self.buf.truncate(self.save_points.pop().unwrap());
}

/// Adds some protobuf log entries into the log batch.
Expand Down Expand Up @@ -1001,7 +1002,9 @@ fn verify_checksum_with_signature(buf: &[u8], signature: Option<u32>) -> Result<
mod tests {
use super::*;
use crate::pipe_log::{LogQueue, Version};
use crate::test_util::{catch_unwind_silent, generate_entries, generate_entry_indexes_opt};
use crate::test_util::{
catch_unwind_silent, generate_entries, generate_entry_indexes_opt, PanicGuard,
};
use protobuf::parse_from_bytes;
use raft::eraftpb::Entry;
use strum::IntoEnumIterator;
Expand Down Expand Up @@ -1438,6 +1441,57 @@ mod tests {

#[test]
fn test_save_point() {
let ops = [
|b: &mut LogBatch| {
b.add_entries::<Entry>(1, &generate_entries(1, 11, None))
.unwrap()
},
|b: &mut LogBatch| {
b.add_entries::<Entry>(7, &generate_entries(1, 11, Some(&vec![b'x'; 1024])))
.unwrap()
},
|b: &mut LogBatch| b.add_command(17, Command::Clean),
|b: &mut LogBatch| b.put(27, b"key27".to_vec(), b"value27".to_vec()),
|b: &mut LogBatch| b.delete(37, b"key37".to_vec()),
|b: &mut LogBatch| b.add_command(47, Command::Compact { index: 777 }),
|b: &mut LogBatch| {
b.add_entries::<Entry>(57, &generate_entries(1, 51, None))
.unwrap()
},
];
for start in 0..ops.len() {
for stripe in 1..=5 {
for num in ops.len()..ops.len() * 5 {
for repeat in 1..=5 {
let _guard = PanicGuard::with_prompt(format!(
"case: [{}, {}, {}, {}]",
start, stripe, num, repeat
));
let mut to_verify = Vec::new();
let mut batch = LogBatch::default();
let mut op_idx = start;
let mut total_op = 0;
for _ in 0..num {
for _ in 0..repeat {
ops[op_idx % ops.len()](&mut batch);
to_verify.push(batch.clone());
batch.set_save_point();
total_op += 1;
if total_op % 5 == 0 {
to_verify.pop().unwrap();
batch.pop_save_point();
}
}
op_idx += stripe;
}
while let Some(b) = to_verify.pop() {
batch.rollback_to_save_point();
assert_eq!(batch, b);
}
}
}
}
}
}

#[cfg(feature = "nightly")]
Expand Down

0 comments on commit a32665f

Please sign in to comment.