Skip to content

Commit

Permalink
squeeze the rewrite queue if necessary (#55)
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu authored Sep 29, 2020
1 parent 4cf9d5c commit db60d28
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 64 deletions.
19 changes: 12 additions & 7 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,12 @@ where
cache_evict_worker.start(cache_evict_runner, Some(Duration::from_secs(1)));

let cfg = Arc::new(cfg);
let purge_manager = PurgeManager::new(cfg.clone(), memtables.clone(), pipe_log.clone());
let purge_manager = PurgeManager::new(
cfg.clone(),
memtables.clone(),
pipe_log.clone(),
global_stats.clone(),
);

let engine = Engine {
cfg,
Expand Down Expand Up @@ -672,7 +677,7 @@ mod tests {
// GC all log entries. Won't trigger purge because total size is not enough.
let count = engine.compact_to(1, 100);
assert_eq!(count, 100);
assert!(!engine.purge_manager.needs_purge_log_files());
assert!(!engine.purge_manager.needs_purge_log_files(LogQueue::Append));

// Append more logs to make total size greater than `purge_threshold`.
for i in 100..250 {
Expand All @@ -684,7 +689,7 @@ mod tests {
let count = engine.compact_to(1, 101);
assert_eq!(count, 1);
// Needs to purge because the total size is greater than `purge_threshold`.
assert!(engine.purge_manager.needs_purge_log_files());
assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append));

let old_min_file_num = engine.pipe_log.first_file_num(LogQueue::Append);
let will_force_compact = engine.purge_expired_files().unwrap();
Expand All @@ -699,7 +704,7 @@ mod tests {
let count = engine.compact_to(1, 102);
assert_eq!(count, 1);
// Needs to purge because the total size is greater than `purge_threshold`.
assert!(engine.purge_manager.needs_purge_log_files());
assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append));
let old_min_file_num = engine.pipe_log.first_file_num(LogQueue::Append);
let will_force_compact = engine.purge_expired_files().unwrap();
let new_min_file_num = engine.pipe_log.first_file_num(LogQueue::Append);
Expand Down Expand Up @@ -779,7 +784,7 @@ mod tests {
}

// The engine needs purge, and all old entries should be rewritten.
assert!(engine.purge_manager.needs_purge_log_files());
assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append));
assert!(engine.purge_expired_files().unwrap().is_empty());
assert!(engine.pipe_log.first_file_num(LogQueue::Append) > 1);

Expand Down Expand Up @@ -815,7 +820,7 @@ mod tests {
}
}

assert!(engine.purge_manager.needs_purge_log_files());
assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append));
assert!(engine.purge_expired_files().unwrap().is_empty());

let new_active_num = engine.pipe_log.active_file_num(LogQueue::Rewrite);
Expand Down Expand Up @@ -872,7 +877,7 @@ mod tests {
}

// The engine needs purge, and all old entries should be rewritten.
assert!(engine.purge_manager.needs_purge_log_files());
assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append));
assert!(engine.purge_expired_files().unwrap().is_empty());
assert!(engine.pipe_log.first_file_num(LogQueue::Append) > 1);

Expand Down
21 changes: 20 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mod util;

use crate::pipe_log::PipeLog;

pub use self::config::Config;
pub use self::config::{Config, RecoveryMode};
pub use self::errors::{Error, Result};
pub use self::log_batch::{EntryExt, LogBatch};
pub type RaftLogEngine<X, Y> = self::engine::Engine<X, Y, PipeLog>;
Expand All @@ -45,6 +45,11 @@ pub struct GlobalStats {
cache_hit: AtomicUsize,
cache_miss: AtomicUsize,
cache_size: AtomicUsize,

// How many operations in the rewrite queue.
rewrite_operations: AtomicUsize,
// How many compacted operations in the rewrite queue.
compacted_rewrite_operations: AtomicUsize,
}

impl GlobalStats {
Expand Down Expand Up @@ -79,6 +84,20 @@ impl GlobalStats {
}
}

pub fn add_rewrite(&self, count: usize) {
self.rewrite_operations.fetch_add(count, Ordering::Release);
}
pub fn add_compacted_rewrite(&self, count: usize) {
self.compacted_rewrite_operations
.fetch_add(count, Ordering::Release);
}
pub fn rewrite_operations(&self) -> usize {
self.rewrite_operations.load(Ordering::Acquire)
}
pub fn compacted_rewrite_operations(&self) -> usize {
self.compacted_rewrite_operations.load(Ordering::Acquire)
}

#[cfg(test)]
pub fn reset_cache(&self) {
self.cache_hit.store(0, Ordering::Relaxed);
Expand Down
Loading

0 comments on commit db60d28

Please sign in to comment.