Skip to content

Commit

Permalink
refactor to shard cache
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Feb 21, 2024
1 parent 7449fcc commit b2bbaa2
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 215 deletions.
174 changes: 143 additions & 31 deletions src/common/src/fifo_cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

use dashmap::DashMap;
use parking_lot::Mutex;

use crate::fifo_cache::ghost::GhostCache;
use crate::fifo_cache::most::MostCache;
use crate::fifo_cache::most::MainCache;
use crate::fifo_cache::small::SmallHotCache;
use crate::fifo_cache::{CacheItem, CacheKey, CacheValue};

Expand All @@ -33,28 +37,33 @@ impl<K: CacheKey, V: CacheValue> Clone for CacheHandle<K, V> {
Self { item: self.item }
}
}
pub struct FIFOCache<K: CacheKey, V: CacheValue> {
map: Arc<DashMap<K, CacheHandle<K, V>>>,
pub struct FIFOCacheShard<K: CacheKey, V: CacheValue> {
map: HashMap<K, CacheHandle<K, V>>,
small: SmallHotCache<K, V>,
main: MostCache<K, V>,
ghost: GhostCache<K>,
main: MainCache<K, V>,
ghost: GhostCache,
evict_small_times: usize,
evict_main_times: usize,

capacity: usize,
}

impl<K: CacheKey, V: CacheValue> FIFOCache<K, V> {
impl<K: CacheKey, V: CacheValue> FIFOCacheShard<K, V> {
pub fn new(capacity: usize) -> Self {
let main = MostCache::new(capacity * 9 / 10);
let main = MainCache::new(capacity * 9 / 10);
Self {
map: Arc::new(DashMap::new()),
map: HashMap::new(),
small: SmallHotCache::new(),
main,
ghost: GhostCache::new(),
evict_main_times: 0,
evict_small_times: 0,
capacity,
}
}

pub fn get(&self, k: &K) -> Option<V> {
if let Some(handle) = self.map.get(k) {
pub fn get(&mut self, k: &K) -> Option<V> {
if let Some(handle) = self.map.get_mut(k) {
unsafe {
let v = (*handle.item).value.clone();
(*handle.item).inc_freq();
Expand All @@ -68,7 +77,7 @@ impl<K: CacheKey, V: CacheValue> FIFOCache<K, V> {
self.small.size() + self.main.size()
}

pub fn clear(&self) {
pub fn clear(&mut self) {
self.small.clear();
self.main.clear();
}
Expand All @@ -81,48 +90,151 @@ impl<K: CacheKey, V: CacheValue> FIFOCache<K, V> {
self.map.contains_key(key)
}

pub fn evict(&self, ghost_capacity: usize, deleted: &mut Vec<Box<CacheItem<K, V>>>) {
pub fn evict(&mut self, ghost_capacity: usize, deleted: &mut Vec<Box<CacheItem<K, V>>>) {
if self.small.size() > self.capacity / 10 {
if let Some(item) = self.small.evict() {
if item.get_freq() > 1 {
self.main.insert(item);
} else {
self.ghost.insert(&item.key, ghost_capacity);
self.evict_small_times += 1;
self.ghost.insert(item.hash(), ghost_capacity);
self.map.remove(&item.key);
deleted.push(item);
}
}
}
if self.main.is_full() {
if let Some(item) = self.main.evict() {
self.evict_main_times += 1;
deleted.push(item);
}
}
}

pub fn insert(&self, key: K, value: V, cost: usize) {
let mut deleted = vec![];
let cache_count = self.small.count() + self.main.count();
let ghost_capacity = std::cmp::max(cache_count, self.capacity / 1000);
pub fn insert(&mut self, key: K, value: V, h: u64, cost: usize, deleted: &mut Vec<Box<CacheItem<K, V>>>) {
if let Some(handle) = self.map.get_mut(&key) {
unsafe {
(*handle.item).inc_freq();
}
return;
}
let mut to_delete = vec![];
let ghost_capacity = (self.small.count() + self.main.count()) * 5;
while self.is_full() {
self.evict(ghost_capacity, &mut deleted);
self.evict(ghost_capacity, &mut to_delete);
}
for item in &deleted {
for item in &to_delete {
self.map.remove(&item.key);
}
let is_ghost = self.ghost.is_ghost(&key);
let item = Box::new(CacheItem::new(key.clone(), value, cost));
unsafe {
let addr = Box::into_raw(item);
let handle = CacheHandle { item: addr };
if is_ghost {
self.main.insert(Box::from_raw(addr));
} else {
self.small.insert(Box::from_raw(addr));
}
self.map.insert(key.clone(), handle);
if !to_delete.is_empty() {
deleted.extend(to_delete);
}
let is_ghost = self.ghost.is_ghost(h);
let mut item = Box::new(CacheItem::new(key.clone(), value, cost));
let addr = item.as_mut();
// let addr = Box::into_raw(item);
let handle = CacheHandle { item: addr };
self.map.insert(key.clone(), handle);
if is_ghost {
self.main.insert(item);
} else {
self.small.insert(item);
}
}

/// Reports how many evictions the small and main queues performed since the
/// previous call, then resets both counters to zero.
fn debug_print(&mut self) -> String {
    // `take` returns the current count and leaves 0 behind in one step.
    let small_evictions = std::mem::take(&mut self.evict_small_times);
    let main_evictions = std::mem::take(&mut self.evict_main_times);
    format!(
        "evict_small_times: {} evict_main_times: {}",
        small_evictions, main_evictions
    )
}
}


/// A FIFO cache split into independently locked shards.
///
/// Each key is routed to one shard by its hash, so operations on keys that
/// land in different shards do not contend on the same mutex.
pub struct FIFOCache<K: CacheKey, T: CacheValue> {
// One mutex-guarded shard per slot; the slot is chosen as `hash % shards.len()`.
shards: Vec<Mutex<FIFOCacheShard<K, T>>>,
// Size counters taken from every shard's `small` and `main` queues at
// construction (two per shard); summed by `get_memory_usage` without locking.
usage_counters: Vec<Arc<AtomicUsize>>,
}

impl<K: CacheKey, T: CacheValue> FIFOCache<K, T> {
    /// Builds a cache with `1 << num_shard_bits` shards.
    ///
    /// `capacity` is divided evenly between the shards (integer division, so
    /// any remainder is not assigned to a shard). The size counters of every
    /// shard's `small` and `main` queues are collected up front so that
    /// [`Self::get_memory_usage`] can read them without locking a shard.
    pub fn new(num_shard_bits: usize, capacity: usize) -> Self {
        let num_shards = 1 << num_shard_bits;
        let per_shard = capacity / num_shards;
        let mut shards = Vec::with_capacity(num_shards);
        let mut usage_counters = Vec::with_capacity(num_shards * 2);
        for _ in 0..num_shards {
            let shard = FIFOCacheShard::new(per_shard);
            usage_counters.push(shard.small.get_size_counter());
            usage_counters.push(shard.main.get_size_counter());
            shards.push(Mutex::new(shard));
        }
        Self {
            shards,
            usage_counters,
        }
    }

    /// Returns `true` if the shard responsible for `key` currently holds it.
    pub fn contains(self: &Arc<Self>, key: &K) -> bool {
        let shard = self.shards[self.shard(key)].lock();
        shard.contains(key)
    }

    /// Looks `key` up in its shard and returns whatever the shard's `get` yields.
    pub fn lookup(self: &Arc<Self>, key: &K) -> Option<T> {
        let mut shard = self.shards[self.shard(key)].lock();
        shard.get(key)
    }

    /// Hashes `key` with the standard library's default hasher.
    ///
    /// This is the single source of truth for key hashes: the same value picks
    /// the shard and is handed to the shard on insertion.
    fn hash_key(key: &K) -> u64 {
        let mut hasher = DefaultHasher::default();
        key.hash(&mut hasher);
        hasher.finish()
    }

    /// Maps an already computed key hash to an index into `self.shards`.
    fn shard_index(&self, hash: u64) -> usize {
        hash as usize % self.shards.len()
    }

    /// Returns the index of the shard responsible for `key`.
    fn shard(&self, key: &K) -> usize {
        self.shard_index(Self::hash_key(key))
    }

    /// Clears every shard, locking them one at a time.
    pub fn clear(&self) {
        for shard in &self.shards {
            shard.lock().clear();
        }
    }

    /// Sums the size counters of all shards' `small` and `main` queues.
    ///
    /// Only atomics are read, so no shard lock is taken.
    pub fn get_memory_usage(&self) -> usize {
        self.usage_counters
            .iter()
            .map(|counter| counter.load(std::sync::atomic::Ordering::Acquire))
            .sum()
    }

    /// Collects the per-shard eviction statistics into a single line of the
    /// form `FIFOCache: [<shard 0>, <shard 1>, ...]`.
    ///
    /// Each shard resets its own counters as it is reported.
    pub fn debug_print(&self) -> String {
        let per_shard: Vec<String> = self
            .shards
            .iter()
            .map(|shard| shard.lock().debug_print())
            .collect();
        // `join` only places separators between entries, so nothing has to be
        // trimmed afterwards and the prefix can never be damaged.
        format!("FIFOCache: [{}]", per_shard.join(", "))
    }

    /// Inserts `key` -> `value` with cost `charge` into the shard that owns `key`.
    ///
    /// Entries the shard evicts to make room are handed back through a local
    /// buffer and destroyed only after the shard lock has been released.
    pub fn insert(self: &Arc<Self>, key: K, value: T, charge: usize) {
        let hash = Self::hash_key(&key);
        let mut to_delete = vec![];
        {
            let mut shard = self.shards[self.shard_index(hash)].lock();
            shard.insert(key, value, hash, charge, &mut to_delete);
        }
        // Drop the evicted entries outside the lock to avoid deadlock: their
        // destructors must not run while this shard's mutex is still held.
        drop(to_delete);
    }
}





29 changes: 13 additions & 16 deletions src/common/src/fifo_cache/ghost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,35 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crossbeam_queue::SegQueue;
use dashmap::DashSet;
use std::collections::{HashSet, VecDeque};

use crate::fifo_cache::CacheKey;

pub struct GhostCache<K: CacheKey> {
map: DashSet<K>,
queue: SegQueue<K>,
pub struct GhostCache {
map: HashSet<u64>,
queue: VecDeque<u64>,
}

impl<K: CacheKey> GhostCache<K> {
impl GhostCache {
pub fn new() -> Self {
Self {
map: DashSet::default(),
queue: SegQueue::new(),
map: HashSet::default(),
queue: VecDeque::new(),
}
}

pub fn insert(&self, key: &K, max_capacity: usize) {
if !self.map.insert(key.clone()) {
pub fn insert(&mut self, key_hash: u64, max_capacity: usize) {
if !self.map.insert(key_hash) {
return;
}
// avoid push fail
while self.queue.len() >= max_capacity {
if let Some(expire_key) = self.queue.pop() {
if let Some(expire_key) = self.queue.pop_front() {
self.map.remove(&expire_key);
}
}
self.queue.push(key.clone());
self.queue.push_back(key_hash);
}

pub fn is_ghost(&self, key: &K) -> bool {
self.map.contains(key)
pub fn is_ghost(&self, key: u64) -> bool {
self.map.contains(&key)
}
}
Loading

0 comments on commit b2bbaa2

Please sign in to comment.