diff --git a/foyer-storage/src/large/generic.rs b/foyer-storage/src/large/generic.rs index e15497b4..41ffdfe1 100644 --- a/foyer-storage/src/large/generic.rs +++ b/foyer-storage/src/large/generic.rs @@ -321,6 +321,7 @@ where self.inner.admission_picker.pick(&self.inner.stats, key) } + #[minitrace::trace(name = "foyer::storage::large::generic::enqueue")] fn enqueue(&self, entry: CacheEntry, force: bool) -> EnqueueHandle { let now = Instant::now(); diff --git a/foyer/Cargo.toml b/foyer/Cargo.toml index 355453a5..1af37c0b 100644 --- a/foyer/Cargo.toml +++ b/foyer/Cargo.toml @@ -18,6 +18,7 @@ foyer-common = { version = "0.7.3", path = "../foyer-common" } foyer-memory = { version = "0.5.2", path = "../foyer-memory" } foyer-storage = { version = "0.8.5", path = "../foyer-storage" } minitrace = { workspace = true } +pin-project = "1" tokio = { workspace = true } tracing = "0.1" diff --git a/foyer/src/hybrid/cache.rs b/foyer/src/hybrid/cache.rs index fc3c4768..609dc189 100644 --- a/foyer/src/hybrid/cache.rs +++ b/foyer/src/hybrid/cache.rs @@ -17,10 +17,13 @@ use std::{ fmt::Debug, future::Future, hash::Hash, + ops::Deref, + pin::Pin, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, + task::{ready, Context, Poll}, time::Instant, }; @@ -33,6 +36,7 @@ use foyer_common::{ use foyer_memory::{Cache, CacheContext, CacheEntry, Fetch, FetchState}; use foyer_storage::{DeviceStats, Storage, Store}; use minitrace::prelude::*; +use pin_project::pin_project; use tokio::sync::oneshot; use crate::HybridCacheWriter; @@ -387,7 +391,57 @@ impl From for ObtainFetchError { } /// The future generated by [`HybridCache::fetch`]. -pub type HybridFetch = InRootSpan>; +pub type HybridFetch = InRootSpan>; + +/// A future that is used to get entry value from the remote storage for the hybrid cache. +#[pin_project] +pub struct HybridFetchInner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + #[pin] + inner: Fetch, + + enqueue: Arc, + storage: Store, +} + +impl Future for HybridFetchInner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + type Output = anyhow::Result>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let res = ready!(this.inner.poll(cx)); + + if let Ok(entry) = res.as_ref() { + if this.enqueue.load(Ordering::Acquire) { + this.storage.enqueue(entry.clone(), false); + } + } + + Poll::Ready(res) + } +} + +impl Deref for HybridFetchInner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + type Target = Fetch; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} impl HybridCache where @@ -411,11 +465,13 @@ where let now = Instant::now(); let store = self.storage.clone(); + let enqueue = Arc::::default(); let future = fetch(); - let res = self.memory.fetch_with_runtime( + let inner = self.memory.fetch_with_runtime( key.clone(), || { let metrics = self.metrics.clone(); + let enqueue = enqueue.clone(); async move { match store.load(&key).await.map_err(anyhow::Error::from)? { None => {} @@ -431,6 +487,8 @@ where metrics.hybrid_miss.increment(1); metrics.hybrid_miss_duration.record(now.elapsed()); + enqueue.store(true, Ordering::Release); + future .in_span(Span::enter_with_local_parent("foyer::hybrid::fetch::fn")) .await @@ -440,12 +498,18 @@ where self.storage().runtime(), ); - if res.state() == FetchState::Hit { + if inner.state() == FetchState::Hit { self.metrics.hybrid_hit.increment(1); self.metrics.hybrid_hit_duration.record(now.elapsed()); } - InRootSpan::new(res, span).with_threshold(self.tracing_config.record_hybrid_fetch_threshold()) + let inner = HybridFetchInner { + inner, + enqueue, + storage: self.storage.clone(), + }; + + InRootSpan::new(inner, span).with_threshold(self.tracing_config.record_hybrid_fetch_threshold()) } } diff --git a/foyer/src/prelude.rs b/foyer/src/prelude.rs index fe59e1ab..889d180e 100644 --- a/foyer/src/prelude.rs +++ b/foyer/src/prelude.rs @@ -36,7 +36,7 @@ pub use storage::{ pub use crate::hybrid::{ builder::{HybridCacheBuilder, HybridCacheBuilderPhaseMemory, HybridCacheBuilderPhaseStorage}, - cache::{HybridCache, HybridCacheEntry, HybridFetch}, + cache::{HybridCache, HybridCacheEntry, HybridFetch, HybridFetchInner}, writer::{HybridCacheStorageWriter, HybridCacheWriter}, };