diff --git a/src/engine.rs b/src/engine.rs index 36414dbf..3ed673d9 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -22,7 +22,7 @@ use crate::log_batch::{ }; use crate::memtable::{EntryIndex, MemTable}; use crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog, FILE_MAGIC_HEADER, VERSION}; -use crate::purge::PurgeManager; +use crate::purge::{PurgeManager, RemovedMemtables}; use crate::util::{HandyRwLock, HashMap, Worker}; use crate::wal::{LogMsg, WalRunner, WriteTask}; use crate::{codec, CacheStats, GlobalStats, Result}; @@ -136,6 +136,7 @@ where fn apply_item( memtables: &MemTableAccessor, + removed_memtables: &RemovedMemtables, item: LogItem, queue: LogQueue, file_num: u64, @@ -155,9 +156,8 @@ fn apply_item( } } LogItemContent::Command(Command::Clean) => { - if self.memtables.remove(item.raft_group_id).is_some() { - self.purge_manager - .remove_memtable(file_num, item.raft_group_id); + if memtables.remove(item.raft_group_id).is_some() { + removed_memtables.remove_memtable(file_num, item.raft_group_id); } } LogItemContent::Command(Command::Compact { index }) => { @@ -183,8 +183,9 @@ where P: GenericPipeLog, { fn apply_to_memtable(&self, log_batch: &mut LogBatch, queue: LogQueue, file_num: u64) { + let removed = self.purge_manager.get_removed_memtables(); for item in log_batch.items.drain(..) { - apply_item(&self.memtables, item, queue, file_num); + apply_item(&self.memtables, &removed, item, queue, file_num); } } @@ -208,6 +209,7 @@ where } let memtables = self.memtables.clone(); let items = std::mem::replace(&mut log_batch.items, vec![]); + let removed_memtables = self.purge_manager.get_removed_memtables(); return Box::pin(async move { let (file_num, offset, tracker) = r.await?; if file_num > 0 { @@ -215,7 +217,13 @@ where if let LogItemContent::Entries(entries) = &mut item.content { entries.update_position(LogQueue::Append, file_num, offset, &tracker); } - apply_item(&memtables, item, LogQueue::Append, file_num); + apply_item( + &memtables, + &removed_memtables, + item, + LogQueue::Append, + file_num, + ); } } return Ok(bytes); @@ -282,7 +290,12 @@ where ); let cfg = Arc::new(cfg); - let purge_manager = PurgeManager::new(cfg.clone(), memtables.clone(), pipe_log.clone()); + let purge_manager = PurgeManager::new( + cfg.clone(), + memtables.clone(), + pipe_log.clone(), + global_stats.clone(), + ); let (wal_sender, wal_receiver) = channel(); let engine = Engine { cfg, diff --git a/src/log_batch.rs b/src/log_batch.rs index 9fc2eae3..edd4e32c 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -697,7 +697,7 @@ mod tests { ), ]; - for item in items { + for mut item in items { let mut encoded = vec![]; item.encode_to::(&mut encoded).unwrap(); let mut s = encoded.as_slice(); @@ -728,9 +728,7 @@ mod tests { assert_eq!(batch, decoded_batch); match &decoded_batch.items[0].content { - LogItemContent::Entries(entries) => { - assert_eq!(entries.encoded_size.get(), encoded_size) - } + LogItemContent::Entries(entries) => assert_eq!(entries.encoded_size, encoded_size), _ => unreachable!(), } } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 4a2a8e03..3a5809b0 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -760,7 +760,7 @@ mod tests { fn write_to_log( log: &mut PipeLog, submitor: &mut CacheSubmitor, - batch: &LogBatch, + batch: &mut LogBatch, file_num: &mut u64, ) { let mut entries_size = 0; @@ -770,8 +770,8 @@ mod tests { let offset = log.append(LogQueue::Append, &content).unwrap(); let tracker = submitor.get_cache_tracker(cur_file_num, offset); submitor.fill_chunk(entries_size); - for item in &batch.items { - if let LogItemContent::Entries(ref entries) = item.content { + for item in &mut batch.items { + if let LogItemContent::Entries(entries) = &mut item.content { entries.update_position(LogQueue::Append, cur_file_num, offset, &tracker); } } @@ -809,9 +809,9 @@ mod tests { // After 4 batches are written into pipe log, no `CacheTask::NewChunk` // task should be triggered. However the last batch will trigger it. for i in 0..5 { - let log_batch = get_1m_batch(); + let mut log_batch = get_1m_batch(); let mut file_num = 0; - write_to_log(&mut pipe_log, &mut submitor, &log_batch, &mut file_num); + write_to_log(&mut pipe_log, &mut submitor, &mut log_batch, &mut file_num); log_batches.push(log_batch); let x = receiver.recv_timeout(Duration::from_millis(100)); if i < 4 { @@ -824,9 +824,9 @@ mod tests { // Write more 2 batches into pipe log. A `CacheTask::NewChunk` will be // emit on the second batch because log file is switched. for i in 5..7 { - let log_batch = get_1m_batch(); + let mut log_batch = get_1m_batch(); let mut file_num = 0; - write_to_log(&mut pipe_log, &mut submitor, &log_batch, &mut file_num); + write_to_log(&mut pipe_log, &mut submitor, &mut log_batch, &mut file_num); log_batches.push(log_batch); let x = receiver.recv_timeout(Duration::from_millis(100)); if i < 6 { @@ -840,9 +840,9 @@ mod tests { // `CacheTracker`s accociated in `EntryIndex`s are droped. drop(log_batches); for _ in 7..20 { - let log_batch = get_1m_batch(); + let mut log_batch = get_1m_batch(); let mut file_num = 0; - write_to_log(&mut pipe_log, &mut submitor, &log_batch, &mut file_num); + write_to_log(&mut pipe_log, &mut submitor, &mut log_batch, &mut file_num); drop(log_batch); assert!(receiver.recv_timeout(Duration::from_millis(100)).is_err()); } diff --git a/src/purge.rs b/src/purge.rs index 18798ee6..c6b56fcb 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -12,6 +12,16 @@ use crate::pipe_log::{GenericPipeLog, LogQueue}; use crate::util::HandyRwLock; use crate::{GlobalStats, Result}; +#[derive(Clone, Default)] +pub struct RemovedMemtables(Arc, u64)>>>); + +impl RemovedMemtables { + pub fn remove_memtable(&self, file_num: u64, raft_group_id: u64) { + let mut tables = self.0.lock().unwrap(); + tables.push((Reverse(file_num), raft_group_id)); + } +} + // If a region has some very old raft logs less than this threshold, // rewrite them to clean stale log files ASAP. const REWRITE_ENTRY_COUNT_THRESHOLD: usize = 32; @@ -31,8 +41,7 @@ where global_stats: Arc, // Vector of (file_num, raft_group_id). - #[allow(clippy::type_complexity)] - removed_memtables: Arc, u64)>>>, + removed_memtables: RemovedMemtables, // Only one thread can run `purge_expired_files` at a time. purge_mutex: Arc>, @@ -115,9 +124,8 @@ where } } - pub fn remove_memtable(&self, file_num: u64, raft_group_id: u64) { - let mut tables = self.removed_memtables.lock().unwrap(); - tables.push((Reverse(file_num), raft_group_id)); + pub fn get_removed_memtables(&self) -> RemovedMemtables { + self.removed_memtables.clone() } // Returns (`latest_needs_rewrite`, `latest_needs_force_compact`). @@ -159,13 +167,16 @@ where assert!(latest_compact <= latest_rewrite); let mut log_batch = LogBatch::::new(); - while let Some(item) = self.removed_memtables.lock().unwrap().pop() { - let (file_num, raft_id) = ((item.0).0, item.1); - if file_num > latest_rewrite { - self.removed_memtables.lock().unwrap().push(item); - break; + { + let mut guard = self.removed_memtables.0.lock().unwrap(); + while let Some(item) = guard.pop() { + let (file_num, raft_id) = ((item.0).0, item.1); + if file_num > latest_rewrite { + guard.push(item); + break; + } + log_batch.clean_region(raft_id); } - log_batch.clean_region(raft_id); } let memtables = self.memtables.collect(|t| { @@ -216,7 +227,6 @@ where Ok(()) } - fn rewrite_to_memtable( &self, log_batch: &mut LogBatch, @@ -227,7 +237,7 @@ where let memtable = self.memtables.get_or_insert(item.raft_group_id); match item.content { LogItemContent::Entries(entries_to_add) => { - let entries_index = entries_to_add.entries_index.into_inner(); + let entries_index = entries_to_add.entries_index; memtable.wl().rewrite(entries_index, latest_rewrite); } LogItemContent::Kv(kv) => match kv.op_type { @@ -294,16 +304,17 @@ mod tests { cfg.dir = dir.path().to_str().unwrap().to_owned(); let engine = RaftLogEngine::new(cfg.clone()); - engine.purge_manager().remove_memtable(3, 10); - engine.purge_manager().remove_memtable(3, 9); - engine.purge_manager().remove_memtable(3, 11); - engine.purge_manager().remove_memtable(2, 9); - engine.purge_manager().remove_memtable(4, 4); - engine.purge_manager().remove_memtable(4, 3); + let tables = engine.purge_manager().get_removed_memtables(); + tables.remove_memtable(3, 10); + tables.remove_memtable(3, 9); + tables.remove_memtable(3, 11); + tables.remove_memtable(2, 9); + tables.remove_memtable(4, 4); + tables.remove_memtable(4, 3); - let mut tables = engine.purge_manager().removed_memtables.lock().unwrap(); + let mut guard = tables.0.lock().unwrap(); for (file_num, raft_id) in vec![(2, 9), (3, 11), (3, 10), (3, 9), (4, 4), (4, 3)] { - let item = tables.pop().unwrap(); + let item = guard.pop().unwrap(); assert_eq!((item.0).0, file_num); assert_eq!(item.1, raft_id); }