diff --git a/src/engine.rs b/src/engine.rs index 2b5f7bf4..36414dbf 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -155,7 +155,10 @@ fn apply_item( } } LogItemContent::Command(Command::Clean) => { - memtables.remove(item.raft_group_id); + if self.memtables.remove(item.raft_group_id).is_some() { + self.purge_manager + .remove_memtable(file_num, item.raft_group_id); + } } LogItemContent::Command(Command::Compact { index }) => { memtable.wl().compact_to(index); @@ -621,6 +624,17 @@ mod tests { use crate::util::ReadableSize; use raft::eraftpb::Entry; + impl Engine + where + E: Message + Clone, + W: EntryExt + 'static, + P: GenericPipeLog, + { + pub fn purge_manager(&self) -> &PurgeManager { + &self.purge_manager + } + } + type RaftLogEngine = Engine; impl RaftLogEngine { fn append(&self, raft_group_id: u64, entries: Vec) -> Result { @@ -734,7 +748,7 @@ mod tests { // GC all log entries. Won't trigger purge because total size is not enough. let count = engine.compact_to(1, 100); assert_eq!(count, 100); - assert!(!engine.purge_manager.needs_purge_log_files()); + assert!(!engine.purge_manager.needs_purge_log_files(LogQueue::Append)); // Append more logs to make total size greater than `purge_threshold`. for i in 100..250 { @@ -746,7 +760,7 @@ mod tests { let count = engine.compact_to(1, 101); assert_eq!(count, 1); // Needs to purge because the total size is greater than `purge_threshold`. - assert!(engine.purge_manager.needs_purge_log_files()); + assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append)); let old_min_file_num = engine.pipe_log.first_file_num(LogQueue::Append); let will_force_compact = engine.purge_expired_files().unwrap(); @@ -761,7 +775,7 @@ mod tests { let count = engine.compact_to(1, 102); assert_eq!(count, 1); // Needs to purge because the total size is greater than `purge_threshold`. - assert!(engine.purge_manager.needs_purge_log_files()); + assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append)); let old_min_file_num = engine.pipe_log.first_file_num(LogQueue::Append); let will_force_compact = engine.purge_expired_files().unwrap(); let new_min_file_num = engine.pipe_log.first_file_num(LogQueue::Append); @@ -841,7 +855,7 @@ mod tests { } // The engine needs purge, and all old entries should be rewritten. - assert!(engine.purge_manager.needs_purge_log_files()); + assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append)); assert!(engine.purge_expired_files().unwrap().is_empty()); assert!(engine.pipe_log.first_file_num(LogQueue::Append) > 1); @@ -877,7 +891,7 @@ mod tests { } } - assert!(engine.purge_manager.needs_purge_log_files()); + assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append)); assert!(engine.purge_expired_files().unwrap().is_empty()); let new_active_num = engine.pipe_log.active_file_num(LogQueue::Rewrite); @@ -887,4 +901,87 @@ mod tests { || (new_active_num == active_num && new_active_len > active_len) ); } + + // Raft groups can be removed when they only have entries in the rewrite queue. + // We need to ensure that these raft groups won't appear again after recover. 
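+ // The flow under test: applying `Command::Clean` drops the memtable and records
+ // (file_num, raft_group_id) in the purge manager; when purge later rewrites files up to that
+ // file_num, a clean marker is written into the rewrite queue so recovery skips the region.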
+ #[test] + fn test_clean_raft_with_rewrite() { + let dir = tempfile::Builder::new() + .prefix("test_clean_raft_with_rewrite") + .tempdir() + .unwrap(); + + let mut cfg = Config::default(); + cfg.dir = dir.path().to_str().unwrap().to_owned(); + cfg.target_file_size = ReadableSize::kb(128); + cfg.purge_threshold = ReadableSize::kb(512); + let engine = RaftLogEngine::new(cfg.clone()); + + let mut entry = Entry::new(); + entry.set_data(vec![b'x'; 1024]); + + // Layout of region 1 in file 1: + // entries[1..10], Clean, entries[2..11] + for j in 1..=10 { + entry.set_index(j); + append_log(&engine, 1, &entry); + } + let mut log_batch = LogBatch::with_capacity(1); + log_batch.clean_region(1); + engine.write(&mut log_batch, false).unwrap(); + assert!(engine.memtables.get(1).is_none()); + + entry.set_data(vec![b'y'; 1024]); + for j in 2..=11 { + entry.set_index(j); + append_log(&engine, 1, &entry); + } + + assert_eq!(engine.pipe_log.active_file_num(LogQueue::Append), 1); + + // Put more raft logs to trigger purge. + for i in 2..64 { + for j in 1..=10 { + entry.set_index(j); + append_log(&engine, i, &entry); + } + } + + // The engine needs purge, and all old entries should be rewritten. + assert!(engine.purge_manager.needs_purge_log_files(LogQueue::Append)); + assert!(engine.purge_expired_files().unwrap().is_empty()); + assert!(engine.pipe_log.first_file_num(LogQueue::Append) > 1); + + // All entries of region 1 has been rewritten. + let memtable_1 = engine.memtables.get(1).unwrap(); + assert!(memtable_1.rl().max_file_num(LogQueue::Append).is_none()); + assert!(memtable_1.rl().kvs_max_file_num(LogQueue::Append).is_none()); + // Entries of region 1 after the clean command should be still valid. + for j in 2..=11 { + let entry_j = engine.get_entry(1, j).unwrap().unwrap(); + assert_eq!(entry_j.get_data(), entry.get_data()); + } + + // Clean the raft group again. + let mut log_batch = LogBatch::with_capacity(1); + log_batch.clean_region(1); + engine.write(&mut log_batch, false).unwrap(); + assert!(engine.memtables.get(1).is_none()); + + // Put more raft logs and then recover. + let active_file_num = engine.pipe_log.active_file_num(LogQueue::Append); + for i in 64..=128 { + for j in 1..=10 { + entry.set_index(j); + append_log(&engine, i, &entry); + } + } + assert!(engine.purge_expired_files().unwrap().is_empty()); + assert!(engine.pipe_log.first_file_num(LogQueue::Append) > active_file_num); + + // After the engine recovers, the removed raft group shouldn't appear again. + drop(engine); + let engine = RaftLogEngine::new(cfg.clone()); + assert!(engine.memtables.get(1).is_none()); + } } diff --git a/src/lib.rs b/src/lib.rs index f93cd962..9da8305a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,7 +29,7 @@ mod wal; use crate::pipe_log::PipeLog; -pub use self::config::Config; +pub use self::config::{Config, RecoveryMode}; pub use self::errors::{Error, Result}; pub use self::log_batch::{EntryExt, LogBatch}; pub type RaftLogEngine = self::engine::Engine; @@ -46,6 +46,11 @@ pub struct GlobalStats { cache_hit: AtomicUsize, cache_miss: AtomicUsize, cache_size: AtomicUsize, + + // How many operations in the rewrite queue. + rewrite_operations: AtomicUsize, + // How many compacted operations in the rewrite queue. 
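+ // A compacted rewrite is a rewritten operation that is no longer needed (it has been
+ // overwritten, deleted or compacted away). Once compacted / total exceeds 50%, the rewrite
+ // queue itself becomes a candidate for squeezing.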
+ compacted_rewrite_operations: AtomicUsize, } impl GlobalStats { @@ -80,6 +85,20 @@ impl GlobalStats { } } + pub fn add_rewrite(&self, count: usize) { + self.rewrite_operations.fetch_add(count, Ordering::Release); + } + pub fn add_compacted_rewrite(&self, count: usize) { + self.compacted_rewrite_operations + .fetch_add(count, Ordering::Release); + } + pub fn rewrite_operations(&self) -> usize { + self.rewrite_operations.load(Ordering::Acquire) + } + pub fn compacted_rewrite_operations(&self) -> usize { + self.compacted_rewrite_operations.load(Ordering::Acquire) + } + #[cfg(test)] pub fn reset_cache(&self) { self.cache_hit.store(0, Ordering::Relaxed); diff --git a/src/memtable.rs b/src/memtable.rs index 3a9fabc0..d4f10f16 100644 --- a/src/memtable.rs +++ b/src/memtable.rs @@ -94,6 +94,17 @@ impl> MemTable { distance } + fn adjust_rewrite_count(&mut self, new_rewrite_count: usize) { + let rewrite_delta = new_rewrite_count as i64 - self.rewrite_count as i64; + self.rewrite_count = new_rewrite_count; + if rewrite_delta >= 0 { + self.global_stats.add_rewrite(rewrite_delta as usize); + } else { + let rewrite_delta = -rewrite_delta as usize; + self.global_stats.add_compacted_rewrite(rewrite_delta); + } + } + // Remove all cached entries with index greater than or equal to the given. fn cut_entries_cache(&mut self, index: u64) { if self.entries_cache.is_empty() { @@ -124,9 +135,9 @@ impl> MemTable { } // Remove all entry indexes with index greater than or equal to the given. - fn cut_entries_index(&mut self, index: u64) { + fn cut_entries_index(&mut self, index: u64) -> usize { if self.entries_index.is_empty() { - return; + return 0; } let last_index = self.entries_index.back().unwrap().index; let first_index = self.entries_index.front().unwrap().index; @@ -134,10 +145,13 @@ impl> MemTable { let conflict = if index <= last_index { (index - first_index) as usize } else { - return; + return 0; }; self.entries_index.truncate(conflict); - self.rewrite_count = cmp::min(self.rewrite_count, self.entries_index.len()); + + let new_rewrite_count = cmp::min(self.rewrite_count, self.entries_index.len()); + self.adjust_rewrite_count(new_rewrite_count); + (last_index - index + 1) as usize } fn shrink_entries_cache(&mut self) { @@ -218,57 +232,123 @@ impl> MemTable { ei.queue = LogQueue::Rewrite; } self.append(entries, entries_index); - self.rewrite_count = self.entries_index.len(); + + let new_rewrite_count = self.entries_index.len(); + self.adjust_rewrite_count(new_rewrite_count); } - pub fn rewrite(&mut self, entries_index: Vec, latest_rewrite: u64) { - if entries_index.is_empty() || self.entries_index.is_empty() { - return; + pub fn rewrite(&mut self, entries_index: Vec, latest_rewrite: Option) { + if !entries_index.is_empty() { + self.global_stats.add_rewrite(entries_index.len()); + let compacted_rewrite = self.rewrite_impl(entries_index, latest_rewrite); + self.global_stats.add_compacted_rewrite(compacted_rewrite); } + } - let first = entries_index.first().unwrap().index; - let front = self.entries_index[self.rewrite_count].index; - assert!(front >= first); - let distance = (front - first) as usize; - if distance >= entries_index.len() { - return; + // Try to rewrite some entries which have already been written into the rewrite queue. + // However some are not necessary any more, so that return `compacted_rewrite_operations`. + fn rewrite_impl( + &mut self, + mut entries_index: Vec, + latest_rewrite: Option, + ) -> usize { + if self.entries_index.is_empty() { + // All entries are compacted. 
+ return entries_index.len(); } - let last = entries_index.last().unwrap().index; + let mut compacted_rewrite_operations = 0; + + let front = self.entries_index.front().unwrap().index; let back = self.entries_index.back().unwrap().index; - let len = (cmp::min(last, back) - entries_index[distance].index + 1) as usize; + let mut first = entries_index.first().unwrap().index; + debug_assert!(first <= self.entries_index[self.rewrite_count].index); + let last = entries_index.last().unwrap().index; + + if last < front { + // All rewritten entries are compacted. + return entries_index.len(); + } + + if first < front { + // Some of head entries in `entries_index` are compacted. + let offset = (front - first) as usize; + entries_index.drain(..offset); + first = entries_index.first().unwrap().index; + compacted_rewrite_operations += offset; + } + + let distance = (first - front) as usize; + let len = (cmp::min(last, back) - first + 1) as usize; + for (i, ei) in entries_index.iter().take(len).enumerate() { + if let Some(latest_rewrite) = latest_rewrite { + debug_assert_eq!(self.entries_index[i + distance].queue, LogQueue::Append); + if self.entries_index[i + distance].file_num > latest_rewrite { + // Some entries are overwritten by new appends. + compacted_rewrite_operations += entries_index.len() - i; + break; + } + } else { + // It's a squeeze operation. + debug_assert_eq!(self.entries_index[i + distance].queue, LogQueue::Rewrite); + } - for ei in entries_index.iter().skip(distance).take(len) { - if self.entries_index[self.rewrite_count].file_num > latest_rewrite { - // Some entries are overwritten by new appends. - break; + if self.entries_index[i + distance].queue != LogQueue::Rewrite { + debug_assert_eq!(ei.queue, LogQueue::Rewrite); + self.entries_index[i + distance].queue = LogQueue::Rewrite; + self.rewrite_count += 1; } - self.entries_index[self.rewrite_count].queue = ei.queue; - self.entries_index[self.rewrite_count].file_num = ei.file_num; - self.entries_index[self.rewrite_count].base_offset = ei.base_offset; - self.rewrite_count += 1; + + self.entries_index[i + distance].file_num = ei.file_num; + self.entries_index[i + distance].base_offset = ei.base_offset; + self.entries_index[i + distance].compression_type = ei.compression_type; + self.entries_index[i + distance].batch_len = ei.batch_len; } + compacted_rewrite_operations } pub fn put(&mut self, key: Vec, value: Vec, file_num: u64) { - self.kvs.insert(key, (value, LogQueue::Append, file_num)); + if let Some(origin) = self.kvs.insert(key, (value, LogQueue::Append, file_num)) { + if origin.1 == LogQueue::Rewrite { + self.global_stats.add_compacted_rewrite(1); + } + } } - pub fn rewrite_key(&mut self, key: Vec, latest_rewrite: u64, file_num: u64) { + pub fn rewrite_key(&mut self, key: Vec, latest_rewrite: Option, file_num: u64) { + self.global_stats.add_rewrite(1); if let Some(value) = self.kvs.get_mut(&key) { - if value.2 <= latest_rewrite { - value.1 = LogQueue::Rewrite; + if value.1 == LogQueue::Append { + if let Some(latest_rewrite) = latest_rewrite { + if value.2 <= latest_rewrite { + value.1 = LogQueue::Rewrite; + value.2 = file_num; + } + } else { + // The rewritten key/value pair has been overwritten. + self.global_stats.add_compacted_rewrite(1); + } + } else { + assert!(value.2 <= file_num); value.2 = file_num; } + } else { + // The rewritten key/value pair has been compacted. 
+ self.global_stats.add_compacted_rewrite(1); } } pub fn put_rewrite(&mut self, key: Vec, value: Vec, file_num: u64) { self.kvs.insert(key, (value, LogQueue::Rewrite, file_num)); + self.global_stats.add_rewrite(1); } pub fn delete(&mut self, key: &[u8]) { - self.kvs.remove(key); + if let Some(value) = self.kvs.remove(key) { + if value.1 == LogQueue::Rewrite { + self.global_stats.add_compacted_rewrite(1); + } + } } pub fn get(&self, key: &[u8]) -> Option> { @@ -288,7 +368,9 @@ impl> MemTable { let drain_end = (idx - first_idx) as usize; self.entries_index.drain(..drain_end); self.shrink_entries_index(); - self.rewrite_count -= cmp::min(drain_end, self.rewrite_count); + + let rewrite_sub = cmp::min(drain_end, self.rewrite_count); + self.adjust_rewrite_count(self.rewrite_count - rewrite_sub); drain_end as u64 } @@ -431,6 +513,19 @@ impl> MemTable { Ok(()) } + pub fn fetch_entries_from_rewrite( + &self, + vec: &mut Vec, + vec_idx: &mut Vec, + ) -> Result<()> { + if self.rewrite_count > 0 { + let end = self.entries_index[self.rewrite_count].index; + let first = self.entries_index.front().unwrap().index; + return self.fetch_entries_to(first, end, None, vec, vec_idx); + } + Ok(()) + } + pub fn fetch_rewrite_kvs(&self, latest_rewrite: u64, vec: &mut Vec<(Vec, Vec)>) { for (key, (value, queue, file_num)) in &self.kvs { if *queue == LogQueue::Append && *file_num <= latest_rewrite { @@ -439,6 +534,14 @@ impl> MemTable { } } + pub fn fetch_kvs_from_rewrite(&self, vec: &mut Vec<(Vec, Vec)>) { + for (key, (value, queue, _)) in &self.kvs { + if *queue == LogQueue::Rewrite { + vec.push((key.clone(), value.clone())); + } + } + } + pub fn min_file_num(&self, queue: LogQueue) -> Option { let entry = match queue { LogQueue::Append => self.entries_index.get(self.rewrite_count), @@ -894,6 +997,50 @@ mod tests { assert_eq!(stats.cache_hit(), 1); } + #[test] + fn test_memtable_fetch_rewrite() { + let region_id = 8; + let cache_limit = 0; + let stats = Arc::new(GlobalStats::default()); + let mut memtable = MemTable::::new(region_id, cache_limit, stats.clone()); + + // After appending: + // [0, 10) file_num = 1 + // [10, 20) file_num = 2 + // [20, 30) file_num = 3 + memtable.append(generate_ents(0, 10), generate_ents_index(0, 10, 1)); + memtable.put(b"k1".to_vec(), b"v1".to_vec(), 1); + memtable.append(generate_ents(10, 20), generate_ents_index(10, 20, 2)); + memtable.put(b"k2".to_vec(), b"v2".to_vec(), 2); + memtable.append(generate_ents(20, 25), generate_ents_index(20, 25, 3)); + memtable.put(b"k3".to_vec(), b"v3".to_vec(), 3); + + // After rewriting: + // [0, 10) queue = rewrite, file_num = 50, + // [10, 20) file_num = 2 + // [20, 30) file_num = 3 + memtable.rewrite(generate_rewrite_ents_index(0, 10, 50), Some(1)); + memtable.rewrite_key(b"k1".to_vec(), Some(1), 50); + + let (mut ents, mut ents_idx) = (vec![], vec![]); + + assert!(memtable + .fetch_rewrite_entries(2, &mut ents, &mut ents_idx) + .is_ok()); + assert_eq!(ents_idx.len(), 10); + assert_eq!(ents_idx.first().unwrap().index, 10); + assert_eq!(ents_idx.last().unwrap().index, 19); + + ents.clear(); + ents_idx.clear(); + assert!(memtable + .fetch_entries_from_rewrite(&mut ents, &mut ents_idx) + .is_ok()); + assert_eq!(ents_idx.len(), 10); + assert_eq!(ents_idx.first().unwrap().index, 0); + assert_eq!(ents_idx.last().unwrap().index, 9); + } + #[test] fn test_memtable_kv_operations() { let region_id = 8; @@ -955,7 +1102,7 @@ mod tests { memtable.append(generate_ents(20, 30), generate_ents_index(20, 30, 3)); memtable.put(b"kk2".to_vec(), b"vv2".to_vec(), 3); 
memtable.append(generate_ents(30, 40), generate_ents_index(30, 40, 4)); - memtable.put(b"kk2".to_vec(), b"vv3".to_vec(), 4); + memtable.put(b"kk3".to_vec(), b"vv3".to_vec(), 4); assert_eq!(memtable.cache_size, 15); assert_eq!(memtable.entries_size(), 30); @@ -964,75 +1111,97 @@ mod tests { memtable.check_entries_index_and_cache(); // Rewrite compacted entries. - memtable.rewrite(generate_rewrite_ents_index(1, 10, 50), 1); - memtable.rewrite_key(b"kk0".to_vec(), 1, 50); + memtable.rewrite(generate_rewrite_ents_index(0, 10, 50), Some(1)); + memtable.rewrite_key(b"kk0".to_vec(), Some(1), 50); assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 2); assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 4); assert!(memtable.min_file_num(LogQueue::Rewrite).is_none()); assert!(memtable.max_file_num(LogQueue::Rewrite).is_none()); assert_eq!(memtable.rewrite_count, 0); assert_eq!(memtable.get(b"kk0"), None); + assert_eq!(memtable.global_stats.rewrite_operations(), 11); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 11); // Rewrite compacted entries + valid entries. - memtable.rewrite(generate_rewrite_ents_index(1, 20, 100), 2); - memtable.rewrite_key(b"kk1".to_vec(), 2, 100); + memtable.rewrite(generate_rewrite_ents_index(0, 20, 100), Some(2)); + memtable.rewrite_key(b"kk1".to_vec(), Some(2), 100); assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 3); assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 4); assert_eq!(memtable.min_file_num(LogQueue::Rewrite).unwrap(), 100); assert_eq!(memtable.max_file_num(LogQueue::Rewrite).unwrap(), 100); assert_eq!(memtable.rewrite_count, 10); assert_eq!(memtable.get(b"kk1"), Some(b"vv1".to_vec())); + assert_eq!(memtable.global_stats.rewrite_operations(), 32); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 21); // Rewrite vaild entries, some of them are in cache. - memtable.rewrite(generate_rewrite_ents_index(20, 30, 101), 3); - memtable.rewrite_key(b"kk2".to_vec(), 3, 101); + memtable.rewrite(generate_rewrite_ents_index(20, 30, 101), Some(3)); + memtable.rewrite_key(b"kk2".to_vec(), Some(3), 101); assert_eq!(memtable.cache_size, 15); assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 4); assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 4); assert_eq!(memtable.min_file_num(LogQueue::Rewrite).unwrap(), 100); assert_eq!(memtable.max_file_num(LogQueue::Rewrite).unwrap(), 101); assert_eq!(memtable.rewrite_count, 20); - assert_eq!(memtable.get(b"kk2"), Some(b"vv3".to_vec())); + assert_eq!(memtable.get(b"kk2"), Some(b"vv2".to_vec())); + assert_eq!(memtable.global_stats.rewrite_operations(), 43); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 21); // Rewrite valid + overwritten entries. 
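// The append at file 5 below overwrites index 35 and key kk2, so when entries from files <= 4
// are rewritten afterwards, the stale copies are counted as compacted rewrite operations.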
memtable.append(generate_ents(35, 36), generate_ents_index(35, 36, 5)); memtable.put(b"kk2".to_vec(), b"vv4".to_vec(), 5); assert_eq!(memtable.cache_size, 11); assert_eq!(memtable.entries_index.back().unwrap().index, 35); - memtable.rewrite(generate_rewrite_ents_index(30, 40, 102), 4); - memtable.rewrite_key(b"kk2".to_vec(), 4, 102); + assert_eq!(memtable.global_stats.rewrite_operations(), 43); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 22); + memtable.rewrite(generate_rewrite_ents_index(30, 40, 102), Some(4)); + memtable.rewrite_key(b"kk3".to_vec(), Some(4), 102); assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 5); assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 5); assert_eq!(memtable.min_file_num(LogQueue::Rewrite).unwrap(), 100); assert_eq!(memtable.max_file_num(LogQueue::Rewrite).unwrap(), 102); assert_eq!(memtable.rewrite_count, 25); assert_eq!(memtable.get(b"kk2"), Some(b"vv4".to_vec())); + assert_eq!(memtable.global_stats.rewrite_operations(), 54); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 27); // Rewrite after compact. memtable.append(generate_ents(36, 50), generate_ents_index(36, 50, 6)); memtable.compact_to(30); assert_eq!(memtable.rewrite_count, 5); + assert_eq!(memtable.global_stats.rewrite_operations(), 54); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 47); memtable.compact_to(40); assert_eq!(memtable.rewrite_count, 0); + assert_eq!(memtable.global_stats.rewrite_operations(), 54); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 52); assert_eq!(memtable.cache_size, 10); assert_eq!(memtable.entries_index.back().unwrap().index, 49); - memtable.rewrite(generate_rewrite_ents_index(30, 36, 103), 5); - memtable.rewrite_key(b"kk2".to_vec(), 5, 103); + memtable.rewrite(generate_rewrite_ents_index(30, 36, 103), Some(5)); + memtable.rewrite_key(b"kk2".to_vec(), Some(5), 103); assert_eq!(memtable.min_file_num(LogQueue::Append).unwrap(), 6); assert_eq!(memtable.max_file_num(LogQueue::Append).unwrap(), 6); assert_eq!(memtable.min_file_num(LogQueue::Rewrite).unwrap(), 100); assert_eq!(memtable.max_file_num(LogQueue::Rewrite).unwrap(), 103); assert_eq!(memtable.rewrite_count, 0); + assert_eq!(memtable.global_stats.rewrite_operations(), 61); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 58); // Rewrite after cut. memtable.append(generate_ents(50, 55), generate_ents_index(50, 55, 7)); - memtable.rewrite(generate_rewrite_ents_index(30, 50, 104), 6); + memtable.rewrite(generate_rewrite_ents_index(30, 50, 104), Some(6)); assert_eq!(memtable.rewrite_count, 10); + assert_eq!(memtable.global_stats.rewrite_operations(), 81); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 68); memtable.append(generate_ents(45, 50), generate_ents_index(45, 50, 7)); assert_eq!(memtable.rewrite_count, 5); + assert_eq!(memtable.global_stats.rewrite_operations(), 81); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 73); memtable.append(generate_ents(40, 50), generate_ents_index(40, 50, 7)); assert_eq!(memtable.rewrite_count, 0); + assert_eq!(memtable.global_stats.rewrite_operations(), 81); + assert_eq!(memtable.global_stats.compacted_rewrite_operations(), 78); } fn generate_ents(begin_idx: u64, end_idx: u64) -> Vec { diff --git a/src/pipe_log.rs b/src/pipe_log.rs index e4b498d3..4a2a8e03 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -62,6 +62,8 @@ pub trait GenericPipeLog: Sized + Clone + Send { /// Truncate the active log file of `queue`. 
fn truncate_active_log(&self, queue: LogQueue, offset: Option) -> Result<()>; + fn new_log_file(&self, queue: LogQueue) -> Result<()>; + /// Sync the given queue. fn sync(&self, queue: LogQueue) -> Result<()>; @@ -443,6 +445,10 @@ impl GenericPipeLog for PipeLog { self.mut_queue(queue).truncate_active_log(offset) } + fn new_log_file(&self, queue: LogQueue) -> Result<()> { + self.mut_queue(queue).new_log_file() + } + fn sync(&self, queue: LogQueue) -> Result<()> { if let Some(fd) = self.get_queue(queue).get_active_fd() { fd.sync()?; diff --git a/src/purge.rs b/src/purge.rs index 5ad0fa21..18798ee6 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -1,15 +1,16 @@ -use std::cmp; -use std::sync::Arc; +use std::cmp::{self, Reverse}; +use std::collections::BinaryHeap; +use std::sync::{Arc, Mutex}; use log::info; use protobuf::Message; use crate::config::Config; use crate::engine::{fetch_entries, MemTableAccessor}; -use crate::log_batch::{EntryExt, LogBatch, LogItemContent, OpType}; +use crate::log_batch::{Command, EntryExt, LogBatch, LogItemContent, OpType}; use crate::pipe_log::{GenericPipeLog, LogQueue}; use crate::util::HandyRwLock; -use crate::Result; +use crate::{GlobalStats, Result}; // If a region has some very old raft logs less than this threshold, // rewrite them to clean stale log files ASAP. @@ -27,6 +28,14 @@ where cfg: Arc, memtables: MemTableAccessor, pipe_log: P, + global_stats: Arc, + + // Vector of (file_num, raft_group_id). + #[allow(clippy::type_complexity)] + removed_memtables: Arc, u64)>>>, + + // Only one thread can run `purge_expired_files` at a time. + purge_mutex: Arc>, } impl PurgeManager @@ -39,16 +48,25 @@ where cfg: Arc, memtables: MemTableAccessor, pipe_log: P, + global_stats: Arc, ) -> PurgeManager { PurgeManager { cfg, memtables, pipe_log, + global_stats, + removed_memtables: Default::default(), + purge_mutex: Arc::new(Mutex::new(())), } } pub fn purge_expired_files(&self) -> Result> { - if !self.needs_purge_log_files() { + let _purge_mutex = match self.purge_mutex.try_lock() { + Ok(context) => context, + _ => return Ok(vec![]), + }; + + if !self.needs_purge_log_files(LogQueue::Append) { return Ok(vec![]); } @@ -60,19 +78,46 @@ where &mut will_force_compact, ); - let min_file_num = self.memtables.fold(u64::MAX, |min, t| { - cmp::min(min, t.min_file_num(LogQueue::Append).unwrap_or(u64::MAX)) - }); - let purged = self.pipe_log.purge_to(LogQueue::Append, min_file_num)?; - info!("purged {} expired log files", purged); + if self.rewrite_queue_needs_squeeze() && self.needs_purge_log_files(LogQueue::Rewrite) { + self.squeeze_rewrite_queue(); + } + + let (min_file_1, min_file_2) = + self.memtables + .fold((u64::MAX, u64::MAX), |(mut min1, mut min2), t| { + min1 = cmp::min(min1, t.min_file_num(LogQueue::Append).unwrap_or(u64::MAX)); + min2 = cmp::min(min2, t.min_file_num(LogQueue::Rewrite).unwrap_or(u64::MAX)); + (min1, min2) + }); + let purged_1 = self.pipe_log.purge_to(LogQueue::Append, min_file_1)?; + info!("purged {} expired log files", purged_1); + + if min_file_2 < self.pipe_log.active_file_num(LogQueue::Rewrite) { + let purged_2 = self.pipe_log.purge_to(LogQueue::Rewrite, min_file_2)?; + info!("purged {} expired rewrite files", purged_2); + } Ok(will_force_compact) } - pub fn needs_purge_log_files(&self) -> bool { - let total_size = self.pipe_log.total_size(LogQueue::Append); + pub fn needs_purge_log_files(&self, queue: LogQueue) -> bool { + let active_file_num = self.pipe_log.active_file_num(queue); + let first_file_num = self.pipe_log.first_file_num(queue); + if 
active_file_num == first_file_num { + return false; + } + + let total_size = self.pipe_log.total_size(queue); let purge_threshold = self.cfg.purge_threshold.0 as usize; - total_size > purge_threshold + match queue { + LogQueue::Append => total_size > purge_threshold, + LogQueue::Rewrite => total_size * 10 > purge_threshold, + } + } + + pub fn remove_memtable(&self, file_num: u64, raft_group_id: u64) { + let mut tables = self.removed_memtables.lock().unwrap(); + tables.push((Reverse(file_num), raft_group_id)); } // Returns (`latest_needs_rewrite`, `latest_needs_force_compact`). @@ -96,6 +141,15 @@ where (latest_needs_rewrite, latest_needs_compact) } + fn rewrite_queue_needs_squeeze(&self) -> bool { + // Squeeze the rewrite queue if its garbage ratio reaches 50%. + let rewrite_operations = self.global_stats.rewrite_operations(); + let compacted_rewrite_operations = self.global_stats.compacted_rewrite_operations(); + compacted_rewrite_operations as f64 / rewrite_operations as f64 > 0.5 + } + + // FIXME: We need to ensure that all operations before `latest_rewrite` (included) are written + // into memtables. fn regions_rewrite_or_force_compact( &self, latest_rewrite: u64, @@ -103,6 +157,17 @@ where will_force_compact: &mut Vec, ) { assert!(latest_compact <= latest_rewrite); + let mut log_batch = LogBatch::::new(); + + while let Some(item) = self.removed_memtables.lock().unwrap().pop() { + let (file_num, raft_id) = ((item.0).0, item.1); + if file_num > latest_rewrite { + self.removed_memtables.lock().unwrap().push(item); + break; + } + log_batch.clean_region(raft_id); + } + let memtables = self.memtables.collect(|t| { let min_file_num = t.min_file_num(LogQueue::Append).unwrap_or(u64::MAX); let count = t.entries_count(); @@ -126,21 +191,23 @@ where |t, ents, ents_idx| t.fetch_rewrite_entries(latest_rewrite, ents, ents_idx), ) .unwrap(); + log_batch.add_entries(region_id, entries); let mut kvs = Vec::new(); memtable.rl().fetch_rewrite_kvs(latest_rewrite, &mut kvs); - - let mut log_batch = LogBatch::::new(); - log_batch.add_entries(region_id, entries); for (k, v) in kvs { log_batch.put(region_id, k, v); } - - self.rewrite_impl(&mut log_batch, latest_rewrite).unwrap(); } + self.rewrite_impl(&mut log_batch, Some(latest_rewrite)) + .unwrap(); } - fn rewrite_impl(&self, log_batch: &mut LogBatch, latest_rewrite: u64) -> Result<()> { + fn rewrite_impl( + &self, + log_batch: &mut LogBatch, + latest_rewrite: Option, + ) -> Result<()> { let mut file_num = 0; self.pipe_log.rewrite(log_batch, true, &mut file_num)?; if file_num > 0 { @@ -154,7 +221,7 @@ where &self, log_batch: &mut LogBatch, file_num: u64, - latest_rewrite: u64, + latest_rewrite: Option, ) { for item in log_batch.items.drain(..) { let memtable = self.memtables.get_or_insert(item.raft_group_id); @@ -167,8 +234,78 @@ where OpType::Put => memtable.wl().rewrite_key(kv.key, latest_rewrite, file_num), _ => unreachable!(), }, + LogItemContent::Command(Command::Clean) => { + // Nothing need to do. 
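+ // The memtable was already removed when the clean command went through the append
+ // queue; the record only needs to reach the rewrite queue so that recovery does not
+ // resurrect the region.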
+ } _ => unreachable!(), } } } + + fn squeeze_rewrite_queue(&self) { + self.pipe_log.new_log_file(LogQueue::Rewrite).unwrap(); + + let memtables = self + .memtables + .collect(|t| t.min_file_num(LogQueue::Rewrite).unwrap_or_default() > 0); + + let mut log_batch = LogBatch::::new(); + for memtable in memtables { + let region_id = memtable.rl().region_id(); + + let mut entries = Vec::new(); + fetch_entries( + &self.pipe_log, + &memtable, + &mut entries, + 0, + |t, ents, ents_idx| t.fetch_entries_from_rewrite(ents, ents_idx), + ) + .unwrap(); + log_batch.add_entries(region_id, entries); + + let mut kvs = Vec::new(); + memtable.rl().fetch_kvs_from_rewrite(&mut kvs); + for (k, v) in kvs { + log_batch.put(region_id, k, v); + } + } + self.rewrite_impl(&mut log_batch, None).unwrap(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::engine::Engine; + use crate::pipe_log::PipeLog; + use raft::eraftpb::Entry; + + type RaftLogEngine = Engine; + + #[test] + fn test_remove_memtable() { + let dir = tempfile::Builder::new() + .prefix("test_remove_memtable") + .tempdir() + .unwrap(); + + let mut cfg = Config::default(); + cfg.dir = dir.path().to_str().unwrap().to_owned(); + let engine = RaftLogEngine::new(cfg.clone()); + + engine.purge_manager().remove_memtable(3, 10); + engine.purge_manager().remove_memtable(3, 9); + engine.purge_manager().remove_memtable(3, 11); + engine.purge_manager().remove_memtable(2, 9); + engine.purge_manager().remove_memtable(4, 4); + engine.purge_manager().remove_memtable(4, 3); + + let mut tables = engine.purge_manager().removed_memtables.lock().unwrap(); + for (file_num, raft_id) in vec![(2, 9), (3, 11), (3, 10), (3, 9), (4, 4), (4, 3)] { + let item = tables.pop().unwrap(); + assert_eq!((item.0).0, file_num); + assert_eq!(item.1, raft_id); + } + } }