Skip to content

Commit

Permalink
Merge branch 'master' into async-io
Browse files Browse the repository at this point in the history
  • Loading branch information
Little-Wallace authored Sep 30, 2020
2 parents ab2b110 + db60d28 commit 78337dd
Show file tree
Hide file tree
Showing 5 changed files with 498 additions and 70 deletions.
109 changes: 103 additions & 6 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ fn apply_item<E, W>(
}
}
LogItemContent::Command(Command::Clean) => {
memtables.remove(item.raft_group_id);
if self.memtables.remove(item.raft_group_id).is_some() {
self.purge_manager
.remove_memtable(file_num, item.raft_group_id);
}
}
LogItemContent::Command(Command::Compact { index }) => {
memtable.wl().compact_to(index);
Expand Down Expand Up @@ -621,6 +624,17 @@ mod tests {
use crate::util::ReadableSize;
use raft::eraftpb::Entry;

impl<E, W, P> Engine<E, W, P>
where
E: Message + Clone,
W: EntryExt<E> + 'static,
P: GenericPipeLog,
{
pub fn purge_manager(&self) -> &PurgeManager<E, W, P> {
&self.purge_manager
}
}

type RaftLogEngine = Engine<Entry, Entry, PipeLog>;
impl RaftLogEngine {
fn append(&self, raft_group_id: u64, entries: Vec<Entry>) -> Result<usize> {
Expand Down Expand Up @@ -734,7 +748,7 @@ mod tests {
// GC all log entries. Won't trigger purge because total size is not enough.
let count = engine.compact_to(1, 100);
assert_eq!(count, 100);
assert!(!engine.purge_manager.needs_purge_log_files());
assert!(!engine.purge_manager.needs_purge_log_files(LogQueue::Append));

// Append more logs to make total size greater than `purge_threshold`.
for i in 100..250 {
Expand All @@ -746,7 +760,7 @@ mod tests {
let count = engine.compact_to(1, 101);
assert_eq!(count, 1);
// Needs to purge because the total size is greater than `purge_threshold`.
assert!(engine.purge_manager.needs_purge_log_files());
assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append));

let old_min_file_num = engine.pipe_log.first_file_num(LogQueue::Append);
let will_force_compact = engine.purge_expired_files().unwrap();
Expand All @@ -761,7 +775,7 @@ mod tests {
let count = engine.compact_to(1, 102);
assert_eq!(count, 1);
// Needs to purge because the total size is greater than `purge_threshold`.
assert!(engine.purge_manager.needs_purge_log_files());
assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append));
let old_min_file_num = engine.pipe_log.first_file_num(LogQueue::Append);
let will_force_compact = engine.purge_expired_files().unwrap();
let new_min_file_num = engine.pipe_log.first_file_num(LogQueue::Append);
Expand Down Expand Up @@ -841,7 +855,7 @@ mod tests {
}

// The engine needs purge, and all old entries should be rewritten.
assert!(engine.purge_manager.needs_purge_log_files());
assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append));
assert!(engine.purge_expired_files().unwrap().is_empty());
assert!(engine.pipe_log.first_file_num(LogQueue::Append) > 1);

Expand Down Expand Up @@ -877,7 +891,7 @@ mod tests {
}
}

assert!(engine.purge_manager.needs_purge_log_files());
assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append));
assert!(engine.purge_expired_files().unwrap().is_empty());

let new_active_num = engine.pipe_log.active_file_num(LogQueue::Rewrite);
Expand All @@ -887,4 +901,87 @@ mod tests {
|| (new_active_num == active_num && new_active_len > active_len)
);
}

