Skip to content

Commit

Permalink
fix: insert disk cache on hybrid cache fetch miss (#591)
Browse files Browse the repository at this point in the history
* fix: insert disk cache on hybrid cache fetch miss

Signed-off-by: MrCroxx <[email protected]>

* refactor: export HybridFetch

Signed-off-by: MrCroxx <[email protected]>

* fix: fix atomic memory order

Signed-off-by: MrCroxx <[email protected]>

---------

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Jul 1, 2024
1 parent 430e536 commit 62ae5e7
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 5 deletions.
1 change: 1 addition & 0 deletions foyer-storage/src/large/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ where
self.inner.admission_picker.pick(&self.inner.stats, key)
}

#[minitrace::trace(name = "foyer::storage::large::generic::enqueue")]
fn enqueue(&self, entry: CacheEntry<K, V, S>, force: bool) -> EnqueueHandle {
let now = Instant::now();

Expand Down
1 change: 1 addition & 0 deletions foyer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ foyer-common = { version = "0.7.3", path = "../foyer-common" }
foyer-memory = { version = "0.5.2", path = "../foyer-memory" }
foyer-storage = { version = "0.8.5", path = "../foyer-storage" }
minitrace = { workspace = true }
pin-project = "1"
tokio = { workspace = true }
tracing = "0.1"

Expand Down
72 changes: 68 additions & 4 deletions foyer/src/hybrid/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ use std::{
fmt::Debug,
future::Future,
hash::Hash,
ops::Deref,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{ready, Context, Poll},
time::Instant,
};

Expand All @@ -33,6 +36,7 @@ use foyer_common::{
use foyer_memory::{Cache, CacheContext, CacheEntry, Fetch, FetchState};
use foyer_storage::{DeviceStats, Storage, Store};
use minitrace::prelude::*;
use pin_project::pin_project;
use tokio::sync::oneshot;

use crate::HybridCacheWriter;
Expand Down Expand Up @@ -387,7 +391,57 @@ impl From<oneshot::error::RecvError> for ObtainFetchError {
}

/// The future generated by [`HybridCache::fetch`].
pub type HybridFetch<K, V, S = RandomState> = InRootSpan<Fetch<K, V, anyhow::Error, S>>;
pub type HybridFetch<K, V, S = RandomState> = InRootSpan<HybridFetchInner<K, V, S>>;

/// A future that is used to get entry value from the remote storage for the hybrid cache.
#[pin_project]
pub struct HybridFetchInner<K, V, S = RandomState>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
#[pin]
inner: Fetch<K, V, anyhow::Error, S>,

enqueue: Arc<AtomicBool>,
storage: Store<K, V, S>,
}

impl<K, V, S> Future for HybridFetchInner<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
type Output = anyhow::Result<CacheEntry<K, V, S>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = ready!(this.inner.poll(cx));

if let Ok(entry) = res.as_ref() {
if this.enqueue.load(Ordering::Acquire) {
this.storage.enqueue(entry.clone(), false);
}
}

Poll::Ready(res)
}
}

impl<K, V, S> Deref for HybridFetchInner<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
type Target = Fetch<K, V, anyhow::Error, S>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<K, V, S> HybridCache<K, V, S>
where
Expand All @@ -411,11 +465,13 @@ where
let now = Instant::now();

let store = self.storage.clone();
let enqueue = Arc::<AtomicBool>::default();
let future = fetch();
let res = self.memory.fetch_with_runtime(
let inner = self.memory.fetch_with_runtime(
key.clone(),
|| {
let metrics = self.metrics.clone();
let enqueue = enqueue.clone();
async move {
match store.load(&key).await.map_err(anyhow::Error::from)? {
None => {}
Expand All @@ -431,6 +487,8 @@ where
metrics.hybrid_miss.increment(1);
metrics.hybrid_miss_duration.record(now.elapsed());

enqueue.store(true, Ordering::Release);

future
.in_span(Span::enter_with_local_parent("foyer::hybrid::fetch::fn"))
.await
Expand All @@ -440,12 +498,18 @@ where
self.storage().runtime(),
);

if res.state() == FetchState::Hit {
if inner.state() == FetchState::Hit {
self.metrics.hybrid_hit.increment(1);
self.metrics.hybrid_hit_duration.record(now.elapsed());
}

InRootSpan::new(res, span).with_threshold(self.tracing_config.record_hybrid_fetch_threshold())
let inner = HybridFetchInner {
inner,
enqueue,
storage: self.storage.clone(),
};

InRootSpan::new(inner, span).with_threshold(self.tracing_config.record_hybrid_fetch_threshold())
}
}

Expand Down
2 changes: 1 addition & 1 deletion foyer/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub use storage::{

pub use crate::hybrid::{
builder::{HybridCacheBuilder, HybridCacheBuilderPhaseMemory, HybridCacheBuilderPhaseStorage},
cache::{HybridCache, HybridCacheEntry, HybridFetch},
cache::{HybridCache, HybridCacheEntry, HybridFetch, HybridFetchInner},
writer::{HybridCacheStorageWriter, HybridCacheWriter},
};

Expand Down

0 comments on commit 62ae5e7

Please sign in to comment.