From 1c20d09a52a38f09988f1d33f119087bab13f384 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Mon, 1 Jul 2024 14:24:40 +0800 Subject: [PATCH] fix: insert disk cache on hybrid cache fetch miss cherry pick #591 Signed-off-by: MrCroxx --- foyer/Cargo.toml | 1 + foyer/src/hybrid/cache.rs | 82 ++++++++++++++++++++++++++++++++++++--- foyer/src/prelude.rs | 2 +- 3 files changed, 79 insertions(+), 6 deletions(-) diff --git a/foyer/Cargo.toml b/foyer/Cargo.toml index 97bbc31f..b5963a65 100644 --- a/foyer/Cargo.toml +++ b/foyer/Cargo.toml @@ -17,6 +17,7 @@ anyhow = "1" foyer-common = { version = "0.7.3", path = "../foyer-common" } foyer-memory = { version = "0.5.2", path = "../foyer-memory" } foyer-storage = { version = "0.8.5", path = "../foyer-storage" } +pin-project = "1" tokio = { workspace = true } tracing = "0.1" diff --git a/foyer/src/hybrid/cache.rs b/foyer/src/hybrid/cache.rs index e85a6402..ffc02fe6 100644 --- a/foyer/src/hybrid/cache.rs +++ b/foyer/src/hybrid/cache.rs @@ -12,7 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{borrow::Borrow, fmt::Debug, future::Future, hash::Hash, sync::Arc, time::Instant}; +use std::{ + borrow::Borrow, + fmt::Debug, + future::Future, + hash::Hash, + ops::Deref, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + task::{ready, Context, Poll}, + time::Instant, +}; use ahash::RandomState; use foyer_common::{ @@ -21,6 +34,7 @@ use foyer_common::{ }; use foyer_memory::{Cache, CacheContext, CacheEntry, Fetch, FetchState}; use foyer_storage::{DeviceStats, Storage, Store}; +use pin_project::pin_project; use tokio::sync::oneshot; use crate::HybridCacheWriter; @@ -273,7 +287,57 @@ impl From for ObtainFetchError { } /// The future generated by [`HybridCache::fetch`]. -pub type HybridFetch = Fetch; +pub type HybridFetch = HybridFetchInner; + +/// A future that is used to get entry value from the remote storage for the hybrid cache. +#[pin_project] +pub struct HybridFetchInner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + #[pin] + inner: Fetch, + + enqueue: Arc, + storage: Store, +} + +impl Future for HybridFetchInner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + type Output = anyhow::Result>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let res = ready!(this.inner.poll(cx)); + + if let Ok(entry) = res.as_ref() { + if this.enqueue.load(Ordering::Relaxed) { + this.storage.enqueue(entry.clone(), false); + } + } + + Poll::Ready(res) + } +} + +impl Deref for HybridFetchInner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + type Target = Fetch; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} impl HybridCache where @@ -293,11 +357,13 @@ where let now = Instant::now(); let store = self.storage.clone(); + let enqueue = Arc::::default(); let future = fetch(); - let ret = self.memory.fetch_with_runtime( + let inner = self.memory.fetch_with_runtime( key.clone(), || { let metrics = self.metrics.clone(); + let enqueue = enqueue.clone(); async move { match store.load(&key).await.map_err(anyhow::Error::from)? { None => {} @@ -313,18 +379,24 @@ where metrics.hybrid_miss.increment(1); metrics.hybrid_miss_duration.record(now.elapsed()); + enqueue.store(true, Ordering::Relaxed); + future.await.map_err(anyhow::Error::from) } }, self.storage().runtime(), ); - if ret.state() == FetchState::Hit { + if inner.state() == FetchState::Hit { self.metrics.hybrid_hit.increment(1); self.metrics.hybrid_hit_duration.record(now.elapsed()); } - ret + HybridFetchInner { + inner, + enqueue, + storage: self.storage.clone(), + } } } diff --git a/foyer/src/prelude.rs b/foyer/src/prelude.rs index 8ff51aaf..0245d442 100644 --- a/foyer/src/prelude.rs +++ b/foyer/src/prelude.rs @@ -35,7 +35,7 @@ pub use storage::{ pub use crate::hybrid::{ builder::{HybridCacheBuilder, HybridCacheBuilderPhaseMemory, HybridCacheBuilderPhaseStorage}, - cache::{HybridCache, HybridCacheEntry, HybridFetch}, + cache::{HybridCache, HybridCacheEntry, HybridFetch, HybridFetchInner}, writer::{HybridCacheStorageWriter, HybridCacheWriter}, };