// Raft groups can be removed when they only have entries in the rewrite queue.
// We need to ensure that these raft groups won't appear again after recover.
#[test]
fn test_clean_raft_with_rewrite() {
let dir = tempfile::Builder::new()
.prefix("test_clean_raft_with_rewrite")
.tempdir()
.unwrap();

let mut cfg = Config::default();
cfg.dir = dir.path().to_str().unwrap().to_owned();
cfg.target_file_size = ReadableSize::kb(128);
cfg.purge_threshold = ReadableSize::kb(512);
let engine = RaftLogEngine::new(cfg.clone());

let mut entry = Entry::new();
entry.set_data(vec![b'x'; 1024]);

// Layout of region 1 in file 1:
// entries[1..10], Clean, entries[2..11]
for j in 1..=10 {
entry.set_index(j);
append_log(&engine, 1, &entry);
}
let mut log_batch = LogBatch::with_capacity(1);
log_batch.clean_region(1);
engine.write(&mut log_batch, false).unwrap();
assert!(engine.memtables.get(1).is_none());

entry.set_data(vec![b'y'; 1024]);
for j in 2..=11 {
entry.set_index(j);
append_log(&engine, 1, &entry);
}

assert_eq!(engine.pipe_log.active_file_num(LogQueue::Append), 1);

// Put more raft logs to trigger purge.
for i in 2..64 {
for j in 1..=10 {
entry.set_index(j);
append_log(&engine, i, &entry);
}
}

// The engine needs purge, and all old entries should be rewritten.
assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append));
assert!(engine.purge_expired_files().unwrap().is_empty());
assert!(engine.pipe_log.first_file_num(LogQueue::Append) > 1);

// All entries of region 1 has been rewritten.
let memtable_1 = engine.memtables.get(1).unwrap();
assert!(memtable_1.rl().max_file_num(LogQueue::Append).is_none());
assert!(memtable_1.rl().kvs_max_file_num(LogQueue::Append).is_none());
// Entries of region 1 after the clean command should be still valid.
for j in 2..=11 {
let entry_j = engine.get_entry(1, j).unwrap().unwrap();
assert_eq!(entry_j.get_data(), entry.get_data());
}

// Clean the raft group again.
let mut log_batch = LogBatch::with_capacity(1);
log_batch.clean_region(1);
engine.write(&mut log_batch, false).unwrap();
assert!(engine.memtables.get(1).is_none());

// Put more raft logs and then recover.
let active_file_num = engine.pipe_log.active_file_num(LogQueue::Append);
for i in 64..=128 {
for j in 1..=10 {
entry.set_index(j);
append_log(&engine, i, &entry);
}
}
assert!(engine.purge_expired_files().unwrap().is_empty());
assert!(engine.pipe_log.first_file_num(LogQueue::Append) > active_file_num);

// After the engine recovers, the removed raft group shouldn't appear again.
drop(engine);
let engine = RaftLogEngine::new(cfg.clone());
assert!(engine.memtables.get(1).is_none());
}
}
21 changes: 20 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod wal;

use crate::pipe_log::PipeLog;

pub use self::config::Config;
pub use self::config::{Config, RecoveryMode};
pub use self::errors::{Error, Result};
pub use self::log_batch::{EntryExt, LogBatch};
pub type RaftLogEngine<X, Y> = self::engine::Engine<X, Y, PipeLog>;
Expand All @@ -46,6 +46,11 @@ pub struct GlobalStats {
cache_hit: AtomicUsize,
cache_miss: AtomicUsize,
cache_size: AtomicUsize,

// How many operations in the rewrite queue.
rewrite_operations: AtomicUsize,
// How many compacted operations in the rewrite queue.
compacted_rewrite_operations: AtomicUsize,
}

impl GlobalStats {
Expand Down Expand Up @@ -80,6 +85,20 @@ impl GlobalStats {
}
}

pub fn add_rewrite(&self, count: usize) {
self.rewrite_operations.fetch_add(count, Ordering::Release);
}
pub fn add_compacted_rewrite(&self, count: usize) {
self.compacted_rewrite_operations
.fetch_add(count, Ordering::Release);
}
pub fn rewrite_operations(&self) -> usize {
self.rewrite_operations.load(Ordering::Acquire)
}
pub fn compacted_rewrite_operations(&self) -> usize {
self.compacted_rewrite_operations.load(Ordering::Acquire)
}

#[cfg(test)]
pub fn reset_cache(&self) {
self.cache_hit.store(0, Ordering::Relaxed);
Expand Down
Loading

0 comments on commit 78337dd

Please sign in to comment.