From a32665f3b691e7c76ea9673144ab08fc58624dfc Mon Sep 17 00:00:00 2001 From: tabokie Date: Fri, 2 Dec 2022 15:14:30 +0800 Subject: [PATCH] add a test Signed-off-by: tabokie --- src/engine.rs | 23 +++++++++++++- src/log_batch.rs | 78 ++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 88 insertions(+), 13 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index 2968823e..20e1f317 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -592,7 +592,7 @@ mod tests { use crate::env::ObfuscatedFileSystem; use crate::file_pipe_log::FileNameExt; use crate::pipe_log::Version; - use crate::test_util::{generate_entries, PanicGuard}; + use crate::test_util::{catch_unwind_silent, generate_entries, PanicGuard}; use crate::util::ReadableSize; use kvproto::raft_serverpb::RaftLocalState; use raft::eraftpb::Entry; @@ -704,6 +704,27 @@ mod tests { .unwrap(); } + #[test] + fn test_batch_with_save_point() { + let dir = tempfile::Builder::new() + .prefix("test_batch_with_save_point") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + ..Default::default() + }; + let engine = + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); + let mut batch = LogBatch::default(); + batch.add_command(1, Command::Clean); + batch.set_save_point(); + engine.write(&mut batch, false).unwrap(); + assert!(catch_unwind_silent(|| batch.rollback_to_save_point()).is_err()); + assert!(catch_unwind_silent(|| batch.pop_save_point()).is_err()); + } + #[test] fn test_get_entry() { let normal_batch_size = 10; diff --git a/src/log_batch.rs b/src/log_batch.rs index 990b4198..ed435749 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -27,8 +27,8 @@ const CMD_COMPACT: u8 = 0x02; const DEFAULT_LOG_ITEM_BATCH_CAP: usize = 64; const MAX_LOG_BATCH_BUFFER_CAP: usize = 8 * 1024 * 1024; -// 2GiB, The maximum content length accepted by lz4 compression. -const MAX_LOG_ENTRIES_SIZE_PER_BATCH: usize = i32::MAX as usize; +// LZ4_MAX_INPUT_SIZE = 0x7E000000 +const MAX_LOG_ENTRIES_SIZE_PER_BATCH: usize = 0x7E000000; /// `MessageExt` trait allows for probing log index from a specific type of /// protobuf messages. @@ -421,15 +421,15 @@ impl LogItemBatch { #[inline] pub fn pop_save_point(&mut self) { - self.save_points.pop(); + self.save_points.pop().unwrap(); } #[inline] - pub fn rollback(&mut self) { - let (a, b, c) = self.save_points.last().unwrap(); - self.items.truncate(*a); - self.item_size = *b; - self.entries_size = *c; + pub fn rollback_to_save_point(&mut self) { + let (a, b, c) = self.save_points.pop().unwrap(); + self.items.truncate(a); + self.item_size = b; + self.entries_size = c; } pub(crate) fn finish_populate(&mut self, compression_type: CompressionType) { @@ -675,15 +675,16 @@ impl LogBatch { #[inline] pub fn pop_save_point(&mut self) { assert!(self.buf_state == BufState::Open); + self.item_batch.pop_save_point(); self.save_points.pop(); } /// No-op if there's no save point to rollback to. #[inline] - pub fn rollback(&mut self) { + pub fn rollback_to_save_point(&mut self) { assert!(self.buf_state == BufState::Open); - self.item_batch.rollback(); - self.buf.truncate(*self.save_points.last().unwrap()); + self.item_batch.rollback_to_save_point(); + self.buf.truncate(self.save_points.pop().unwrap()); } /// Adds some protobuf log entries into the log batch. @@ -1001,7 +1002,9 @@ fn verify_checksum_with_signature(buf: &[u8], signature: Option) -> Result< mod tests { use super::*; use crate::pipe_log::{LogQueue, Version}; - use crate::test_util::{catch_unwind_silent, generate_entries, generate_entry_indexes_opt}; + use crate::test_util::{ + catch_unwind_silent, generate_entries, generate_entry_indexes_opt, PanicGuard, + }; use protobuf::parse_from_bytes; use raft::eraftpb::Entry; use strum::IntoEnumIterator; @@ -1438,6 +1441,57 @@ mod tests { #[test] fn test_save_point() { + let ops = [ + |b: &mut LogBatch| { + b.add_entries::(1, &generate_entries(1, 11, None)) + .unwrap() + }, + |b: &mut LogBatch| { + b.add_entries::(7, &generate_entries(1, 11, Some(&vec![b'x'; 1024]))) + .unwrap() + }, + |b: &mut LogBatch| b.add_command(17, Command::Clean), + |b: &mut LogBatch| b.put(27, b"key27".to_vec(), b"value27".to_vec()), + |b: &mut LogBatch| b.delete(37, b"key37".to_vec()), + |b: &mut LogBatch| b.add_command(47, Command::Compact { index: 777 }), + |b: &mut LogBatch| { + b.add_entries::(57, &generate_entries(1, 51, None)) + .unwrap() + }, + ]; + for start in 0..ops.len() { + for stripe in 1..=5 { + for num in ops.len()..ops.len() * 5 { + for repeat in 1..=5 { + let _guard = PanicGuard::with_prompt(format!( + "case: [{}, {}, {}, {}]", + start, stripe, num, repeat + )); + let mut to_verify = Vec::new(); + let mut batch = LogBatch::default(); + let mut op_idx = start; + let mut total_op = 0; + for _ in 0..num { + for _ in 0..repeat { + ops[op_idx % ops.len()](&mut batch); + to_verify.push(batch.clone()); + batch.set_save_point(); + total_op += 1; + if total_op % 5 == 0 { + to_verify.pop().unwrap(); + batch.pop_save_point(); + } + } + op_idx += stripe; + } + while let Some(b) = to_verify.pop() { + batch.rollback_to_save_point(); + assert_eq!(batch, b); + } + } + } + } + } } #[cfg(feature = "nightly")]