Skip to content

Commit

Permalink
evict fluash data at first
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Feb 26, 2024
1 parent 3483408 commit 5e683dd
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 47 deletions.
67 changes: 41 additions & 26 deletions src/common/src/fifo_cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::error::Error;
use std::future::Future;
use std::hash::Hasher;
use std::ops::Add;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

Expand Down Expand Up @@ -53,16 +54,34 @@ type RequestQueue<T> = Vec<Sender<T>>;

pub const SMALL_CACHE_RATIO_PERCENT: usize = 40;

#[derive(Default, Debug)]
pub struct CacheStats {
evict_small_count: usize,
evict_main_count: usize,
insert_in_ghost: usize,
insert_count: usize,
}

impl Add for CacheStats {
type Output = Self;

fn add(self, rhs: Self) -> Self::Output {
Self {
evict_small_count: self.evict_small_count + rhs.evict_small_count,
evict_main_count: self.evict_main_count + rhs.evict_main_count,
insert_in_ghost: self.insert_in_ghost + rhs.insert_in_ghost,
insert_count: self.insert_count + rhs.insert_count,
}
}
}

pub struct FifoCacheShard<K: CacheKey, V: CacheValue> {
map: HashMap<K, CacheHandle<K, V>>,
small: SmallHotCache<K, V>,
main: MainCache<K, V>,
ghost: GhostCache,
write_request: HashMap<K, RequestQueue<V>>,
evict_small_times: usize,
evict_main_times: usize,
insert_in_ghost: usize,

stats: CacheStats,
capacity: usize,
}

