Skip to content

Commit

Permalink
fix: insert disk cache on hybrid cache fetch miss
Browse files Browse the repository at this point in the history
cherry pick #591

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx committed Jul 1, 2024
1 parent 30d87be commit 1c20d09
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 6 deletions.
1 change: 1 addition & 0 deletions foyer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ anyhow = "1"
foyer-common = { version = "0.7.3", path = "../foyer-common" }
foyer-memory = { version = "0.5.2", path = "../foyer-memory" }
foyer-storage = { version = "0.8.5", path = "../foyer-storage" }
pin-project = "1"
tokio = { workspace = true }
tracing = "0.1"

Expand Down
82 changes: 77 additions & 5 deletions foyer/src/hybrid/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{borrow::Borrow, fmt::Debug, future::Future, hash::Hash, sync::Arc, time::Instant};
use std::{
borrow::Borrow,
fmt::Debug,
future::Future,
hash::Hash,
ops::Deref,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{ready, Context, Poll},
time::Instant,
};

use ahash::RandomState;
use foyer_common::{
Expand All @@ -21,6 +34,7 @@ use foyer_common::{
};
use foyer_memory::{Cache, CacheContext, CacheEntry, Fetch, FetchState};
use foyer_storage::{DeviceStats, Storage, Store};
use pin_project::pin_project;
use tokio::sync::oneshot;

use crate::HybridCacheWriter;
Expand Down Expand Up @@ -273,7 +287,57 @@ impl From<oneshot::error::RecvError> for ObtainFetchError {
}

/// The future generated by [`HybridCache::fetch`].
pub type HybridFetch<K, V, S = RandomState> = Fetch<K, V, anyhow::Error, S>;
pub type HybridFetch<K, V, S = RandomState> = HybridFetchInner<K, V, S>;

/// A future that is used to get entry value from the remote storage for the hybrid cache.
#[pin_project]
pub struct HybridFetchInner<K, V, S = RandomState>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
#[pin]
inner: Fetch<K, V, anyhow::Error, S>,

enqueue: Arc<AtomicBool>,
storage: Store<K, V, S>,
}

impl<K, V, S> Future for HybridFetchInner<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
type Output = anyhow::Result<CacheEntry<K, V, S>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = ready!(this.inner.poll(cx));

if let Ok(entry) = res.as_ref() {
if this.enqueue.load(Ordering::Relaxed) {
this.storage.enqueue(entry.clone(), false);
}
}

Poll::Ready(res)
}
}

impl<K, V, S> Deref for HybridFetchInner<K, V, S>
where
K: StorageKey,
V: StorageValue,
S: HashBuilder + Debug,
{
type Target = Fetch<K, V, anyhow::Error, S>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<K, V, S> HybridCache<K, V, S>
where
Expand All @@ -293,11 +357,13 @@ where
let now = Instant::now();

let store = self.storage.clone();
let enqueue = Arc::<AtomicBool>::default();
let future = fetch();
let ret = self.memory.fetch_with_runtime(
let inner = self.memory.fetch_with_runtime(
key.clone(),
|| {
let metrics = self.metrics.clone();
let enqueue = enqueue.clone();
async move {
match store.load(&key).await.map_err(anyhow::Error::from)? {
None => {}
Expand All @@ -313,18 +379,24 @@ where
metrics.hybrid_miss.increment(1);
metrics.hybrid_miss_duration.record(now.elapsed());

enqueue.store(true, Ordering::Relaxed);

future.await.map_err(anyhow::Error::from)
}
},
self.storage().runtime(),
);

if ret.state() == FetchState::Hit {
if inner.state() == FetchState::Hit {
self.metrics.hybrid_hit.increment(1);
self.metrics.hybrid_hit_duration.record(now.elapsed());
}

ret
HybridFetchInner {
inner,
enqueue,
storage: self.storage.clone(),
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion foyer/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub use storage::{

pub use crate::hybrid::{
builder::{HybridCacheBuilder, HybridCacheBuilderPhaseMemory, HybridCacheBuilderPhaseStorage},
cache::{HybridCache, HybridCacheEntry, HybridFetch},
cache::{HybridCache, HybridCacheEntry, HybridFetch, HybridFetchInner},
writer::{HybridCacheStorageWriter, HybridCacheWriter},
};

Expand Down

0 comments on commit 1c20d09

Please sign in to comment.