Skip to content

Commit

Permalink
rename cache stats to global stats (#51)
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu authored Sep 25, 2020
1 parent c615a14 commit 7f89d27
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 105 deletions.
25 changes: 13 additions & 12 deletions src/cache_evict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use std::sync::Arc;
use crossbeam::channel::{bounded, Sender};
use protobuf::Message;

use crate::engine::{MemTableAccessor, SharedCacheStats};
use crate::engine::{MemTableAccessor};
use crate::GlobalStats;
use crate::log_batch::{EntryExt, LogBatch, LogItemContent};
use crate::pipe_log::{GenericPipeLog, LogQueue};
use crate::util::{HandyRwLock, Runnable, Scheduler};
Expand All @@ -30,7 +31,7 @@ pub struct CacheSubmitor {
scheduler: Scheduler<CacheTask>,
cache_limit: usize,
chunk_limit: usize,
cache_stats: Arc<SharedCacheStats>,
global_stats: Arc<GlobalStats>,
block_on_full: bool,
}

Expand All @@ -39,7 +40,7 @@ impl CacheSubmitor {
cache_limit: usize,
chunk_limit: usize,
scheduler: Scheduler<CacheTask>,
cache_stats: Arc<SharedCacheStats>,
global_stats: Arc<GlobalStats>,
) -> Self {
CacheSubmitor {
file_num: 0,
Expand All @@ -49,7 +50,7 @@ impl CacheSubmitor {
scheduler,
cache_limit,
chunk_limit,
cache_stats,
global_stats,
block_on_full: false,
}
}
Expand Down Expand Up @@ -96,7 +97,7 @@ impl CacheSubmitor {
}

if self.block_on_full {
let cache_size = self.cache_stats.cache_size();
let cache_size = self.global_stats.cache_size();
if cache_size > self.cache_limit {
let (tx, rx) = bounded(1);
if self.scheduler.schedule(CacheTask::EvictOldest(tx)).is_ok() {
Expand All @@ -107,7 +108,7 @@ impl CacheSubmitor {

self.chunk_size += size;
self.size_tracker.fetch_add(size, Ordering::Release);
self.cache_stats.add_mem_change(size);
self.global_stats.add_mem_change(size);
Some(self.size_tracker.clone())
}

Expand All @@ -126,7 +127,7 @@ where
P: GenericPipeLog,
{
cache_limit: usize,
cache_stats: Arc<SharedCacheStats>,
global_stats: Arc<GlobalStats>,
chunk_limit: usize,
valid_cache_chunks: VecDeque<CacheChunk>,
memtables: MemTableAccessor<E, W>,
Expand All @@ -141,14 +142,14 @@ where
{
pub fn new(
cache_limit: usize,
cache_stats: Arc<SharedCacheStats>,
global_stats: Arc<GlobalStats>,
chunk_limit: usize,
memtables: MemTableAccessor<E, W>,
pipe_log: P,
) -> Runner<E, W, P> {
Runner {
cache_limit,
cache_stats,
global_stats,
chunk_limit,
valid_cache_chunks: Default::default(),
memtables,
Expand All @@ -157,7 +158,7 @@ where
}

fn retain_valid_cache(&mut self) {
let cache_size = self.cache_stats.cache_size();
let cache_size = self.global_stats.cache_size();
if self.valid_cache_chunks.len() * self.chunk_limit >= cache_size * 3 {
// There could be many empty chunks.
self.valid_cache_chunks
Expand All @@ -169,12 +170,12 @@ where
}

fn cache_reach_high_water(&self) -> bool {
let cache_size = self.cache_stats.cache_size();
let cache_size = self.global_stats.cache_size();
cache_size > (self.cache_limit as f64 * HIGH_WATER_RATIO) as usize
}

fn cache_reach_low_water(&self) -> bool {
let cache_size = self.cache_stats.cache_size();
let cache_size = self.global_stats.cache_size();
cache_size <= (self.cache_limit as f64 * LOW_WATER_RATIO) as usize
}

Expand Down
68 changes: 12 additions & 56 deletions src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::io::BufRead;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use std::{fmt, u64};
Expand All @@ -19,7 +18,7 @@ use crate::memtable::{EntryIndex, MemTable};
use crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog, FILE_MAGIC_HEADER, VERSION};
use crate::purge::PurgeManager;
use crate::util::{HandyRwLock, HashMap, Worker};
use crate::{codec, CacheStats, Result};
use crate::{codec, GlobalStats, CacheStats, Result};

const SLOTS_COUNT: usize = 128;

Expand Down Expand Up @@ -118,7 +117,7 @@ where
cfg: Arc<Config>,
memtables: MemTableAccessor<E, W>,
pipe_log: P,
cache_stats: Arc<SharedCacheStats>,
global_stats: Arc<GlobalStats>,
purge_manager: PurgeManager<E, W, P>,

workers: Arc<RwLock<Workers>>,
Expand Down Expand Up @@ -243,45 +242,6 @@ where
}
}

#[derive(Default)]
pub struct SharedCacheStats {
hit: AtomicUsize,
miss: AtomicUsize,
cache_size: AtomicUsize,
}

impl SharedCacheStats {
pub fn sub_mem_change(&self, bytes: usize) {
self.cache_size.fetch_sub(bytes, Ordering::Release);
}
pub fn add_mem_change(&self, bytes: usize) {
self.cache_size.fetch_add(bytes, Ordering::Release);
}
pub fn hit_cache(&self, count: usize) {
self.hit.fetch_add(count, Ordering::Relaxed);
}
pub fn miss_cache(&self, count: usize) {
self.miss.fetch_add(count, Ordering::Relaxed);
}

pub fn hit_times(&self) -> usize {
self.hit.load(Ordering::Relaxed)
}
pub fn miss_times(&self) -> usize {
self.miss.load(Ordering::Relaxed)
}
pub fn cache_size(&self) -> usize {
self.cache_size.load(Ordering::Acquire)
}

#[cfg(test)]
pub fn reset(&self) {
self.hit.store(0, Ordering::Relaxed);
self.miss.store(0, Ordering::Relaxed);
self.cache_size.store(0, Ordering::Relaxed);
}
}

struct Workers {
cache_evict: Worker<CacheTask>,
}
Expand All @@ -293,7 +253,7 @@ where
{
fn new_impl(cfg: Config, chunk_limit: usize) -> Result<Engine<E, W, PipeLog>> {
let cache_limit = cfg.cache_limit.0 as usize;
let cache_stats = Arc::new(SharedCacheStats::default());
let global_stats = Arc::new(GlobalStats::default());

let mut cache_evict_worker = Worker::new("cache_evict".to_owned(), None);

Expand All @@ -303,22 +263,22 @@ where
cache_limit,
chunk_limit,
cache_evict_worker.scheduler(),
cache_stats.clone(),
global_stats.clone(),
),
)
.expect("Open raft log");
pipe_log.cache_submitor().block_on_full();

let memtables = {
let stats = cache_stats.clone();
let stats = global_stats.clone();
MemTableAccessor::<E, W>::new(Arc::new(move |id: u64| {
MemTable::new(id, cache_limit, stats.clone())
}))
};

let cache_evict_runner = CacheEvictRunner::new(
cache_limit,
cache_stats.clone(),
global_stats.clone(),
chunk_limit,
memtables.clone(),
pipe_log.clone(),
Expand All @@ -342,7 +302,7 @@ where
cfg,
memtables,
pipe_log,
cache_stats,
global_stats,
purge_manager,
workers: Arc::new(RwLock::new(Workers {
cache_evict: cache_evict_worker,
Expand Down Expand Up @@ -484,12 +444,8 @@ where
}

/// Flush stats about EntryCache.
pub fn flush_stats(&self) -> CacheStats {
CacheStats {
hit: self.cache_stats.hit.swap(0, Ordering::SeqCst),
miss: self.cache_stats.miss.swap(0, Ordering::SeqCst),
cache_size: self.cache_stats.cache_size.load(Ordering::SeqCst),
}
pub fn flush_cache_stats(&self) -> CacheStats {
self.global_stats.flush_cache_stats()
}

/// Stop background thread which will keep trying evict caching.
Expand Down Expand Up @@ -791,14 +747,14 @@ mod tests {
}
}

let cache_size = engine.cache_stats.cache_size();
let cache_size = engine.global_stats.cache_size();
assert!(cache_size <= 10 * 1024 * 1024);

// Recover from log files.
engine.stop();
drop(engine);
let engine = RaftLogEngine::new_impl(cfg.clone(), 512 * 1024).unwrap();
let cache_size = engine.cache_stats.cache_size();
let cache_size = engine.global_stats.cache_size();
assert!(cache_size <= 10 * 1024 * 1024);

// Rewrite inactive logs.
Expand All @@ -807,7 +763,7 @@ mod tests {
}
let ret = engine.purge_expired_files().unwrap();
assert!(ret.is_empty());
let cache_size = engine.cache_stats.cache_size();
let cache_size = engine.global_stats.cache_size();
assert!(cache_size <= 10 * 1024 * 1024);
}

Expand Down
49 changes: 49 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#![feature(shrink_to)]
#![feature(cell_update)]

use std::sync::atomic::{AtomicUsize, Ordering};

macro_rules! box_err {
($e:expr) => ({
use std::error::Error;
Expand Down Expand Up @@ -37,3 +39,50 @@ pub struct CacheStats {
pub miss: usize,
pub cache_size: usize,
}

/// Process-wide statistics shared (via `Arc<GlobalStats>`) between the engine,
/// the cache submitor and the cache-evict worker.
///
/// Currently it only tracks entry-cache counters; the general name suggests
/// more global counters are expected later — presumably the motivation for
/// renaming it from `SharedCacheStats` (TODO confirm against commit intent).
#[derive(Default)]
pub struct GlobalStats {
    // Cache hits accumulated since the last flush (Relaxed counter).
    cache_hit: AtomicUsize,
    // Cache misses accumulated since the last flush (Relaxed counter).
    cache_miss: AtomicUsize,
    // Current total bytes held by the entry cache (Release writes / Acquire reads).
    cache_size: AtomicUsize,
}

impl GlobalStats {
    /// Increases the tracked entry-cache size by `bytes`.
    pub fn add_mem_change(&self, bytes: usize) {
        self.cache_size.fetch_add(bytes, Ordering::Release);
    }

    /// Decreases the tracked entry-cache size by `bytes`.
    /// NOTE(review): this underflows (panicking in debug builds) if callers
    /// release more bytes than were added — assumed callers keep it balanced.
    pub fn sub_mem_change(&self, bytes: usize) {
        self.cache_size.fetch_sub(bytes, Ordering::Release);
    }

    /// Records `count` cache hits.
    pub fn add_cache_hit(&self, count: usize) {
        self.cache_hit.fetch_add(count, Ordering::Relaxed);
    }

    /// Records `count` cache misses.
    pub fn add_cache_miss(&self, count: usize) {
        self.cache_miss.fetch_add(count, Ordering::Relaxed);
    }

    /// Cache hits accumulated since the last `flush_cache_stats`.
    pub fn cache_hit(&self) -> usize {
        self.cache_hit.load(Ordering::Relaxed)
    }

    /// Cache misses accumulated since the last `flush_cache_stats`.
    pub fn cache_miss(&self) -> usize {
        self.cache_miss.load(Ordering::Relaxed)
    }

    /// Current entry-cache size in bytes.
    pub fn cache_size(&self) -> usize {
        self.cache_size.load(Ordering::Acquire)
    }

    /// Takes a reporting snapshot: the hit/miss counters are atomically read
    /// and reset to zero, while the cache size is only read, since it tracks
    /// live memory usage rather than an interval count.
    pub fn flush_cache_stats(&self) -> CacheStats {
        let hit = self.cache_hit.swap(0, Ordering::SeqCst);
        let miss = self.cache_miss.swap(0, Ordering::SeqCst);
        let cache_size = self.cache_size.load(Ordering::SeqCst);
        CacheStats {
            hit,
            miss,
            cache_size,
        }
    }

    /// Test-only helper: zero every counter.
    #[cfg(test)]
    pub fn reset_cache(&self) {
        self.cache_hit.store(0, Ordering::Relaxed);
        self.cache_miss.store(0, Ordering::Relaxed);
        self.cache_size.store(0, Ordering::Relaxed);
    }
}
Loading

0 comments on commit 7f89d27

Please sign in to comment.