From 7f89d275a0d3f8a3a26fd9353d8911ccb810e0b4 Mon Sep 17 00:00:00 2001
From: qupeng
Date: Fri, 25 Sep 2020 17:50:14 +0800
Subject: [PATCH] rename cache stats to global stats (#51)

Signed-off-by: qupeng
---
 src/cache_evict.rs | 25 +++++++++--------
 src/engine.rs      | 68 ++++++++-------------------------------------
 src/lib.rs         | 49 ++++++++++++++++++++++++++++++++
 src/memtable.rs    | 69 +++++++++++++++++++++++-----------------------
 src/pipe_log.rs    |  4 +--
 5 files changed, 110 insertions(+), 105 deletions(-)

diff --git a/src/cache_evict.rs b/src/cache_evict.rs
index e5c3afa8..566eb93c 100644
--- a/src/cache_evict.rs
+++ b/src/cache_evict.rs
@@ -7,7 +7,8 @@ use std::sync::Arc;
 use crossbeam::channel::{bounded, Sender};
 use protobuf::Message;
 
-use crate::engine::{MemTableAccessor, SharedCacheStats};
+use crate::engine::MemTableAccessor;
+use crate::GlobalStats;
 use crate::log_batch::{EntryExt, LogBatch, LogItemContent};
 use crate::pipe_log::{GenericPipeLog, LogQueue};
 use crate::util::{HandyRwLock, Runnable, Scheduler};
@@ -30,7 +31,7 @@ pub struct CacheSubmitor {
     scheduler: Scheduler<CacheTask>,
     cache_limit: usize,
     chunk_limit: usize,
-    cache_stats: Arc<SharedCacheStats>,
+    global_stats: Arc<GlobalStats>,
     block_on_full: bool,
 }
 
@@ -39,7 +40,7 @@ impl CacheSubmitor {
     pub fn new(
         cache_limit: usize,
         chunk_limit: usize,
         scheduler: Scheduler<CacheTask>,
-        cache_stats: Arc<SharedCacheStats>,
+        global_stats: Arc<GlobalStats>,
     ) -> Self {
         CacheSubmitor {
             file_num: 0,
@@ -49,7 +50,7 @@
             scheduler,
             cache_limit,
             chunk_limit,
-            cache_stats,
+            global_stats,
             block_on_full: false,
         }
     }
@@ -96,7 +97,7 @@
         }
 
         if self.block_on_full {
-            let cache_size = self.cache_stats.cache_size();
+            let cache_size = self.global_stats.cache_size();
             if cache_size > self.cache_limit {
                 let (tx, rx) = bounded(1);
                 if self.scheduler.schedule(CacheTask::EvictOldest(tx)).is_ok() {
@@ -107,7 +108,7 @@
         self.chunk_size += size;
         self.size_tracker.fetch_add(size, Ordering::Release);
-        self.cache_stats.add_mem_change(size);
+        self.global_stats.add_mem_change(size);
         Some(self.size_tracker.clone())
     }
@@ -126,7 +127,7 @@ where
     P: GenericPipeLog,
 {
     cache_limit: usize,
-    cache_stats: Arc<SharedCacheStats>,
+    global_stats: Arc<GlobalStats>,
     chunk_limit: usize,
     valid_cache_chunks: VecDeque<CacheChunk>,
     memtables: MemTableAccessor<E, W>,
     pipe_log: P,
 }
@@ -141,14 +142,14 @@
 {
     pub fn new(
         cache_limit: usize,
-        cache_stats: Arc<SharedCacheStats>,
+        global_stats: Arc<GlobalStats>,
         chunk_limit: usize,
         memtables: MemTableAccessor<E, W>,
         pipe_log: P,
     ) -> Runner<E, W, P> {
         Runner {
             cache_limit,
-            cache_stats,
+            global_stats,
             chunk_limit,
             valid_cache_chunks: Default::default(),
             memtables,
             pipe_log,
         }
     }
 
     fn retain_valid_cache(&mut self) {
-        let cache_size = self.cache_stats.cache_size();
+        let cache_size = self.global_stats.cache_size();
         if self.valid_cache_chunks.len() * self.chunk_limit >= cache_size * 3 {
             // There could be many empty chunks.
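             // (`len * chunk_limit` covering 3x the cached bytes means most
             // tracked chunks must already be drained.)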
             self.valid_cache_chunks
@@ -169,12 +170,12 @@ where
     }
 
     fn cache_reach_high_water(&self) -> bool {
-        let cache_size = self.cache_stats.cache_size();
+        let cache_size = self.global_stats.cache_size();
         cache_size > (self.cache_limit as f64 * HIGH_WATER_RATIO) as usize
     }
 
     fn cache_reach_low_water(&self) -> bool {
-        let cache_size = self.cache_stats.cache_size();
+        let cache_size = self.global_stats.cache_size();
         cache_size <= (self.cache_limit as f64 * LOW_WATER_RATIO) as usize
     }
 
diff --git a/src/engine.rs b/src/engine.rs
index a9451a14..26dca366 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -1,5 +1,4 @@
 use std::io::BufRead;
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
 use std::time::{Duration, Instant};
 use std::{fmt, u64};
@@ -19,7 +18,7 @@
 use crate::memtable::{EntryIndex, MemTable};
 use crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog, FILE_MAGIC_HEADER, VERSION};
 use crate::purge::PurgeManager;
 use crate::util::{HandyRwLock, HashMap, Worker};
-use crate::{codec, CacheStats, Result};
+use crate::{codec, CacheStats, GlobalStats, Result};
 
 const SLOTS_COUNT: usize = 128;
@@ -118,7 +117,7 @@ where
     cfg: Arc<Config>,
     memtables: MemTableAccessor<E, W>,
     pipe_log: P,
-    cache_stats: Arc<SharedCacheStats>,
+    global_stats: Arc<GlobalStats>,
     purge_manager: PurgeManager<E, W, P>,
     workers: Arc<RwLock<Workers>>,
@@ -243,45 +242,6 @@ where
     }
 }
 
-#[derive(Default)]
-pub struct SharedCacheStats {
-    hit: AtomicUsize,
-    miss: AtomicUsize,
-    cache_size: AtomicUsize,
-}
-
-impl SharedCacheStats {
-    pub fn sub_mem_change(&self, bytes: usize) {
-        self.cache_size.fetch_sub(bytes, Ordering::Release);
-    }
-    pub fn add_mem_change(&self, bytes: usize) {
-        self.cache_size.fetch_add(bytes, Ordering::Release);
-    }
-    pub fn hit_cache(&self, count: usize) {
-        self.hit.fetch_add(count, Ordering::Relaxed);
-    }
-    pub fn miss_cache(&self, count: usize) {
-        self.miss.fetch_add(count, Ordering::Relaxed);
-    }
-
-    pub fn hit_times(&self) -> usize {
-        self.hit.load(Ordering::Relaxed)
-    }
-    pub fn miss_times(&self) -> usize {
-        self.miss.load(Ordering::Relaxed)
-    }
-    pub fn cache_size(&self) -> usize {
-        self.cache_size.load(Ordering::Acquire)
-    }
-
-    #[cfg(test)]
-    pub fn reset(&self) {
-        self.hit.store(0, Ordering::Relaxed);
-        self.miss.store(0, Ordering::Relaxed);
-        self.cache_size.store(0, Ordering::Relaxed);
-    }
-}
-
 struct Workers {
     cache_evict: Worker<CacheTask>,
 }
@@ -293,7 +253,7 @@ where
 {
     fn new_impl(cfg: Config, chunk_limit: usize) -> Result<Engine<E, W, P>> {
         let cache_limit = cfg.cache_limit.0 as usize;
-        let cache_stats = Arc::new(SharedCacheStats::default());
+        let global_stats = Arc::new(GlobalStats::default());
 
         let mut cache_evict_worker = Worker::new("cache_evict".to_owned(), None);
@@ -303,14 +263,14 @@
                 cache_limit,
                 chunk_limit,
                 cache_evict_worker.scheduler(),
-                cache_stats.clone(),
+                global_stats.clone(),
             ),
         )
         .expect("Open raft log");
         pipe_log.cache_submitor().block_on_full();
 
         let memtables = {
-            let stats = cache_stats.clone();
+            let stats = global_stats.clone();
             MemTableAccessor::<E, W>::new(Arc::new(move |id: u64| {
                 MemTable::new(id, cache_limit, stats.clone())
             }))
         };
 
         let cache_evict_runner = CacheEvictRunner::new(
             cache_limit,
-            cache_stats.clone(),
+            global_stats.clone(),
             chunk_limit,
             memtables.clone(),
             pipe_log.clone(),
         );
@@ -342,7 +302,7 @@ where
             cfg,
             memtables,
             pipe_log,
-            cache_stats,
+            global_stats,
             purge_manager,
             workers: Arc::new(RwLock::new(Workers {
                 cache_evict: cache_evict_worker,
             })),
@@ -484,12 +444,8 @@ where
     }
 
     /// Flush stats about EntryCache.
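    /// Hit and miss counters are swapped to zero by each flush; `cache_size`
    /// is only read.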
-    pub fn flush_stats(&self) -> CacheStats {
-        CacheStats {
-            hit: self.cache_stats.hit.swap(0, Ordering::SeqCst),
-            miss: self.cache_stats.miss.swap(0, Ordering::SeqCst),
-            cache_size: self.cache_stats.cache_size.load(Ordering::SeqCst),
-        }
+    pub fn flush_cache_stats(&self) -> CacheStats {
+        self.global_stats.flush_cache_stats()
     }
 
     /// Stop the background thread that keeps trying to evict caches.
@@ -791,14 +747,14 @@ mod tests {
             }
         }
 
-        let cache_size = engine.cache_stats.cache_size();
+        let cache_size = engine.global_stats.cache_size();
         assert!(cache_size <= 10 * 1024 * 1024);
 
         // Recover from log files.
         engine.stop();
         drop(engine);
         let engine = RaftLogEngine::new_impl(cfg.clone(), 512 * 1024).unwrap();
-        let cache_size = engine.cache_stats.cache_size();
+        let cache_size = engine.global_stats.cache_size();
         assert!(cache_size <= 10 * 1024 * 1024);
 
         // Rewrite inactive logs.
@@ -807,7 +763,7 @@
         }
         let ret = engine.purge_expired_files().unwrap();
         assert!(ret.is_empty());
-        let cache_size = engine.cache_stats.cache_size();
+        let cache_size = engine.global_stats.cache_size();
         assert!(cache_size <= 10 * 1024 * 1024);
     }
 
diff --git a/src/lib.rs b/src/lib.rs
index 443abdf4..fd009a25 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,8 @@
 #![feature(shrink_to)]
 #![feature(cell_update)]
 
+use std::sync::atomic::{AtomicUsize, Ordering};
+
 macro_rules! box_err {
     ($e:expr) => ({
         use std::error::Error;
@@ -37,3 +39,50 @@ pub struct CacheStats {
     pub miss: usize,
     pub cache_size: usize,
 }
+
+#[derive(Default)]
+pub struct GlobalStats {
+    cache_hit: AtomicUsize,
+    cache_miss: AtomicUsize,
+    cache_size: AtomicUsize,
+}
+
+impl GlobalStats {
+    pub fn sub_mem_change(&self, bytes: usize) {
+        self.cache_size.fetch_sub(bytes, Ordering::Release);
+    }
+    pub fn add_mem_change(&self, bytes: usize) {
+        self.cache_size.fetch_add(bytes, Ordering::Release);
+    }
+    pub fn add_cache_hit(&self, count: usize) {
+        self.cache_hit.fetch_add(count, Ordering::Relaxed);
+    }
+    pub fn add_cache_miss(&self, count: usize) {
+        self.cache_miss.fetch_add(count, Ordering::Relaxed);
+    }
+
+    pub fn cache_hit(&self) -> usize {
+        self.cache_hit.load(Ordering::Relaxed)
+    }
+    pub fn cache_miss(&self) -> usize {
+        self.cache_miss.load(Ordering::Relaxed)
+    }
+    pub fn cache_size(&self) -> usize {
+        self.cache_size.load(Ordering::Acquire)
+    }
+
+    pub fn flush_cache_stats(&self) -> CacheStats {
+        CacheStats {
+            hit: self.cache_hit.swap(0, Ordering::SeqCst),
+            miss: self.cache_miss.swap(0, Ordering::SeqCst),
+            cache_size: self.cache_size.load(Ordering::SeqCst),
+        }
+    }
+
+    #[cfg(test)]
+    pub fn reset_cache(&self) {
+        self.cache_hit.store(0, Ordering::Relaxed);
+        self.cache_miss.store(0, Ordering::Relaxed);
+        self.cache_size.store(0, Ordering::Relaxed);
+    }
+}
diff --git a/src/memtable.rs b/src/memtable.rs
index 819f35f2..643a942c 100644
--- a/src/memtable.rs
+++ b/src/memtable.rs
@@ -6,11 +6,10 @@ use std::{cmp, u64};
 use protobuf::Message;
 
 use crate::cache_evict::CacheTracker;
-use crate::engine::SharedCacheStats;
 use crate::log_batch::{CompressionType, EntryExt};
 use crate::pipe_log::LogQueue;
 use crate::util::{slices_in_range, HashMap};
-use crate::{Error, Result};
+use crate::{Error, GlobalStats, Result};
 
 const SHRINK_CACHE_CAPACITY: usize = 64;
 const SHRINK_CACHE_LIMIT: usize = 512;
@@ -79,7 +78,7 @@ pub struct MemTable<E: Message, W: EntryExt<E>> {
     cache_size: usize,
     cache_limit: usize,
-    cache_stats: Arc<SharedCacheStats>,
+    global_stats: Arc<GlobalStats>,
 
     _phantom: PhantomData<W>,
 }
@@ -118,7 +117,7 @@ impl<E: Message, W: EntryExt<E>> MemTable<E, W> {
             let entry_index = &mut self.entries_index[distance + offset];
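             // Drop the entry's cache tracker, then subtract its bytes from
             // the per-table and global cache sizes.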
             entry_index.cache_tracker.take();
             self.cache_size -= entry_index.len as usize;
-            self.cache_stats.sub_mem_change(entry_index.len as usize);
+            self.global_stats.sub_mem_change(entry_index.len as usize);
         }
 
         self.entries_cache.truncate(conflict);
@@ -160,7 +159,7 @@ impl<E: Message, W: EntryExt<E>> MemTable<E, W> {
     pub fn new(
         region_id: u64,
         cache_limit: usize,
-        cache_stats: Arc<SharedCacheStats>,
+        global_stats: Arc<GlobalStats>,
     ) -> MemTable<E, W> {
         MemTable::<E, W> {
             region_id,
@@ -171,7 +170,7 @@ impl<E: Message, W: EntryExt<E>> MemTable<E, W> {
 
             cache_size: 0,
             cache_limit,
-            cache_stats,
+            global_stats,
             _phantom: PhantomData,
         }
     }
@@ -210,7 +209,7 @@ impl<E: Message, W: EntryExt<E>> MemTable<E, W> {
             entry_index.cache_tracker.take();
             self.cache_size -= entry_index.len as usize;
-            self.cache_stats.sub_mem_change(entry_index.len as usize);
+            self.global_stats.sub_mem_change(entry_index.len as usize);
         }
     }
@@ -310,7 +309,7 @@ impl<E: Message, W: EntryExt<E>> MemTable<E, W> {
             let entry_index = &mut self.entries_index[distance + i];
             entry_index.cache_tracker.take();
             self.cache_size -= entry_index.len as usize;
-            self.cache_stats.sub_mem_change(entry_index.len as usize);
+            self.global_stats.sub_mem_change(entry_index.len as usize);
         }
         self.shrink_entries_cache();
     }
@@ -332,11 +331,11 @@ impl<E: Message, W: EntryExt<E>> MemTable<E, W> {
         let ioffset = (index - first_index) as usize;
         let cache_distance = self.cache_distance();
         if ioffset < cache_distance {
-            self.cache_stats.miss_cache(1);
+            self.global_stats.add_cache_miss(1);
             let entry_index = self.entries_index[ioffset].clone();
             (None, Some(entry_index))
         } else {
-            self.cache_stats.hit_cache(1);
+            self.global_stats.add_cache_hit(1);
             let coffset = ioffset - cache_distance;
             let entry = self.entries_cache[coffset].clone();
             (Some(entry), None)
@@ -402,8 +401,8 @@ impl<E: Message, W: EntryExt<E>> MemTable<E, W> {
             vec_idx.extend_from_slice(first);
             vec_idx.extend_from_slice(second);
         }
-        self.cache_stats.hit_cache(vec.len() - vec_len);
-        self.cache_stats.miss_cache(vec_idx.len() - vec_idx_len);
+        self.global_stats.add_cache_hit(vec.len() - vec_len);
+        self.global_stats.add_cache_miss(vec_idx.len() - vec_idx_len);
 
         Ok(())
     }
@@ -498,7 +497,7 @@ impl<E: Message, W: EntryExt<E>> Drop for MemTable<E, W> {
     fn drop(&mut self) {
         // Drop `cache_tracker`s and subtract the memory change.
         self.entries_index.clear();
-        self.cache_stats.sub_mem_change(self.cache_size as usize);
+        self.global_stats.sub_mem_change(self.cache_size as usize);
     }
 }
 
@@ -563,7 +562,7 @@ mod tests {
     fn test_memtable_append() {
         let region_id = 8;
         let cache_limit = 15;
-        let stats = Arc::new(SharedCacheStats::default());
+        let stats = Arc::new(GlobalStats::default());
         let mut memtable = MemTable::<Entry, Entry>::new(region_id, cache_limit, stats);
 
         // Append entries [10, 20) with file_num = 1, not exceeding the cache size limit.
@@ -652,7 +651,7 @@ mod tests {
         memtable.check_entries_index_and_cache();
 
         // Cache with size limit 0.
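         // Nothing should be cached when the limit is 0.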
-        let stats = Arc::new(SharedCacheStats::default());
+        let stats = Arc::new(GlobalStats::default());
         let mut memtable = MemTable::<Entry, Entry>::new(region_id, 0, stats);
         memtable.append(generate_ents(10, 20), generate_ents_index(10, 20, 1));
         assert_eq!(memtable.cache_size, 0);
@@ -668,7 +667,7 @@ mod tests {
     fn test_memtable_compact() {
         let region_id = 8;
         let cache_limit = 10;
-        let stats = Arc::new(SharedCacheStats::default());
+        let stats = Arc::new(GlobalStats::default());
         let mut memtable = MemTable::<Entry, Entry>::new(region_id, cache_limit, stats);
 
         // After appending:
@@ -731,7 +730,7 @@ mod tests {
     fn test_memtable_compact_cache() {
         let region_id = 8;
         let cache_limit = 10;
-        let stats = Arc::new(SharedCacheStats::default());
+        let stats = Arc::new(GlobalStats::default());
         let mut memtable = MemTable::<Entry, Entry>::new(region_id, cache_limit, stats);
 
         // After appending:
@@ -771,7 +770,7 @@ mod tests {
     fn test_memtable_fetch() {
         let region_id = 8;
         let cache_limit = 10;
-        let stats = Arc::new(SharedCacheStats::default());
+        let stats = Arc::new(GlobalStats::default());
         let mut memtable = MemTable::<Entry, Entry>::new(region_id, cache_limit, stats.clone());
 
         // After appending:
@@ -794,8 +793,8 @@
         assert_eq!(ents_idx.len(), 15);
         assert_eq!(ents_idx[0].index, 0);
         assert_eq!(ents_idx[14].index, 14);
-        assert_eq!(stats.hit_times(), 10);
-        assert_eq!(stats.miss_times(), 15);
+        assert_eq!(stats.cache_hit(), 10);
+        assert_eq!(stats.cache_miss(), 15);
 
         // After compact:
         // [10, 15) file_num = 2, not in cache
@@ -820,7 +819,7 @@
         // All needed entries are in cache.
         ents.clear();
         ents_idx.clear();
-        stats.reset();
+        stats.reset_cache();
         memtable
             .fetch_entries_to(20, 25, None, &mut ents, &mut ents_idx)
             .unwrap();
         assert_eq!(ents.len(), 5);
         assert_eq!(ents[0].get_index(), 20);
         assert_eq!(ents[4].get_index(), 24);
         assert!(ents_idx.is_empty());
-        assert_eq!(stats.hit_times(), 5);
+        assert_eq!(stats.cache_hit(), 5);
 
         // None of the needed entries are in cache.
         ents.clear();
         ents_idx.clear();
-        stats.reset();
+        stats.reset_cache();
         memtable
             .fetch_entries_to(10, 15, None, &mut ents, &mut ents_idx)
             .unwrap();
         assert!(ents.is_empty());
         assert_eq!(ents_idx.len(), 5);
         assert_eq!(ents_idx[0].index, 10);
         assert_eq!(ents_idx[4].index, 14);
-        assert_eq!(stats.miss_times(), 5);
+        assert_eq!(stats.cache_miss(), 5);
 
         // Some needed entries are in cache, the others are not.
         ents.clear();
         ents_idx.clear();
-        stats.reset();
+        stats.reset_cache();
         memtable
             .fetch_entries_to(10, 25, None, &mut ents, &mut ents_idx)
             .unwrap();
         assert_eq!(ents.len(), 10);
         assert_eq!(ents[0].get_index(), 15);
         assert_eq!(ents[9].get_index(), 24);
         assert_eq!(ents_idx.len(), 5);
         assert_eq!(ents_idx[0].index, 10);
         assert_eq!(ents_idx[4].index, 14);
-        assert_eq!(stats.hit_times(), 10);
-        assert_eq!(stats.miss_times(), 5);
+        assert_eq!(stats.cache_hit(), 10);
+        assert_eq!(stats.cache_miss(), 5);
 
         // Max size limitation range fetching.
         // Can only fetch [10, 20) because of the size limitation,
         ents.clear();
         ents_idx.clear();
         let max_size = Some(10);
-        stats.reset();
+        stats.reset_cache();
         memtable
             .fetch_entries_to(10, 25, max_size, &mut ents, &mut ents_idx)
             .unwrap();
         assert_eq!(ents.len(), 5);
         assert_eq!(ents[0].get_index(), 15);
         assert_eq!(ents[4].get_index(), 19);
         assert_eq!(ents_idx.len(), 5);
         assert_eq!(ents_idx[0].index, 10);
         assert_eq!(ents_idx[4].index, 14);
-        assert_eq!(stats.hit_times(), 5);
-        assert_eq!(stats.miss_times(), 5);
+        assert_eq!(stats.cache_hit(), 5);
+        assert_eq!(stats.cache_miss(), 5);
 
         // Even if the max size limitation is 0, at least one entry is fetched.
         ents.clear();
         ents_idx.clear();
-        stats.reset();
+        stats.reset_cache();
         memtable
             .fetch_entries_to(20, 25, Some(0), &mut ents, &mut ents_idx)
             .unwrap();
         assert_eq!(ents.len(), 1);
         assert_eq!(ents[0].get_index(), 20);
         assert!(ents_idx.is_empty());
-        assert_eq!(stats.hit_times(), 1);
+        assert_eq!(stats.cache_hit(), 1);
     }
 
     #[test]
     fn test_memtable_kv_operations() {
         let region_id = 8;
         let cache_limit = 1024;
-        let stats = Arc::new(SharedCacheStats::default());
+        let stats = Arc::new(GlobalStats::default());
         let mut memtable = MemTable::<Entry, Entry>::new(region_id, cache_limit, stats);
 
         let (k1, v1) = (b"key1", b"value1");
@@ -915,7 +914,7 @@ mod tests {
     fn test_memtable_get_entry() {
         let region_id = 8;
         let cache_limit = 10;
-        let stats = Arc::new(SharedCacheStats::default());
+        let stats = Arc::new(GlobalStats::default());
         let mut memtable = MemTable::<Entry, Entry>::new(region_id, cache_limit, stats);
 
         // [5, 10) file_num = 1, not in cache
@@ -940,7 +939,7 @@ mod tests {
     fn test_memtable_rewrite() {
         let region_id = 8;
         let cache_limit = 15;
-        let stats = Arc::new(SharedCacheStats::default());
+        let stats = Arc::new(GlobalStats::default());
         let mut memtable = MemTable::<Entry, Entry>::new(region_id, cache_limit, stats);
 
         // after appending
diff --git a/src/pipe_log.rs b/src/pipe_log.rs
index dc0cbad9..9d156302 100644
--- a/src/pipe_log.rs
+++ b/src/pipe_log.rs
@@ -669,7 +669,7 @@ mod tests {
     use super::*;
 
     use crate::cache_evict::{CacheSubmitor, CacheTask};
-    use crate::engine::SharedCacheStats;
+    use crate::GlobalStats;
    use crate::util::{ReadableSize, Worker};
 
     fn new_test_pipe_log(
@@ -683,7 +683,7 @@
         cfg.target_file_size = ReadableSize(rotate_size as u64);
 
         let mut worker = Worker::new("test".to_owned(), None);
-        let stats = Arc::new(SharedCacheStats::default());
+        let stats = Arc::new(GlobalStats::default());
         let submitor = CacheSubmitor::new(usize::MAX, 4096, worker.scheduler(), stats);
         let log = PipeLog::open(&cfg, submitor).unwrap();
         (log, worker.take_receiver())