Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: insert disk cache on hybrid cache fetch miss #591

Merged
merged 3 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions foyer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ foyer-common = { version = "0.7.3", path = "../foyer-common" }
foyer-memory = { version = "0.5.2", path = "../foyer-memory" }
foyer-storage = { version = "0.8.5", path = "../foyer-storage" }
minitrace = { workspace = true }
pin-project = "1"
tokio = { workspace = true }
tracing = "0.1"

Expand Down
72 changes: 68 additions & 4 deletions foyer/src/hybrid/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
fmt::Debug,
future::Future,
hash::Hash,
ops::Deref,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{ready, Context, Poll},
time::Instant,
};

Expand All @@ -33,6 +36,7 @@
use foyer_memory::{Cache, CacheContext, CacheEntry, Fetch, FetchState};
use foyer_storage::{DeviceStats, Storage, Store};
use minitrace::prelude::*;
use pin_project::pin_project;
use tokio::sync::oneshot;

use crate::HybridCacheWriter;
Expand Down Expand Up @@ -387,7 +391,57 @@
}

/// The future generated by [`HybridCache::fetch`].
pub type HybridFetch<K, V, S = RandomState> = InRootSpan<Fetch<K, V, anyhow::Error, S>>;
pub type HybridFetch<K, V, S = RandomState> = InRootSpan<HybridFetchInner<K, V, S>>;

/// A future that is used to get entry value from the remote storage for the hybrid cache.
#[pin_project]
pub struct HybridFetchInner<K, V, S = RandomState>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
#[pin]
inner: Fetch<K, V, anyhow::Error, S>,

enqueue: Arc<AtomicBool>,
storage: Store<K, V, S>,
}

impl<K, V, S> Future for HybridFetchInner<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
type Output = anyhow::Result<CacheEntry<K, V, S>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = ready!(this.inner.poll(cx));

if let Ok(entry) = res.as_ref() {
if this.enqueue.load(Ordering::Relaxed) {
this.storage.enqueue(entry.clone(), false);
}
}

Check warning on line 427 in foyer/src/hybrid/cache.rs

View check run for this annotation

Codecov / codecov/patch

foyer/src/hybrid/cache.rs#L427

Added line #L427 was not covered by tests

Poll::Ready(res)
}
}

impl<K, V, S> Deref for HybridFetchInner<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
type Target = Fetch<K, V, anyhow::Error, S>;

fn deref(&self) -> &Self::Target {
&self.inner
}

Check warning on line 443 in foyer/src/hybrid/cache.rs

View check run for this annotation

Codecov / codecov/patch

foyer/src/hybrid/cache.rs#L441-L443

Added lines #L441 - L443 were not covered by tests
}

impl<K, V, S> HybridCache<K, V, S>
where
Expand All @@ -411,11 +465,13 @@
let now = Instant::now();

let store = self.storage.clone();
let enqueue = Arc::<AtomicBool>::default();
let future = fetch();
let res = self.memory.fetch_with_runtime(
let inner = self.memory.fetch_with_runtime(
key.clone(),
|| {
let metrics = self.metrics.clone();
let enqueue = enqueue.clone();
async move {
match store.load(&key).await.map_err(anyhow::Error::from)? {
None => {}
Expand All @@ -431,6 +487,8 @@
metrics.hybrid_miss.increment(1);
metrics.hybrid_miss_duration.record(now.elapsed());

enqueue.store(true, Ordering::Relaxed);

future
.in_span(Span::enter_with_local_parent("foyer::hybrid::fetch::fn"))
.await
Expand All @@ -440,12 +498,18 @@
self.storage().runtime(),
);

if res.state() == FetchState::Hit {
if inner.state() == FetchState::Hit {
self.metrics.hybrid_hit.increment(1);
self.metrics.hybrid_hit_duration.record(now.elapsed());
}

InRootSpan::new(res, span).with_threshold(self.tracing_config.record_hybrid_fetch_threshold())
let inner = HybridFetchInner {
inner,
enqueue,
storage: self.storage.clone(),
};

InRootSpan::new(inner, span).with_threshold(self.tracing_config.record_hybrid_fetch_threshold())
}
}

Expand Down
2 changes: 1 addition & 1 deletion foyer/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub use storage::{

pub use crate::hybrid::{
builder::{HybridCacheBuilder, HybridCacheBuilderPhaseMemory, HybridCacheBuilderPhaseStorage},
cache::{HybridCache, HybridCacheEntry, HybridFetch},
cache::{HybridCache, HybridCacheEntry, HybridFetchInner},
writer::{HybridCacheStorageWriter, HybridCacheWriter},
};

Expand Down
Loading