Expand All @@ -77,9 +96,7 @@ impl<K: CacheKey, V: CacheValue> FifoCacheShard<K, V> {
main,
ghost: GhostCache::new(),
write_request: HashMap::default(),
evict_main_times: 0,
evict_small_times: 0,
insert_in_ghost: 0,
stats: CacheStats::default(),
capacity,
}
}
Expand Down Expand Up @@ -115,10 +132,10 @@ impl<K: CacheKey, V: CacheValue> FifoCacheShard<K, V> {
pub fn evict(&mut self, ghost_capacity: usize, deleted: &mut Vec<Box<CacheItem<K, V>>>) {
if self.small.is_full() {
if let Some(item) = self.small.evict() {
if item.get_freq() > 0 {
if item.get_freq() > 1 {
self.main.insert(item);
} else {
self.evict_small_times += 1;
self.stats.evict_small_count += 1;
self.ghost.insert(item.hash(), ghost_capacity);
self.map.remove(&item.key);
deleted.push(item);
Expand All @@ -127,7 +144,7 @@ impl<K: CacheKey, V: CacheValue> FifoCacheShard<K, V> {
}
if self.main.is_full() {
if let Some(item) = self.main.evict() {
self.evict_main_times += 1;
self.stats.evict_main_count += 1;
deleted.push(item);
}
}
Expand All @@ -139,6 +156,7 @@ impl<K: CacheKey, V: CacheValue> FifoCacheShard<K, V> {
value: V,
h: u64,
cost: usize,
read_req: usize,
deleted: &mut Vec<Box<CacheItem<K, V>>>,
) {
if let Some(handle) = self.map.get_mut(&key) {
Expand All @@ -160,27 +178,24 @@ impl<K: CacheKey, V: CacheValue> FifoCacheShard<K, V> {
}
let is_ghost = self.ghost.is_ghost(h);
let mut item = Box::new(CacheItem::new(key.clone(), value, cost));
if read_req > 0 {
item.inc_freq();
}
let addr = item.as_mut();
// let addr = Box::into_raw(item);
let handle = CacheHandle { item: addr };
self.map.insert(key.clone(), handle);
self.stats.insert_count += 1;
if is_ghost {
self.insert_in_ghost += 1;
self.stats.insert_in_ghost += 1;
self.main.insert(item);
} else {
self.small.insert(item);
}
}

fn debug_print(&mut self) -> String {
let ret = format!(
"evict_small_times: {} evict_main_times: {}, insert_in_ghost: {}",
self.evict_small_times, self.evict_main_times, self.insert_in_ghost,
);
self.evict_small_times = 0;
self.evict_main_times = 0;
self.insert_in_ghost = 0;
ret
fn debug_print(&mut self) -> CacheStats {
std::mem::take(&mut self.stats)
}
}

Expand Down Expand Up @@ -258,14 +273,12 @@ impl<K: CacheKey + 'static, V: CacheValue + 'static> FifoCache<K, V> {
}

pub fn debug_print(&self) -> String {
let mut s = "FIFOCache: [".to_string();
let mut stats = CacheStats::default();
for shard in &self.shards {
let mut shard = shard.lock();
s += &(shard.debug_print() + ", ");
stats = stats + shard.debug_print();
}
s.pop();
s.pop();
s + "]"
format!("CacheStats: {:?}", stats)
}

fn hash(key: &K) -> u64 {
Expand All @@ -279,12 +292,14 @@ impl<K: CacheKey + 'static, V: CacheValue + 'static> FifoCache<K, V> {
// Drop the entries outside lock to avoid deadlock.
let hash = Self::hash(&key);
let mut senders = vec![];
let mut ping_count = 0;
{
let mut shard = self.shards[hash as usize % self.shards.len()].lock();
if let Some(pending_request) = shard.write_request.remove(&key) {
ping_count = pending_request.len() + 1;
senders = pending_request;
}
shard.insert(key, value.clone(), hash, charge, &mut to_delete);
shard.insert(key, value.clone(), hash, charge, ping_count, &mut to_delete);
}
for sender in senders {
let _ = sender.send(value.clone());
Expand Down
21 changes: 3 additions & 18 deletions src/common/src/fifo_cache/most.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,14 @@ impl<K: CacheKey, V: CacheValue> MainCache<K, V> {

pub fn evict(&mut self) -> Option<Box<CacheItem<K, V>>> {
let mut idx = 0;
let mut second_item = None;
while let Some(mut item) = self.queue.pop_front() {
if !item.dec_freq() {
if let Some(last_item) = second_item {
self.queue.push_back(last_item);
}
if !item.dec_freq() || idx >= MAX_EVICT_LOOP {
self.cost
.fetch_sub(item.cost(), std::sync::atomic::Ordering::Release);
return Some(item);
} else if item.get_freq() == 0 && second_item.is_none() {
second_item = Some(item);
} else {
if idx >= MAX_EVICT_LOOP {
if second_item.is_some() {
self.queue.push_back(item);
return second_item;
} else {
return Some(item);
}
}
self.queue.push_back(item);
idx += 1;
}
self.queue.push_back(item);
idx += 1;
}
None
}
Expand Down
6 changes: 3 additions & 3 deletions src/storage/benches/bench_lru_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,15 @@ impl Hash for BlockKey {
}

pub struct FIFOCacheImpl {
inner: FifoCache<BlockKey, Arc<Block>>,
inner: Arc<FifoCache<BlockKey, Arc<Block>>>,
fake_io_latency: Duration,
write_requests: Arc<Mutex<HashMap<BlockKey, RequestQueue>>>,
}

impl FIFOCacheImpl {
pub fn new(capacity: usize, fake_io_latency: Duration) -> Self {
Self {
inner: FifoCache::new(capacity * BLOCK_SIZE),
inner: Arc::new(FifoCache::new(2, capacity * BLOCK_SIZE)),
fake_io_latency,
write_requests: Arc::new(Mutex::new(HashMap::default())),
}
Expand All @@ -158,7 +158,7 @@ impl CacheBase for FIFOCacheImpl {
object_id,
block_idx,
};
if let Some(ret) = self.inner.get(&key) {
if let Some(ret) = self.inner.lookup(&key) {
return Ok(ret);
}

Expand Down

0 comments on commit 5e683dd

Please sign in to comment.