From 1137d3ce6345f456127ace929a031f9a509297da Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Mon, 1 Jul 2024 14:24:40 +0800 Subject: [PATCH 1/3] fix: insert disk cache on hybrid cache fetch miss Signed-off-by: MrCroxx --- foyer/Cargo.toml | 1 + foyer/src/hybrid/cache.rs | 72 ++++++++++++++++++++++++++++++++++++--- foyer/src/prelude.rs | 2 +- 3 files changed, 70 insertions(+), 5 deletions(-) diff --git a/foyer/Cargo.toml b/foyer/Cargo.toml index 355453a5..1af37c0b 100644 --- a/foyer/Cargo.toml +++ b/foyer/Cargo.toml @@ -18,6 +18,7 @@ foyer-common = { version = "0.7.3", path = "../foyer-common" } foyer-memory = { version = "0.5.2", path = "../foyer-memory" } foyer-storage = { version = "0.8.5", path = "../foyer-storage" } minitrace = { workspace = true } +pin-project = "1" tokio = { workspace = true } tracing = "0.1" diff --git a/foyer/src/hybrid/cache.rs b/foyer/src/hybrid/cache.rs index fc3c4768..63bbd7b9 100644 --- a/foyer/src/hybrid/cache.rs +++ b/foyer/src/hybrid/cache.rs @@ -17,10 +17,13 @@ use std::{ fmt::Debug, future::Future, hash::Hash, + ops::Deref, + pin::Pin, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, + task::{ready, Context, Poll}, time::Instant, }; @@ -33,6 +36,7 @@ use foyer_common::{ use foyer_memory::{Cache, CacheContext, CacheEntry, Fetch, FetchState}; use foyer_storage::{DeviceStats, Storage, Store}; use minitrace::prelude::*; +use pin_project::pin_project; use tokio::sync::oneshot; use crate::HybridCacheWriter; @@ -387,7 +391,57 @@ impl From for ObtainFetchError { } /// The future generated by [`HybridCache::fetch`]. -pub type HybridFetch = InRootSpan>; +pub type HybridFetch = InRootSpan>; + +/// A future that is used to get entry value from the remote storage for the hybrid cache. +#[pin_project] +pub struct HybridFetchInner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + #[pin] + inner: Fetch, + + enqueue: Arc, + storage: Store, +} + +impl Future for HybridFetchInner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + type Output = anyhow::Result>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let res = ready!(this.inner.poll(cx)); + + if let Ok(entry) = res.as_ref() { + if this.enqueue.load(Ordering::Relaxed) { + this.storage.enqueue(entry.clone(), false); + } + } + + Poll::Ready(res) + } +} + +impl Deref for HybridFetchInner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + type Target = Fetch; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} impl HybridCache where @@ -411,11 +465,13 @@ where let now = Instant::now(); let store = self.storage.clone(); + let enqueue = Arc::::default(); let future = fetch(); - let res = self.memory.fetch_with_runtime( + let inner = self.memory.fetch_with_runtime( key.clone(), || { let metrics = self.metrics.clone(); + let enqueue = enqueue.clone(); async move { match store.load(&key).await.map_err(anyhow::Error::from)? { None => {} @@ -431,6 +487,8 @@ where metrics.hybrid_miss.increment(1); metrics.hybrid_miss_duration.record(now.elapsed()); + enqueue.store(true, Ordering::Relaxed); + future .in_span(Span::enter_with_local_parent("foyer::hybrid::fetch::fn")) .await @@ -440,12 +498,18 @@ where self.storage().runtime(), ); - if res.state() == FetchState::Hit { + if inner.state() == FetchState::Hit { self.metrics.hybrid_hit.increment(1); self.metrics.hybrid_hit_duration.record(now.elapsed()); } - InRootSpan::new(res, span).with_threshold(self.tracing_config.record_hybrid_fetch_threshold()) + let inner = HybridFetchInner { + inner, + enqueue, + storage: self.storage.clone(), + }; + + InRootSpan::new(inner, span).with_threshold(self.tracing_config.record_hybrid_fetch_threshold()) } } diff --git a/foyer/src/prelude.rs b/foyer/src/prelude.rs index fe59e1ab..803b8de7 100644 --- a/foyer/src/prelude.rs +++ b/foyer/src/prelude.rs @@ -36,7 +36,7 @@ pub use storage::{ pub use crate::hybrid::{ builder::{HybridCacheBuilder, HybridCacheBuilderPhaseMemory, HybridCacheBuilderPhaseStorage}, - cache::{HybridCache, HybridCacheEntry, HybridFetch}, + cache::{HybridCache, HybridCacheEntry, HybridFetchInner}, writer::{HybridCacheStorageWriter, HybridCacheWriter}, }; From 2226f1640ebd15f7ce219af01a0dc749f43369f5 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Mon, 1 Jul 2024 14:46:36 +0800 Subject: [PATCH 2/3] refactor: export HybridFetch Signed-off-by: MrCroxx --- foyer/src/prelude.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/foyer/src/prelude.rs b/foyer/src/prelude.rs index 803b8de7..889d180e 100644 --- a/foyer/src/prelude.rs +++ b/foyer/src/prelude.rs @@ -36,7 +36,7 @@ pub use storage::{ pub use crate::hybrid::{ builder::{HybridCacheBuilder, HybridCacheBuilderPhaseMemory, HybridCacheBuilderPhaseStorage}, - cache::{HybridCache, HybridCacheEntry, HybridFetchInner}, + cache::{HybridCache, HybridCacheEntry, HybridFetch, HybridFetchInner}, writer::{HybridCacheStorageWriter, HybridCacheWriter}, }; From 96c5b37b9e92a03e83cf46cdacf9551c00fb948c Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Mon, 1 Jul 2024 18:43:13 +0800 Subject: [PATCH 3/3] fix: fix atomic memory order Signed-off-by: MrCroxx --- foyer-storage/src/large/generic.rs | 1 + foyer/src/hybrid/cache.rs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/foyer-storage/src/large/generic.rs b/foyer-storage/src/large/generic.rs index e15497b4..41ffdfe1 100644 --- a/foyer-storage/src/large/generic.rs +++ b/foyer-storage/src/large/generic.rs @@ -321,6 +321,7 @@ where self.inner.admission_picker.pick(&self.inner.stats, key) } + #[minitrace::trace(name = "foyer::storage::large::generic::enqueue")] fn enqueue(&self, entry: CacheEntry, force: bool) -> EnqueueHandle { let now = Instant::now(); diff --git a/foyer/src/hybrid/cache.rs b/foyer/src/hybrid/cache.rs index 63bbd7b9..609dc189 100644 --- a/foyer/src/hybrid/cache.rs +++ b/foyer/src/hybrid/cache.rs @@ -421,7 +421,7 @@ where let res = ready!(this.inner.poll(cx)); if let Ok(entry) = res.as_ref() { - if this.enqueue.load(Ordering::Relaxed) { + if this.enqueue.load(Ordering::Acquire) { this.storage.enqueue(entry.clone(), false); } } @@ -487,7 +487,7 @@ where metrics.hybrid_miss.increment(1); metrics.hybrid_miss_duration.record(now.elapsed()); - enqueue.store(true, Ordering::Relaxed); + enqueue.store(true, Ordering::Release); future .in_span(Span::enter_with_local_parent("foyer::hybrid::fetch::fn"))