Skip to content

Commit

Permalink
fix(cache): fix entry drop in channel (risingwavelabs#6634)
Browse files Browse the repository at this point in the history
* fix drop

Signed-off-by: Little-Wallace <[email protected]>

* fix warn

Signed-off-by: Little-Wallace <[email protected]>

Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace authored Nov 29, 2022
1 parent a061017 commit c2464af
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 43 deletions.
44 changes: 18 additions & 26 deletions src/common/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,15 @@ impl<K: LruKey, T: LruValue> LruHandleTable<K, T> {
}
}

type RequestQueue<K, T> = Vec<Sender<CacheableEntry<K, T>>>;
type RequestQueue = Vec<Sender<()>>;
pub struct LruCacheShard<K: LruKey, T: LruValue> {
/// The dummy header node of a ring linked list. The linked list is a LRU list, holding the
/// cache handles that are not used externally.
lru: Box<LruHandle<K, T>>,
table: LruHandleTable<K, T>,
// TODO: may want to use an atomic object linked list shared by all shards.
object_pool: Vec<Box<LruHandle<K, T>>>,
write_request: HashMap<K, RequestQueue<K, T>>,
write_request: HashMap<K, RequestQueue>,
lru_usage: Arc<AtomicUsize>,
usage: Arc<AtomicUsize>,
capacity: usize,
Expand Down Expand Up @@ -680,21 +680,14 @@ impl<K: LruKey, T: LruValue> LruCache<K, T> {
) -> CacheableEntry<K, T> {
let mut to_delete = vec![];
// Drop the entries outside lock to avoid deadlock.
let mut errs = vec![];
let handle = unsafe {
let mut shard = self.shards[self.shard(hash)].lock();
let pending_request = shard.write_request.remove(&key);
let ptr = shard.insert(key, hash, charge, value, &mut to_delete);
debug_assert!(!ptr.is_null());
if let Some(que) = pending_request {
for sender in que {
(*ptr).add_ref();
if let Err(e) = sender.send(CacheableEntry {
cache: self.clone(),
handle: ptr,
}) {
errs.push(e);
}
let _ = sender.send(());
}
}
CacheableEntry {
Expand Down Expand Up @@ -808,9 +801,8 @@ impl<K: LruKey + Clone + 'static, T: LruValue + 'static> LruCache<K, T> {
match self.lookup_for_request(hash, key.clone()) {
LookupResult::Cached(entry) => return Ok(entry),
LookupResult::WaitPendingRequest(recv) => {
if let Ok(entry) = recv.await {
return Ok(entry);
}
let _ = recv.await;
continue;
}
LookupResult::Miss => {
let this = self.clone();
Expand Down Expand Up @@ -847,7 +839,7 @@ pub struct CacheableEntry<K: LruKey, T: LruValue> {
pub enum LookupResult<K: LruKey, T: LruValue> {
Cached(CacheableEntry<K, T>),
Miss,
WaitPendingRequest(Receiver<CacheableEntry<K, T>>),
WaitPendingRequest(Receiver<()>),
}

unsafe impl<K: LruKey, T: LruValue> Send for CacheableEntry<K, T> {}
Expand Down Expand Up @@ -1184,23 +1176,23 @@ mod tests {
insert(&mut shard, "a", "v1");
assert!(lookup(&mut shard, "a"));
}
let ret = cache.lookup_for_request(0, "a".to_string());
match ret {
LookupResult::Cached(_) => (),
_ => panic!(),
}
let ret1 = cache.lookup_for_request(0, "b".to_string());
match ret1 {
LookupResult::Miss => (),
_ => panic!(),
}
assert!(matches!(
cache.lookup_for_request(0, "a".to_string()),
LookupResult::Cached(_)
));
assert!(matches!(
cache.lookup_for_request(0, "b".to_string()),
LookupResult::Miss
));
let ret2 = cache.lookup_for_request(0, "b".to_string());
match ret2 {
LookupResult::WaitPendingRequest(mut recv) => {
assert!(matches!(recv.try_recv(), Err(TryRecvError::Empty)));
cache.insert("b".to_string(), 0, 1, "v2".to_string());
let v = recv.try_recv().unwrap();
assert_eq!(v.value(), "v2");
recv.try_recv().unwrap();
assert!(
matches!(cache.lookup_for_request(0, "b".to_string()), LookupResult::Cached(v) if v.value().eq("v2"))
);
}
_ => panic!(),
}
Expand Down
28 changes: 11 additions & 17 deletions src/storage/benches/bench_lru_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use moka::future::Cache;
use rand::rngs::SmallRng;
use rand::{RngCore, SeedableRng};
use risingwave_storage::hummock::{HummockError, HummockResult, LookupResult, LruCache};
use risingwave_storage::hummock::{HummockError, HummockResult, LruCache};
use tokio::runtime::{Builder, Runtime};

pub struct Block {
Expand Down Expand Up @@ -101,22 +101,16 @@ impl CacheBase for LruCacheImpl {
sst_id.hash(&mut hasher);
block_idx.hash(&mut hasher);
let h = hasher.finish();
match self.inner.lookup_for_request(h, key) {
LookupResult::Cached(entry) => {
let block = entry.value().clone();
Ok(block)
}
LookupResult::WaitPendingRequest(recv) => {
let entry = recv.await.map_err(HummockError::other)?;
Ok(entry.value().clone())
}
LookupResult::Miss => {
let block =
Arc::new(get_fake_block(sst_id, block_idx, self.fake_io_latency).await?);
self.inner.insert(key, h, 1, block.clone());
Ok(block)
}
}
let latency = self.fake_io_latency;
let entry = self
.inner
.lookup_with_request_dedup(h, key, || async move {
get_fake_block(sst_id, block_idx, latency)
.await
.map(|block| (Arc::new(block), 1))
})
.await?;
Ok(entry.value().clone())
}
}

Expand Down

0 comments on commit c2464af

Please sign in to comment.