Skip to content

Commit

Permalink
change small ratio
Browse files · Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Feb 26, 2024
1 parent 68ba32a commit 3483408
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 40 deletions.
26 changes: 16 additions & 10 deletions src/common/src/fifo_cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use parking_lot::Mutex;
use tokio::sync::oneshot::{channel, Receiver, Sender};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::oneshot::{channel, Receiver, Sender};
use tokio::task::JoinHandle;

use crate::fifo_cache::ghost::GhostCache;
Expand Down Expand Up @@ -51,6 +51,8 @@ pub enum LookupResponse<T: CacheValue + 'static, E> {
}
type RequestQueue<T> = Vec<Sender<T>>;

pub const SMALL_CACHE_RATIO_PERCENT: usize = 40;

pub struct FifoCacheShard<K: CacheKey, V: CacheValue> {
map: HashMap<K, CacheHandle<K, V>>,
small: SmallHotCache<K, V>,
Expand All @@ -66,8 +68,9 @@ pub struct FifoCacheShard<K: CacheKey, V: CacheValue> {

impl<K: CacheKey, V: CacheValue> FifoCacheShard<K, V> {
pub fn new(capacity: usize) -> Self {
let small = SmallHotCache::new(capacity / 5);
let main = MainCache::new(capacity * 4 / 5);
let small_cache_capacity = capacity * SMALL_CACHE_RATIO_PERCENT / 100;
let small = SmallHotCache::new(small_cache_capacity);
let main = MainCache::new(capacity - small_cache_capacity);
Self {
map: HashMap::new(),
small,
Expand Down Expand Up @@ -130,7 +133,6 @@ impl<K: CacheKey, V: CacheValue> FifoCacheShard<K, V> {
}
}


pub fn insert(
&mut self,
key: K,
Expand Down Expand Up @@ -193,7 +195,7 @@ impl<'a, K: CacheKey + 'static, T: CacheValue + 'static> CleanCacheGuard<'a, K,
}
}

impl<'a, K: CacheKey + 'static, T: CacheValue+ 'static> Drop for CleanCacheGuard<'a, K, T> {
impl<'a, K: CacheKey + 'static, T: CacheValue + 'static> Drop for CleanCacheGuard<'a, K, T> {
fn drop(&mut self) {
if let Some(key) = self.key.as_ref() {
self.cache.clear_pending_request(key);
Expand Down Expand Up @@ -289,11 +291,15 @@ impl<K: CacheKey + 'static, V: CacheValue + 'static> FifoCache<K, V> {
}
}

pub fn lookup_or_insert_with<F, E, VC>(self: &Arc<Self>, key: K, fetch_value: F) -> LookupResponse<V, E>
where
F: FnOnce() -> VC,
E: Error + Send + 'static + From<RecvError>,
VC: Future<Output = Result<(V, usize), E>> + Send + 'static,
pub fn lookup_or_insert_with<F, E, VC>(
self: &Arc<Self>,
key: K,
fetch_value: F,
) -> LookupResponse<V, E>
where
F: FnOnce() -> VC,
E: Error + Send + 'static + From<RecvError>,
VC: Future<Output = Result<(V, usize), E>> + Send + 'static,
{
let hash = Self::hash(&key);
{
Expand Down
22 changes: 18 additions & 4 deletions src/common/src/fifo_cache/most.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,28 @@ impl<K: CacheKey, V: CacheValue> MainCache<K, V> {

pub fn evict(&mut self) -> Option<Box<CacheItem<K, V>>> {
let mut idx = 0;
let mut second_item = None;
while let Some(mut item) = self.queue.pop_front() {
if item.dec_freq() && idx < MAX_EVICT_LOOP {
idx += 1;
self.queue.push_back(item);
} else {
if !item.dec_freq() {
if let Some(last_item) = second_item {
self.queue.push_back(last_item);
}
self.cost
.fetch_sub(item.cost(), std::sync::atomic::Ordering::Release);
return Some(item);
} else if item.get_freq() == 0 && second_item.is_none() {
second_item = Some(item);
} else {
if idx >= MAX_EVICT_LOOP {
if second_item.is_some() {
self.queue.push_back(item);
return second_item;
} else {
return Some(item);
}
}
self.queue.push_back(item);
idx += 1;
}
}
None
Expand Down
48 changes: 22 additions & 26 deletions src/storage/src/hummock/block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,16 @@ impl BlockResponse {
pub async fn wait(self) -> HummockResult<BlockHolder> {
match self {
BlockResponse::Block(block_holder) => Ok(block_holder),
BlockResponse::WaitPendingRequest(receiver) => {
receiver
.verbose_instrument_await("wait_pending_fetch_block")
.await
.map_err(|recv_error| recv_error.into())
.map(BlockHolder::from_ref_block)
},
BlockResponse::Miss(join_handle) => {
join_handle
.verbose_instrument_await("fetch_block")
.await
.unwrap()
.map(BlockHolder::from_ref_block)
},
BlockResponse::WaitPendingRequest(receiver) => receiver
.verbose_instrument_await("wait_pending_fetch_block")
.await
.map_err(|recv_error| recv_error.into())
.map(BlockHolder::from_ref_block),
BlockResponse::Miss(join_handle) => join_handle
.verbose_instrument_await("fetch_block")
.await
.unwrap()
.map(BlockHolder::from_ref_block),
}
}
}
Expand Down Expand Up @@ -190,16 +186,16 @@ impl BlockCache {
Fut: Future<Output = HummockResult<Block>> + Send + 'static,
{
let key = (object_id, block_idx);
let lookup_response =
self.inner
.lookup_or_insert_with::<_, HummockError, _>(key, || {
let f = fetch_block();
async move {
let block = f.await?;
let len = block.capacity();
Ok((Arc::new(block), len))
}
});
let lookup_response = self
.inner
.lookup_or_insert_with::<_, HummockError, _>(key, || {
let f = fetch_block();
async move {
let block = f.await?;
let len = block.capacity();
Ok((Arc::new(block), len))
}
});
match lookup_response {
LookupResponse::Invalid => unreachable!(),
LookupResponse::Cached(entry) => {
Expand All @@ -212,12 +208,12 @@ impl BlockCache {
let last_miss_count = self
.cache_miss_times
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if last_miss_count % 30000 == 0 {
if last_miss_count % 10000 == 0 {
let debug_info = self.inner.debug_print();
tracing::info!("cache debug info: {:?}", debug_info);
}
BlockResponse::Miss(join_handle)
},
}
}
}

Expand Down

0 comments on commit 3483408

Please sign in to comment.