From 34834082ba3b0a11737b6113aee3304231d88259 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 26 Feb 2024 20:03:29 +0800 Subject: [PATCH] change small ratio Signed-off-by: Little-Wallace --- src/common/src/fifo_cache/cache.rs | 26 ++++++++------ src/common/src/fifo_cache/most.rs | 22 +++++++++--- src/storage/src/hummock/block_cache.rs | 48 ++++++++++++-------------- 3 files changed, 56 insertions(+), 40 deletions(-) diff --git a/src/common/src/fifo_cache/cache.rs b/src/common/src/fifo_cache/cache.rs index 64d8b5fa33f0b..e579c96dbf6c4 100644 --- a/src/common/src/fifo_cache/cache.rs +++ b/src/common/src/fifo_cache/cache.rs @@ -21,8 +21,8 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use parking_lot::Mutex; -use tokio::sync::oneshot::{channel, Receiver, Sender}; use tokio::sync::oneshot::error::RecvError; +use tokio::sync::oneshot::{channel, Receiver, Sender}; use tokio::task::JoinHandle; use crate::fifo_cache::ghost::GhostCache; @@ -51,6 +51,8 @@ pub enum LookupResponse { } type RequestQueue = Vec>; +pub const SMALL_CACHE_RATIO_PERCENT: usize = 40; + pub struct FifoCacheShard { map: HashMap>, small: SmallHotCache, @@ -66,8 +68,9 @@ pub struct FifoCacheShard { impl FifoCacheShard { pub fn new(capacity: usize) -> Self { - let small = SmallHotCache::new(capacity / 5); - let main = MainCache::new(capacity * 4 / 5); + let small_cache_capacity = capacity * SMALL_CACHE_RATIO_PERCENT / 100; + let small = SmallHotCache::new(small_cache_capacity); + let main = MainCache::new(capacity - small_cache_capacity); Self { map: HashMap::new(), small, @@ -130,7 +133,6 @@ impl FifoCacheShard { } } - pub fn insert( &mut self, key: K, @@ -193,7 +195,7 @@ impl<'a, K: CacheKey + 'static, T: CacheValue + 'static> CleanCacheGuard<'a, K, } } -impl<'a, K: CacheKey + 'static, T: CacheValue+ 'static> Drop for CleanCacheGuard<'a, K, T> { +impl<'a, K: CacheKey + 'static, T: CacheValue + 'static> Drop for CleanCacheGuard<'a, K, T> { fn drop(&mut self) { if let Some(key) = self.key.as_ref() { self.cache.clear_pending_request(key); @@ -289,11 +291,15 @@ impl FifoCache { } } - pub fn lookup_or_insert_with(self: &Arc, key: K, fetch_value: F) -> LookupResponse - where - F: FnOnce() -> VC, - E: Error + Send + 'static + From, - VC: Future> + Send + 'static, + pub fn lookup_or_insert_with( + self: &Arc, + key: K, + fetch_value: F, + ) -> LookupResponse + where + F: FnOnce() -> VC, + E: Error + Send + 'static + From, + VC: Future> + Send + 'static, { let hash = Self::hash(&key); { diff --git a/src/common/src/fifo_cache/most.rs b/src/common/src/fifo_cache/most.rs index 560f112fc06b2..41b7a2e193dd9 100644 --- a/src/common/src/fifo_cache/most.rs +++ b/src/common/src/fifo_cache/most.rs @@ -55,14 +55,28 @@ impl MainCache { pub fn evict(&mut self) -> Option>> { let mut idx = 0; + let mut second_item = None; while let Some(mut item) = self.queue.pop_front() { - if item.dec_freq() && idx < MAX_EVICT_LOOP { - idx += 1; - self.queue.push_back(item); - } else { + if !item.dec_freq() { + if let Some(last_item) = second_item { + self.queue.push_back(last_item); + } self.cost .fetch_sub(item.cost(), std::sync::atomic::Ordering::Release); return Some(item); + } else if item.get_freq() == 0 && second_item.is_none() { + second_item = Some(item); + } else { + if idx >= MAX_EVICT_LOOP { + if second_item.is_some() { + self.queue.push_back(item); + return second_item; + } else { + return Some(item); + } + } + self.queue.push_back(item); + idx += 1; } } None diff --git a/src/storage/src/hummock/block_cache.rs b/src/storage/src/hummock/block_cache.rs index ecfb714fb9f46..8661d4777b9eb 100644 --- a/src/storage/src/hummock/block_cache.rs +++ b/src/storage/src/hummock/block_cache.rs @@ -98,20 +98,16 @@ impl BlockResponse { pub async fn wait(self) -> HummockResult { match self { BlockResponse::Block(block_holder) => Ok(block_holder), - BlockResponse::WaitPendingRequest(receiver) => { - receiver - .verbose_instrument_await("wait_pending_fetch_block") - .await - .map_err(|recv_error| recv_error.into()) - .map(BlockHolder::from_ref_block) - }, - BlockResponse::Miss(join_handle) => { - join_handle - .verbose_instrument_await("fetch_block") - .await - .unwrap() - .map(BlockHolder::from_ref_block) - }, + BlockResponse::WaitPendingRequest(receiver) => receiver + .verbose_instrument_await("wait_pending_fetch_block") + .await + .map_err(|recv_error| recv_error.into()) + .map(BlockHolder::from_ref_block), + BlockResponse::Miss(join_handle) => join_handle + .verbose_instrument_await("fetch_block") + .await + .unwrap() + .map(BlockHolder::from_ref_block), } } } @@ -190,16 +186,16 @@ impl BlockCache { Fut: Future> + Send + 'static, { let key = (object_id, block_idx); - let lookup_response = - self.inner - .lookup_or_insert_with::<_, HummockError, _>(key, || { - let f = fetch_block(); - async move { - let block = f.await?; - let len = block.capacity(); - Ok((Arc::new(block), len)) - } - }); + let lookup_response = self + .inner + .lookup_or_insert_with::<_, HummockError, _>(key, || { + let f = fetch_block(); + async move { + let block = f.await?; + let len = block.capacity(); + Ok((Arc::new(block), len)) + } + }); match lookup_response { LookupResponse::Invalid => unreachable!(), LookupResponse::Cached(entry) => { @@ -212,12 +208,12 @@ impl BlockCache { let last_miss_count = self .cache_miss_times .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - if last_miss_count % 30000 == 0 { + if last_miss_count % 10000 == 0 { let debug_info = self.inner.debug_print(); tracing::info!("cache debug info: {:?}", debug_info); } BlockResponse::Miss(join_handle) - }, + } } }