diff --git a/src/engine.rs b/src/engine.rs index 16d18f8f..4ef65e69 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -236,7 +236,12 @@ where cache_evict_worker.start(cache_evict_runner, Some(Duration::from_secs(1))); let cfg = Arc::new(cfg); - let purge_manager = PurgeManager::new(cfg.clone(), memtables.clone(), pipe_log.clone()); + let purge_manager = PurgeManager::new( + cfg.clone(), + memtables.clone(), + pipe_log.clone(), + global_stats.clone(), + ); let engine = Engine { cfg, @@ -672,7 +677,7 @@ mod tests { // GC all log entries. Won't trigger purge because total size is not enough. let count = engine.compact_to(1, 100); assert_eq!(count, 100); - assert!(!engine.purge_manager.needs_purge_log_files()); + assert!(!engine.purge_manager.needs_purge_log_files(LogQueue::Append)); // Append more logs to make total size greater than `purge_threshold`. for i in 100..250 { @@ -684,7 +689,7 @@ mod tests { let count = engine.compact_to(1, 101); assert_eq!(count, 1); // Needs to purge because the total size is greater than `purge_threshold`. - assert!(engine.purge_manager.needs_purge_log_files()); + assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append)); let old_min_file_num = engine.pipe_log.first_file_num(LogQueue::Append); let will_force_compact = engine.purge_expired_files().unwrap(); @@ -699,7 +704,7 @@ mod tests { let count = engine.compact_to(1, 102); assert_eq!(count, 1); // Needs to purge because the total size is greater than `purge_threshold`. - assert!(engine.purge_manager.needs_purge_log_files()); + assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append)); let old_min_file_num = engine.pipe_log.first_file_num(LogQueue::Append); let will_force_compact = engine.purge_expired_files().unwrap(); let new_min_file_num = engine.pipe_log.first_file_num(LogQueue::Append); @@ -779,7 +784,7 @@ mod tests { } // The engine needs purge, and all old entries should be rewritten. - assert!(engine.purge_manager.needs_purge_log_files()); + assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append)); assert!(engine.purge_expired_files().unwrap().is_empty()); assert!(engine.pipe_log.first_file_num(LogQueue::Append) > 1); @@ -815,7 +820,7 @@ mod tests { } } - assert!(engine.purge_manager.needs_purge_log_files()); + assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append)); assert!(engine.purge_expired_files().unwrap().is_empty()); let new_active_num = engine.pipe_log.active_file_num(LogQueue::Rewrite); @@ -872,7 +877,7 @@ mod tests { } // The engine needs purge, and all old entries should be rewritten. - assert!(engine.purge_manager.needs_purge_log_files()); + assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append)); assert!(engine.purge_expired_files().unwrap().is_empty()); assert!(engine.pipe_log.first_file_num(LogQueue::Append) > 1); diff --git a/src/lib.rs b/src/lib.rs index 9f4f0011..8ca5004c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,7 @@ mod util; use crate::pipe_log::PipeLog; -pub use self::config::Config; +pub use self::config::{Config, RecoveryMode}; pub use self::errors::{Error, Result}; pub use self::log_batch::{EntryExt, LogBatch}; pub type RaftLogEngine = self::engine::Engine; @@ -45,6 +45,11 @@ pub struct GlobalStats { cache_hit: AtomicUsize, cache_miss: AtomicUsize, cache_size: AtomicUsize, + + // How many operations in the rewrite queue. + rewrite_operations: AtomicUsize, + // How many compacted operations in the rewrite queue. + compacted_rewrite_operations: AtomicUsize, } impl GlobalStats { @@ -79,6 +84,20 @@ impl GlobalStats { } } + pub fn add_rewrite(&self, count: usize) { + self.rewrite_operations.fetch_add(count, Ordering::Release); + } + pub fn add_compacted_rewrite(&self, count: usize) { + self.compacted_rewrite_operations + .fetch_add(count, Ordering::Release); + } + pub fn rewrite_operations(&self) -> usize { + self.rewrite_operations.load(Ordering::Acquire) + } + pub fn compacted_rewrite_operations(&self) -> usize { + self.compacted_rewrite_operations.load(Ordering::Acquire) + } + #[cfg(test)] pub fn reset_cache(&self) { self.cache_hit.store(0, Ordering::Relaxed); diff --git a/src/memtable.rs b/src/memtable.rs index 6026000c..220552e1 100644 --- a/src/memtable.rs +++ b/src/memtable.rs @@ -94,6 +94,17 @@ impl> MemTable { distance } + fn adjust_rewrite_count(&mut self, new_rewrite_count: usize) { + let rewrite_delta = new_rewrite_count as i64 - self.rewrite_count as i64; + self.rewrite_count = new_rewrite_count; + if rewrite_delta >= 0 { + self.global_stats.add_rewrite(rewrite_delta as usize); + } else { + let rewrite_delta = -rewrite_delta as usize; + self.global_stats.add_compacted_rewrite(rewrite_delta); + } + } + // Remove all cached entries with index greater than or equal to the given. fn cut_entries_cache(&mut self, index: u64) { if self.entries_cache.is_empty() { @@ -124,9 +135,9 @@ impl> MemTable { } // Remove all entry indexes with index greater than or equal to the given. - fn cut_entries_index(&mut self, index: u64) { + fn cut_entries_index(&mut self, index: u64) -> usize { if self.entries_index.is_empty() { - return; + return 0; } let last_index = self.entries_index.back().unwrap().index; let first_index = self.entries_index.front().unwrap().index; @@ -134,10 +145,13 @@ impl> MemTable { let conflict = if index <= last_index { (index - first_index) as usize } else { - return; + return 0; }; self.entries_index.truncate(conflict); - self.rewrite_count = cmp::min(self.rewrite_count, self.entries_index.len()); + + let new_rewrite_count = cmp::min(self.rewrite_count, self.entries_index.len()); + self.adjust_rewrite_count(new_rewrite_count); + (last_index - index + 1) as usize } fn shrink_entries_cache(&mut self) { @@ -218,57 +232,123 @@ impl> MemTable { ei.queue = LogQueue::Rewrite; } self.append(entries, entries_index); - self.rewrite_count = self.entries_index.len(); + + let new_rewrite_count = self.entries_index.len(); + self.adjust_rewrite_count(new_rewrite_count); } - pub fn rewrite(&mut self, entries_index: Vec, latest_rewrite: u64) { - if entries_index.is_empty() || self.entries_index.is_empty() { - return; + pub fn rewrite(&mut self, entries_index: Vec, latest_rewrite: Option) { + if !entries_index.is_empty() { + self.global_stats.add_rewrite(entries_index.len()); + let compacted_rewrite = self.rewrite_impl(entries_index, latest_rewrite); + self.global_stats.add_compacted_rewrite(compacted_rewrite); } + } - let first = entries_index.first().unwrap().index; - let front = self.entries_index[self.rewrite_count].index; - assert!(front >= first); - let distance = (front - first) as usize; - if distance >= entries_index.len() { - return; + // Try to rewrite some entries which have already been written into the rewrite queue. + // However some are not necessary any more, so that return `compacted_rewrite_operations`. + fn rewrite_impl( + &mut self, + mut entries_index: Vec, + latest_rewrite: Option, + ) -> usize { + if self.entries_index.is_empty() { + // All entries are compacted. + return entries_index.len(); } - let last = entries_index.last().unwrap().index; + let mut compacted_rewrite_operations = 0; + + let front = self.entries_index.front().unwrap().index; let back = self.entries_index.back().unwrap().index; - let len = (cmp::min(last, back) - entries_index[distance].index + 1) as usize; + let mut first = entries_index.first().unwrap().index; + debug_assert!(first <= self.entries_index[self.rewrite_count].index); + let last = entries_index.last().unwrap().index; + + if last < front { + // All rewritten entries are compacted. + return entries_index.len(); + } + + if first < front { + // Some of head entries in `entries_index` are compacted. + let offset = (front - first) as usize; + entries_index.drain(..offset); + first = entries_index.first().unwrap().index; + compacted_rewrite_operations += offset; + } + + let distance = (first - front) as usize; + let len = (cmp::min(last, back) - first + 1) as usize; + for (i, ei) in entries_index.iter().take(len).enumerate() { + if let Some(latest_rewrite) = latest_rewrite { + debug_assert_eq!(self.entries_index[i + distance].queue, LogQueue::Append); + if self.entries_index[i + distance].file_num > latest_rewrite { + // Some entries are overwritten by new appends. + compacted_rewrite_operations += entries_index.len() - i; + break; + } + } else { + // It's a squeeze operation. + debug_assert_eq!(self.entries_index[i + distance].queue, LogQueue::Rewrite); + } - for ei in entries_index.iter().skip(distance).take(len) { - if self.entries_index[self.rewrite_count].file_num > latest_rewrite { - // Some entries are overwritten by new appends. - break; + if self.entries_index[i + distance].queue != LogQueue::Rewrite { + debug_assert_eq!(ei.queue, LogQueue::Rewrite); + self.entries_index[i + distance].queue = LogQueue::Rewrite; + self.rewrite_count += 1; } - self.entries_index[self.rewrite_count].queue = ei.queue; - self.entries_index[self.rewrite_count].file_num = ei.file_num; - self.entries_index[self.rewrite_count].base_offset = ei.base_offset; - self.rewrite_count += 1; + + self.entries_index[i + distance].file_num = ei.file_num; + self.entries_index[i + distance].base_offset = ei.base_offset; + self.entries_index[i + distance].compression_type = ei.compression_type; + self.entries_index[i + distance].batch_len = ei.batch_len; } + compacted_rewrite_operations } pub fn put(&mut self, key: Vec, value: Vec, file_num: u64) { - self.kvs.insert(key, (value, LogQueue::Append, file_num)); + if let Some(origin) = self.kvs.insert(key, (value, LogQueue::Append, file_num)) { + if origin.1 == LogQueue::Rewrite { + self.global_stats.add_compacted_rewrite(1); + } + } } - pub fn rewrite_key(&mut self, key: Vec, latest_rewrite: u64, file_num: u64) { + pub fn rewrite_key(&mut self, key: Vec, latest_rewrite: Option, file_num: u64) { + self.global_stats.add_rewrite(1); if let Some(value) = self.kvs.get_mut(&key) { - if value.2 <= latest_rewrite { - value.1 = LogQueue::Rewrite; + if value.1 == LogQueue::Append { + if let Some(latest_rewrite) = latest_rewrite { + if value.2 <= latest_rewrite { + value.1 = LogQueue::Rewrite; + value.2 = file_num; + } + } else { + // The rewritten key/value pair has been overwritten. + self.global_stats.add_compacted_rewrite(1); + } + } else { + assert!(value.2 <= file_num); value.2 = file_num; } + } else { + // The rewritten key/value pair has been compacted. + self.global_stats.add_compacted_rewrite(1); } } pub fn put_rewrite(&mut self, key: Vec, value: Vec, file_num: u64) { self.kvs.insert(key, (value, LogQueue::Rewrite, file_num)); + self.global_stats.add_rewrite(1); } pub fn delete(&mut self, key: &[u8]) { - self.kvs.remove(key); + if let Some(value) = self.kvs.remove(key) { + if value.1 == LogQueue::Rewrite { + self.global_stats.add_compacted_rewrite(1); + } + } } pub fn get(&self, key: &[u8]) -> Option> { @@ -288,7 +368,9 @@ impl> MemTable { let drain_end = (idx - first_idx) as usize; self.entries_index.drain(..drain_end); self.shrink_entries_index(); - self.rewrite_count -= cmp::min(drain_end, self.rewrite_count); + + let rewrite_sub = cmp::min(drain_end, self.rewrite_count); + self.adjust_rewrite_count(self.rewrite_count - rewrite_sub); drain_end as u64 } @@ -430,6 +512,19 @@ impl> MemTable { Ok(()) } + pub fn fetch_entries_from_rewrite( + &self, + vec: &mut Vec, + vec_idx: &mut Vec, + ) -> Result<()> { + if self.rewrite_count > 0 { + let end = self.entries_index[self.rewrite_count].index; + let first = self.entries_index.front().unwrap().index; + return self.fetch_entries_to(first, end, None, vec, vec_idx); + } + Ok(()) + } + pub fn fetch_rewrite_kvs(&self, latest_rewrite: u64, vec: &mut Vec<(Vec, Vec)>) { for (key, (value, queue, file_num)) in &self.kvs { if *queue == LogQueue::Append && *file_num <= latest_rewrite { @@ -438,6 +533,14 @@ impl> MemTable { } } + pub fn fetch_kvs_from_rewrite(&self, vec: &mut Vec<(Vec, Vec)>) { + for (key, (value, queue, _)) in &self.kvs { + if *queue == LogQueue::Rewrite { + vec.push((key.clone(), value.clone())); + } + } + } + pub fn min_file_num(&self, queue: LogQueue) -> Option { let entry = match queue { LogQueue::Append => self.entries_index.get(self.rewrite_count), @@ -893,6 +996,50 @@ mod tests { assert_eq!(stats.cache_hit(), 1); } + #[test] + fn test_memtable_fetch_rewrite() { + let region_id = 8; + let cache_limit = 0; + let stats = Arc::new(GlobalStats::default()); + let mut memtable = MemTable::::new(region_id, cache_limit, stats.clone()); + + // After appending: + // [0, 10) file_num = 1 + // [10, 20) file_num = 2 + // [20, 30) file_num = 3 + memtable.append(generate_ents(0, 10), generate_ents_index(0, 10, 1)); + memtable.put(b"k1".to_vec(), b"v1".to_vec(), 1); + memtable.append(generate_ents(10, 20), generate_ents_index(10, 20, 2)); + memtable.put(b"k2".to_vec(), b"v2".to_vec(), 2); + memtable.append(generate_ents(20, 25), generate_ents_index(20, 25, 3)); + memtable.put(b"k3".to_vec(), b"v3".to_vec(), 3); + + // After rewriting: + // [0, 10) queue = rewrite, file_num = 50, + // [10, 20) file_num = 2 + // [20, 30) file_num = 3 + memtable.rewrite(generate_rewrite_ents_index(0, 10, 50), Some(1)); + memtable.rewrite_key(b"k1".to_vec(), Some(1), 50); + + let (mut ents, mut ents_idx) = (vec![], vec![]); + + assert!(memtable + .fetch_rewrite_entries(2, &mut ents, &mut ents_idx) + .is_ok()); + assert_eq!(ents_idx.len(), 10); + assert_eq!(ents_idx.first().unwrap().index, 10); + assert_eq!(ents_idx.last().unwrap().index, 19); + + ents.clear(); + ents_idx.clear(); + assert!(memtable + .fetch_entries_from_rewrite(&mut ents, &mut ents_idx) + .is_ok()); + assert_eq!(ents_idx.len(), 10); + assert_eq!(ents_idx.first().unwrap().index, 0); + assert_eq!(ents_idx.last().unwrap().index, 9); + } + #[test] fn test_memtable_kv_operations() { let region_id = 8; @@ -954,7 +1101,7 @@ mod tests { memtable.append(generate_ents(20, 30), generate_ents_index(20, 30, 3)); memtable.put(b"kk2".to_vec(), b"vv2".to_vec(), 3); memtable.append(generate_ents(30, 40), generate_ents_index(30, 40, 4)); - memtable.put(b"kk2".to_vec(), b"vv3".to_vec(), 4); + memtable.put(b"kk3".to_vec(), b"vv3".to_vec(), 4); assert_eq!(memtable.cache_size, 15); assert_eq!(memtable.entries_size(), 30); @@ -963,75 +1110,97 @@ mod tests { memtable.check_entries_index_and_cache(); // Rewrite compacted entries. - memtable.rewrite(generate_rewrite_ents_index(1, 10, 50), 1); - memtable.rewrite_key(b"kk0".to_vec(), 1, 50); + memtable.rewrite(generate_rewrite_ents_index(0, 10, 50), Some(1)); + memtable.rewrite_key(b"kk0".to_vec(), Some(1), 50); assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 2); assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 4); assert!(memtable.min_file_num(LogQueue::Rewrite).is_none()); assert!(memtable.max_file_num(LogQueue::Rewrite).is_none()); assert_eq!(memtable.rewrite_count, 0); assert_eq!(memtable.get(b"kk0"), None); + assert_eq!(memtable.global_stats.rewrite_operations(), 11); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 11); // Rewrite compacted entries + valid entries. - memtable.rewrite(generate_rewrite_ents_index(1, 20, 100), 2); - memtable.rewrite_key(b"kk1".to_vec(), 2, 100); + memtable.rewrite(generate_rewrite_ents_index(0, 20, 100), Some(2)); + memtable.rewrite_key(b"kk1".to_vec(), Some(2), 100); assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 3); assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 4); assert_eq!(memtable.min_file_num(LogQueue::Rewrite).unwrap(), 100); assert_eq!(memtable.max_file_num(LogQueue::Rewrite).unwrap(), 100); assert_eq!(memtable.rewrite_count, 10); assert_eq!(memtable.get(b"kk1"), Some(b"vv1".to_vec())); + assert_eq!(memtable.global_stats.rewrite_operations(), 32); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 21); // Rewrite vaild entries, some of them are in cache. - memtable.rewrite(generate_rewrite_ents_index(20, 30, 101), 3); - memtable.rewrite_key(b"kk2".to_vec(), 3, 101); + memtable.rewrite(generate_rewrite_ents_index(20, 30, 101), Some(3)); + memtable.rewrite_key(b"kk2".to_vec(), Some(3), 101); assert_eq!(memtable.cache_size, 15); assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 4); assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 4); assert_eq!(memtable.min_file_num(LogQueue::Rewrite).unwrap(), 100); assert_eq!(memtable.max_file_num(LogQueue::Rewrite).unwrap(), 101); assert_eq!(memtable.rewrite_count, 20); - assert_eq!(memtable.get(b"kk2"), Some(b"vv3".to_vec())); + assert_eq!(memtable.get(b"kk2"), Some(b"vv2".to_vec())); + assert_eq!(memtable.global_stats.rewrite_operations(), 43); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 21); // Rewrite valid + overwritten entries. memtable.append(generate_ents(35, 36), generate_ents_index(35, 36, 5)); memtable.put(b"kk2".to_vec(), b"vv4".to_vec(), 5); assert_eq!(memtable.cache_size, 11); assert_eq!(memtable.entries_index.back().unwrap().index, 35); - memtable.rewrite(generate_rewrite_ents_index(30, 40, 102), 4); - memtable.rewrite_key(b"kk2".to_vec(), 4, 102); + assert_eq!(memtable.global_stats.rewrite_operations(), 43); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 22); + memtable.rewrite(generate_rewrite_ents_index(30, 40, 102), Some(4)); + memtable.rewrite_key(b"kk3".to_vec(), Some(4), 102); assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 5); assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 5); assert_eq!(memtable.min_file_num(LogQueue::Rewrite).unwrap(), 100); assert_eq!(memtable.max_file_num(LogQueue::Rewrite).unwrap(), 102); assert_eq!(memtable.rewrite_count, 25); assert_eq!(memtable.get(b"kk2"), Some(b"vv4".to_vec())); + assert_eq!(memtable.global_stats.rewrite_operations(), 54); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 27); // Rewrite after compact. memtable.append(generate_ents(36, 50), generate_ents_index(36, 50, 6)); memtable.compact_to(30); assert_eq!(memtable.rewrite_count, 5); + assert_eq!(memtable.global_stats.rewrite_operations(), 54); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 47); memtable.compact_to(40); assert_eq!(memtable.rewrite_count, 0); + assert_eq!(memtable.global_stats.rewrite_operations(), 54); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 52); assert_eq!(memtable.cache_size, 10); assert_eq!(memtable.entries_index.back().unwrap().index, 49); - memtable.rewrite(generate_rewrite_ents_index(30, 36, 103), 5); - memtable.rewrite_key(b"kk2".to_vec(), 5, 103); + memtable.rewrite(generate_rewrite_ents_index(30, 36, 103), Some(5)); + memtable.rewrite_key(b"kk2".to_vec(), Some(5), 103); assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 6); assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 6); assert_eq!(memtable.min_file_num(LogQueue::Rewrite).unwrap(), 100); assert_eq!(memtable.max_file_num(LogQueue::Rewrite).unwrap(), 103); assert_eq!(memtable.rewrite_count, 0); + assert_eq!(memtable.global_stats.rewrite_operations(), 61); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 58); // Rewrite after cut. memtable.append(generate_ents(50, 55), generate_ents_index(50, 55, 7)); - memtable.rewrite(generate_rewrite_ents_index(30, 50, 104), 6); + memtable.rewrite(generate_rewrite_ents_index(30, 50, 104), Some(6)); assert_eq!(memtable.rewrite_count, 10); + assert_eq!(memtable.global_stats.rewrite_operations(), 81); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 68); memtable.append(generate_ents(45, 50), generate_ents_index(45, 50, 7)); assert_eq!(memtable.rewrite_count, 5); + assert_eq!(memtable.global_stats.rewrite_operations(), 81); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 73); memtable.append(generate_ents(40, 50), generate_ents_index(40, 50, 7)); assert_eq!(memtable.rewrite_count, 0); + assert_eq!(memtable.global_stats.rewrite_operations(), 81); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 78); } fn generate_ents(begin_idx: u64, end_idx: u64) -> Vec { diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 311157c9..4666e6dc 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -73,6 +73,8 @@ pub trait GenericPipeLog: Sized + Clone + Send { /// Truncate the active log file of `queue`. fn truncate_active_log(&self, queue: LogQueue, offset: Option) -> Result<()>; + fn new_log_file(&self, queue: LogQueue) -> Result<()>; + /// Sync the given queue. fn sync(&self, queue: LogQueue) -> Result<()>; @@ -494,6 +496,10 @@ impl GenericPipeLog for PipeLog { self.mut_queue(queue).truncate_active_log(offset) } + fn new_log_file(&self, queue: LogQueue) -> Result<()> { + self.mut_queue(queue).new_log_file() + } + fn sync(&self, queue: LogQueue) -> Result<()> { if let Some(fd) = self.get_queue(queue).get_active_fd() { fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?; diff --git a/src/purge.rs b/src/purge.rs index fe292f78..60bc5103 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -10,7 +10,7 @@ use crate::engine::{fetch_entries, MemTableAccessor}; use crate::log_batch::{Command, EntryExt, LogBatch, LogItemContent, OpType}; use crate::pipe_log::{GenericPipeLog, LogQueue}; use crate::util::HandyRwLock; -use crate::Result; +use crate::{GlobalStats, Result}; // If a region has some very old raft logs less than this threshold, // rewrite them to clean stale log files ASAP. @@ -28,10 +28,14 @@ where cfg: Arc, memtables: MemTableAccessor, pipe_log: P, + global_stats: Arc, // Vector of (file_num, raft_group_id). #[allow(clippy::type_complexity)] removed_memtables: Arc, u64)>>>, + + // Only one thread can run `purge_expired_files` at a time. + purge_mutex: Arc>, } impl PurgeManager @@ -44,17 +48,25 @@ where cfg: Arc, memtables: MemTableAccessor, pipe_log: P, + global_stats: Arc, ) -> PurgeManager { PurgeManager { cfg, memtables, pipe_log, + global_stats, removed_memtables: Default::default(), + purge_mutex: Arc::new(Mutex::new(())), } } pub fn purge_expired_files(&self) -> Result> { - if !self.needs_purge_log_files() { + let _purge_mutex = match self.purge_mutex.try_lock() { + Ok(context) => context, + _ => return Ok(vec![]), + }; + + if !self.needs_purge_log_files(LogQueue::Append) { return Ok(vec![]); } @@ -66,19 +78,41 @@ where &mut will_force_compact, ); - let min_file_num = self.memtables.fold(u64::MAX, |min, t| { - cmp::min(min, t.min_file_num(LogQueue::Append).unwrap_or(u64::MAX)) - }); - let purged = self.pipe_log.purge_to(LogQueue::Append, min_file_num)?; - info!("purged {} expired log files", purged); + if self.rewrite_queue_needs_squeeze() && self.needs_purge_log_files(LogQueue::Rewrite) { + self.squeeze_rewrite_queue(); + } + + let (min_file_1, min_file_2) = + self.memtables + .fold((u64::MAX, u64::MAX), |(mut min1, mut min2), t| { + min1 = cmp::min(min1, t.min_file_num(LogQueue::Append).unwrap_or(u64::MAX)); + min2 = cmp::min(min2, t.min_file_num(LogQueue::Rewrite).unwrap_or(u64::MAX)); + (min1, min2) + }); + let purged_1 = self.pipe_log.purge_to(LogQueue::Append, min_file_1)?; + info!("purged {} expired log files", purged_1); + + if min_file_2 < self.pipe_log.active_file_num(LogQueue::Rewrite) { + let purged_2 = self.pipe_log.purge_to(LogQueue::Rewrite, min_file_2)?; + info!("purged {} expired rewrite files", purged_2); + } Ok(will_force_compact) } - pub fn needs_purge_log_files(&self) -> bool { - let total_size = self.pipe_log.total_size(LogQueue::Append); + pub fn needs_purge_log_files(&self, queue: LogQueue) -> bool { + let active_file_num = self.pipe_log.active_file_num(queue); + let first_file_num = self.pipe_log.first_file_num(queue); + if active_file_num == first_file_num { + return false; + } + + let total_size = self.pipe_log.total_size(queue); let purge_threshold = self.cfg.purge_threshold.0 as usize; - total_size > purge_threshold + match queue { + LogQueue::Append => total_size > purge_threshold, + LogQueue::Rewrite => total_size * 10 > purge_threshold, + } } pub fn remove_memtable(&self, file_num: u64, raft_group_id: u64) { @@ -107,6 +141,13 @@ where (latest_needs_rewrite, latest_needs_compact) } + fn rewrite_queue_needs_squeeze(&self) -> bool { + // Squeeze the rewrite queue if its garbage ratio reaches 50%. + let rewrite_operations = self.global_stats.rewrite_operations(); + let compacted_rewrite_operations = self.global_stats.compacted_rewrite_operations(); + compacted_rewrite_operations as f64 / rewrite_operations as f64 > 0.5 + } + // FIXME: We need to ensure that all operations before `latest_rewrite` (included) are written // into memtables. fn regions_rewrite_or_force_compact( @@ -158,10 +199,15 @@ where log_batch.put(region_id, k, v); } } - self.rewrite_impl(&mut log_batch, latest_rewrite).unwrap(); + self.rewrite_impl(&mut log_batch, Some(latest_rewrite)) + .unwrap(); } - fn rewrite_impl(&self, log_batch: &mut LogBatch, latest_rewrite: u64) -> Result<()> { + fn rewrite_impl( + &self, + log_batch: &mut LogBatch, + latest_rewrite: Option, + ) -> Result<()> { let mut file_num = 0; self.pipe_log.rewrite(&log_batch, true, &mut file_num)?; if file_num > 0 { @@ -174,7 +220,7 @@ where &self, log_batch: &mut LogBatch, file_num: u64, - latest_rewrite: u64, + latest_rewrite: Option, ) { for item in log_batch.items.drain(..) { let memtable = self.memtables.get_or_insert(item.raft_group_id); @@ -194,6 +240,37 @@ where } } } + + fn squeeze_rewrite_queue(&self) { + self.pipe_log.new_log_file(LogQueue::Rewrite).unwrap(); + + let memtables = self + .memtables + .collect(|t| t.min_file_num(LogQueue::Rewrite).unwrap_or_default() > 0); + + let mut log_batch = LogBatch::::new(); + for memtable in memtables { + let region_id = memtable.rl().region_id(); + + let mut entries = Vec::new(); + fetch_entries( + &self.pipe_log, + &memtable, + &mut entries, + 0, + |t, ents, ents_idx| t.fetch_entries_from_rewrite(ents, ents_idx), + ) + .unwrap(); + log_batch.add_entries(region_id, entries); + + let mut kvs = Vec::new(); + memtable.rl().fetch_kvs_from_rewrite(&mut kvs); + for (k, v) in kvs { + log_batch.put(region_id, k, v); + } + } + self.rewrite_impl(&mut log_batch, None).unwrap(); + } } #[cfg(test)]