diff --git a/src/cache_evict.rs b/src/cache_evict.rs index 566eb93c..fbd88fca 100644 --- a/src/cache_evict.rs +++ b/src/cache_evict.rs @@ -7,11 +7,11 @@ use std::sync::Arc; use crossbeam::channel::{bounded, Sender}; use protobuf::Message; -use crate::engine::{MemTableAccessor}; -use crate::GlobalStats; +use crate::engine::MemTableAccessor; use crate::log_batch::{EntryExt, LogBatch, LogItemContent}; use crate::pipe_log::{GenericPipeLog, LogQueue}; use crate::util::{HandyRwLock, Runnable, Scheduler}; +use crate::GlobalStats; pub const DEFAULT_CACHE_CHUNK_SIZE: usize = 4 * 1024 * 1024; diff --git a/src/engine.rs b/src/engine.rs index 26dca366..483b2c67 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -18,7 +18,7 @@ use crate::memtable::{EntryIndex, MemTable}; use crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog, FILE_MAGIC_HEADER, VERSION}; use crate::purge::PurgeManager; use crate::util::{HandyRwLock, HashMap, Worker}; -use crate::{codec, GlobalStats, CacheStats, Result}; +use crate::{codec, CacheStats, GlobalStats, Result}; const SLOTS_COUNT: usize = 128; @@ -49,7 +49,9 @@ where E: Message + Clone, W: EntryExt, { - fn new(creator: Arc MemTable + Send + Sync>) -> MemTableAccessor { + pub fn new( + creator: Arc MemTable + Send + Sync>, + ) -> MemTableAccessor { let mut slots = Vec::with_capacity(SLOTS_COUNT); for _ in 0..SLOTS_COUNT { slots.push(Arc::new(RwLock::new(MemTables::default()))); @@ -74,11 +76,11 @@ where memtables.get(&raft_group_id).cloned() } - pub fn remove(&self, raft_group_id: u64) { + pub fn remove(&self, raft_group_id: u64) -> Option>>> { let mut memtables = self.slots[raft_group_id as usize % SLOTS_COUNT] .write() .unwrap(); - memtables.remove(&raft_group_id); + memtables.remove(&raft_group_id) } pub fn fold) -> B>(&self, mut init: B, fold: F) -> B { @@ -129,103 +131,45 @@ where W: EntryExt + 'static, P: GenericPipeLog, { - // Recover from disk. 
- fn recover( - queue: LogQueue, - pipe_log: &P, - memtables: &MemTableAccessor, - recovery_mode: RecoveryMode, - ) -> Result<()> { - // Get first file number and last file number. - let first_file_num = pipe_log.first_file_num(queue); - let active_file_num = pipe_log.active_file_num(queue); - - // Iterate and recover from files one by one. - let start = Instant::now(); - for file_num in first_file_num..=active_file_num { - // Read a file. - let content = pipe_log.read_whole_file(queue, file_num)?; - - // Verify file header. - let mut buf = content.as_slice(); - if !buf.starts_with(FILE_MAGIC_HEADER) { - if file_num != active_file_num { - warn!("Raft log header is corrupted at {:?}.{}", queue, file_num); - return Err(box_err!("Raft log file header is corrupted")); - } else { - pipe_log.truncate_active_log(queue, Some(0)).unwrap(); - break; - } - } - - // Iterate all LogBatch in one file. - let start_ptr = buf.as_ptr(); - buf.consume(FILE_MAGIC_HEADER.len() + VERSION.len()); - let mut offset = (FILE_MAGIC_HEADER.len() + VERSION.len()) as u64; - loop { - match LogBatch::from_bytes(&mut buf, file_num, offset) { - Ok(Some(mut log_batch)) => { - let mut encoded_size = 0; - for item in &log_batch.items { - if let LogItemContent::Entries(ref entries) = item.content { - encoded_size += entries.encoded_size.get(); - } - } - - if let Some(tracker) = pipe_log.cache_submitor().get_cache_tracker( - file_num, - offset, - encoded_size, - ) { - for item in &log_batch.items { - if let LogItemContent::Entries(ref entries) = item.content { - entries.attach_cache_tracker(tracker.clone()); - } - } - } - apply_to_memtable(memtables, &mut log_batch, queue, file_num); - offset = (buf.as_ptr() as usize - start_ptr as usize) as u64; - } - Ok(None) => { - info!("Recovered raft log {:?}.{}.", queue, file_num); - break; + fn apply_to_memtable(&self, log_batch: &mut LogBatch, queue: LogQueue, file_num: u64) { + for item in log_batch.items.drain(..) 
{ + let memtable = self.memtables.get_or_insert(item.raft_group_id); + match item.content { + LogItemContent::Entries(entries_to_add) => { + let entries = entries_to_add.entries; + let entries_index = entries_to_add.entries_index.into_inner(); + if queue == LogQueue::Rewrite { + memtable.wl().append_rewrite(entries, entries_index); + } else { + memtable.wl().append(entries, entries_index); } - Err(e) => { - warn!( - "Raft log content is corrupted at {:?}.{}:{}, error: {}", - queue, file_num, offset, e - ); - // There may be a pre-allocated space at the tail of the active log. - if file_num == active_file_num - && recovery_mode == RecoveryMode::TolerateCorruptedTailRecords - { - pipe_log.truncate_active_log(queue, Some(offset as usize))?; - break; + } + LogItemContent::Command(Command::Clean) => { + self.memtables.remove(item.raft_group_id); + } + LogItemContent::Command(Command::Compact { index }) => { + memtable.wl().compact_to(index); + } + LogItemContent::Kv(kv) => match kv.op_type { + OpType::Put => { + let (key, value) = (kv.key, kv.value.unwrap()); + match queue { + LogQueue::Append => memtable.wl().put(key, value, file_num), + LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num), } - return Err(box_err!("Raft log content is corrupted")); + } + OpType::Del => memtable.wl().delete(kv.key.as_slice()), + }, } } - info!("Recover raft log takes {:?}", start.elapsed()); - Ok(()) } - // Write a batch needs 3 steps: - // 1. find all involved raft groups and then lock their memtables; - // 2. append the log batch to pipe log; - // 3. update all involved memtables. - // The lock logic is a little complex. However it's necessary because - // 1. "Inactive log rewrite" needs to keep logs on pipe log order; - // 2. Users can call `append` on one raft group concurrently. - // Maybe we can improve the implement of "inactive log rewrite" and - // forbid concurrent `append` to remove locks here.
fn write_impl(&self, log_batch: &mut LogBatch, sync: bool) -> Result { let queue = LogQueue::Append; let mut file_num = 0; let bytes = self.pipe_log.write(log_batch, sync, &mut file_num)?; if file_num > 0 { - apply_to_memtable(&self.memtables, log_batch, queue, file_num); + self.apply_to_memtable(log_batch, queue, file_num); } Ok(bytes) } @@ -251,6 +195,10 @@ where E: Message + Clone, W: EntryExt + 'static, { + pub fn new(cfg: Config) -> Engine { + Self::new_impl(cfg, DEFAULT_CACHE_CHUNK_SIZE).unwrap() + } + fn new_impl(cfg: Config, chunk_limit: usize) -> Result> { let cache_limit = cfg.cache_limit.0 as usize; let global_stats = Arc::new(GlobalStats::default()); @@ -267,7 +215,6 @@ where ), ) .expect("Open raft log"); - pipe_log.cache_submitor().block_on_full(); let memtables = { let stats = global_stats.clone(); @@ -285,20 +232,10 @@ where ); cache_evict_worker.start(cache_evict_runner, Some(Duration::from_secs(1))); - let recovery_mode = cfg.recovery_mode; - Engine::recover( - LogQueue::Rewrite, - &pipe_log, - &memtables, - RecoveryMode::TolerateCorruptedTailRecords, - )?; - Engine::recover(LogQueue::Append, &pipe_log, &memtables, recovery_mode)?; - pipe_log.cache_submitor().nonblock_on_full(); - let cfg = Arc::new(cfg); let purge_manager = PurgeManager::new(cfg.clone(), memtables.clone(), pipe_log.clone()); - Ok(Engine { + let engine = Engine { cfg, memtables, pipe_log, @@ -307,11 +244,95 @@ where workers: Arc::new(RwLock::new(Workers { cache_evict: cache_evict_worker, })), - }) + }; + + engine.pipe_log.cache_submitor().block_on_full(); + engine.recover( + LogQueue::Rewrite, + RecoveryMode::TolerateCorruptedTailRecords, + )?; + engine.recover(LogQueue::Append, engine.cfg.recovery_mode)?; + engine.pipe_log.cache_submitor().nonblock_on_full(); + + Ok(engine) } - pub fn new(cfg: Config) -> Engine { - Self::new_impl(cfg, DEFAULT_CACHE_CHUNK_SIZE).unwrap() + // Recover from disk. 
+ fn recover(&self, queue: LogQueue, recovery_mode: RecoveryMode) -> Result<()> { + // Get first file number and last file number. + let first_file_num = self.pipe_log.first_file_num(queue); + let active_file_num = self.pipe_log.active_file_num(queue); + + // Iterate and recover from files one by one. + let start = Instant::now(); + for file_num in first_file_num..=active_file_num { + // Read a file. + let content = self.pipe_log.read_whole_file(queue, file_num)?; + + // Verify file header. + let mut buf = content.as_slice(); + if !buf.starts_with(FILE_MAGIC_HEADER) { + if file_num != active_file_num { + warn!("Raft log header is corrupted at {:?}.{}", queue, file_num); + return Err(box_err!("Raft log file header is corrupted")); + } else { + self.pipe_log.truncate_active_log(queue, Some(0)).unwrap(); + break; + } + } + + // Iterate all LogBatch in one file. + let start_ptr = buf.as_ptr(); + buf.consume(FILE_MAGIC_HEADER.len() + VERSION.len()); + let mut offset = (FILE_MAGIC_HEADER.len() + VERSION.len()) as u64; + loop { + match LogBatch::from_bytes(&mut buf, file_num, offset) { + Ok(Some(mut log_batch)) => { + let mut encoded_size = 0; + for item in &log_batch.items { + if let LogItemContent::Entries(ref entries) = item.content { + encoded_size += entries.encoded_size.get(); + } + } + + if let Some(tracker) = self.pipe_log.cache_submitor().get_cache_tracker( + file_num, + offset, + encoded_size, + ) { + for item in &log_batch.items { + if let LogItemContent::Entries(ref entries) = item.content { + entries.attach_cache_tracker(tracker.clone()); + } + } + } + self.apply_to_memtable(&mut log_batch, queue, file_num); + offset = (buf.as_ptr() as usize - start_ptr as usize) as u64; + } + Ok(None) => { + info!("Recovered raft log {:?}.{}.", queue, file_num); + break; + } + Err(e) => { + warn!( + "Raft log content is corrupted at {:?}.{}:{}, error: {}", + queue, file_num, offset, e + ); + // There may be a pre-allocated space at the tail of the active log. 
+ if file_num == active_file_num + && recovery_mode == RecoveryMode::TolerateCorruptedTailRecords + { + self.pipe_log + .truncate_active_log(queue, Some(offset as usize))?; + break; + } + return Err(box_err!("Raft log content is corrupted")); + } + } + } + } + info!("Recover raft log takes {:?}", start.elapsed()); + Ok(()) } } @@ -518,59 +539,12 @@ where Ok(e) } -fn apply_to_memtable( - memtables: &MemTableAccessor, - log_batch: &mut LogBatch, - queue: LogQueue, - file_num: u64, -) where - E: Message + Clone, - W: EntryExt, -{ - for item in log_batch.items.drain(..) { - let memtable = memtables.get_or_insert(item.raft_group_id); - match item.content { - LogItemContent::Entries(entries_to_add) => { - let entries = entries_to_add.entries; - let entries_index = entries_to_add.entries_index.into_inner(); - if queue == LogQueue::Rewrite { - memtable.wl().append_rewrite(entries, entries_index); - } else { - memtable.wl().append(entries, entries_index); - } - } - LogItemContent::Command(Command::Clean) => { - memtables.remove(item.raft_group_id); - } - LogItemContent::Command(Command::Compact { index }) => { - memtable.wl().compact_to(index); - } - LogItemContent::Kv(kv) => match kv.op_type { - OpType::Put => { - let (key, value) = (kv.key, kv.value.unwrap()); - match queue { - LogQueue::Append => memtable.wl().put(key, value, file_num), - LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num), - } - } - OpType::Del => memtable.wl().delete(kv.key.as_slice()), - }, - } - } -} - #[cfg(test)] mod tests { use super::*; use crate::util::ReadableSize; use raft::eraftpb::Entry; - impl EntryExt for Entry { - fn index(e: &Entry) -> u64 { - e.get_index() - } - } - type RaftLogEngine = Engine; impl RaftLogEngine { fn append(&self, raft_group_id: u64, entries: Vec) -> Result { diff --git a/src/lib.rs b/src/lib.rs index fd009a25..9f4f0011 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -86,3 +86,15 @@ impl GlobalStats { self.cache_size.store(0, Ordering::Relaxed); } } + 
+#[cfg(test)] +mod tests { + use crate::log_batch::EntryExt; + use raft::eraftpb::Entry; + + impl EntryExt for Entry { + fn index(e: &Entry) -> u64 { + e.get_index() + } + } +} diff --git a/src/memtable.rs b/src/memtable.rs index 643a942c..6026000c 100644 --- a/src/memtable.rs +++ b/src/memtable.rs @@ -9,7 +9,7 @@ use crate::cache_evict::CacheTracker; use crate::log_batch::{CompressionType, EntryExt}; use crate::pipe_log::LogQueue; use crate::util::{slices_in_range, HashMap}; -use crate::{Error, Result, GlobalStats}; +use crate::{Error, GlobalStats, Result}; const SHRINK_CACHE_CAPACITY: usize = 64; const SHRINK_CACHE_LIMIT: usize = 512; @@ -73,7 +73,7 @@ pub struct MemTable> { rewrite_count: usize, // Region scope key/value pairs - // key -> (value, file_num) + // key -> (value, queue, file_num) kvs: HashMap, (Vec, LogQueue, u64)>, cache_size: usize, @@ -401,8 +401,9 @@ impl> MemTable { vec_idx.extend_from_slice(first); vec_idx.extend_from_slice(second); } - self.global_stats.add_cache_hit(vec.len() - vec_len); - self.global_stats.add_cache_miss(vec_idx.len() - vec_idx_len); + let (hit, miss) = (vec.len() - vec_len, vec_idx.len() - vec_idx_len); + self.global_stats.add_cache_hit(hit); + self.global_stats.add_cache_miss(miss); Ok(()) } @@ -441,7 +442,7 @@ impl> MemTable { let entry = match queue { LogQueue::Append => self.entries_index.get(self.rewrite_count), LogQueue::Rewrite if self.rewrite_count == 0 => None, - _ => self.entries_index.front(), + LogQueue::Rewrite => self.entries_index.front(), }; let ents_min = entry.map(|e| e.file_num); let kvs_min = self.kvs_min_file_num(queue); @@ -469,7 +470,7 @@ impl> MemTable { self.entries_index.back().map(|e| e.index) } - fn kvs_min_file_num(&self, queue: LogQueue) -> Option { + pub fn kvs_min_file_num(&self, queue: LogQueue) -> Option { self.kvs .values() .filter(|v| v.1 == queue) @@ -507,11 +508,12 @@ mod tests { use raft::eraftpb::Entry; impl> MemTable { - fn max_file_num(&self, queue: LogQueue) -> Option { + pub fn 
max_file_num(&self, queue: LogQueue) -> Option { let entry = match queue { + LogQueue::Append if self.rewrite_count == self.entries_index.len() => None, LogQueue::Append => self.entries_index.back(), LogQueue::Rewrite if self.rewrite_count == 0 => None, - _ => self.entries_index.get(self.rewrite_count - 1), + LogQueue::Rewrite => self.entries_index.get(self.rewrite_count - 1), }; let ents_max = entry.map(|e| e.file_num); @@ -523,7 +525,8 @@ mod tests { (None, None) => None, } } - fn kvs_max_file_num(&self, queue: LogQueue) -> Option { + + pub fn kvs_max_file_num(&self, queue: LogQueue) -> Option { self.kvs .values() .filter(|v| v.1 == queue) diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 9d156302..311157c9 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -669,8 +669,8 @@ mod tests { use super::*; use crate::cache_evict::{CacheSubmitor, CacheTask}; - use crate::GlobalStats; use crate::util::{ReadableSize, Worker}; + use crate::GlobalStats; fn new_test_pipe_log( path: &str, diff --git a/src/purge.rs b/src/purge.rs index d6723c1f..7e42f12b 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -144,33 +144,30 @@ where let mut file_num = 0; self.pipe_log.rewrite(&log_batch, true, &mut file_num)?; if file_num > 0 { - rewrite_to_memtable(&self.memtables, log_batch, file_num, latest_rewrite); + self.rewrite_to_memtable(log_batch, file_num, latest_rewrite); } Ok(()) } -} -fn rewrite_to_memtable( - memtables: &MemTableAccessor, - log_batch: &mut LogBatch, - file_num: u64, - latest_rewrite: u64, -) where - E: Message + Clone, - W: EntryExt, -{ - for item in log_batch.items.drain(..) 
{ - let memtable = memtables.get_or_insert(item.raft_group_id); - match item.content { - LogItemContent::Entries(entries_to_add) => { - let entries_index = entries_to_add.entries_index.into_inner(); - memtable.wl().rewrite(entries_index, latest_rewrite); - } - LogItemContent::Kv(kv) => match kv.op_type { - OpType::Put => memtable.wl().rewrite_key(kv.key, latest_rewrite, file_num), + fn rewrite_to_memtable( + &self, + log_batch: &mut LogBatch, + file_num: u64, + latest_rewrite: u64, + ) { + for item in log_batch.items.drain(..) { + let memtable = self.memtables.get_or_insert(item.raft_group_id); + match item.content { + LogItemContent::Entries(entries_to_add) => { + let entries_index = entries_to_add.entries_index.into_inner(); + memtable.wl().rewrite(entries_index, latest_rewrite); + } + LogItemContent::Kv(kv) => match kv.op_type { + OpType::Put => memtable.wl().rewrite_key(kv.key, latest_rewrite, file_num), + _ => unreachable!(), + }, _ => unreachable!(), - }, - _ => unreachable!(), + } } } }