Skip to content

Commit

Permalink
refactor: remove atomic op in #591 (#594)
Browse files Browse the repository at this point in the history
* refactor: remove atomic op in #591

Signed-off-by: MrCroxx <[email protected]>

* refactor: use deversion future to simplify? code

Signed-off-by: MrCroxx <[email protected]>

* chore: remove unused

Signed-off-by: MrCroxx <[email protected]>

* chore: pass typo

Signed-off-by: MrCroxx <[email protected]>

* chore: tiny refactor

Signed-off-by: MrCroxx <[email protected]>

* refactor: remove default bound to refine

Signed-off-by: MrCroxx <[email protected]>

* refactor: refine code

Signed-off-by: MrCroxx <[email protected]>

* chore: refine code

Signed-off-by: MrCroxx <[email protected]>

---------

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Jul 2, 2024
1 parent ef3f139 commit c036ed7
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 70 deletions.
117 changes: 117 additions & 0 deletions foyer-common/src/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2024 Foyer Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pin_project::pin_project;
use std::{
future::Future,
marker::PhantomData,
ops::Deref,
pin::Pin,
task::{ready, Context, Poll},
};

/// Result that the inner future of a [`DiversionFuture`] should return.
///
/// - The `target` will be further returned by [`DiversionFuture`].
/// - The `store` will be stored in the [`DiversionFuture`].
pub struct Diversion<T, S> {
/// The `target` will be further returned by [`DiversionFuture`].
pub target: T,
/// The `store` will be stored in the [`DiversionFuture`].
pub store: Option<S>,
}

impl<T, S> From<T> for Diversion<T, S> {
fn from(value: T) -> Self {
Self {
target: value,
store: None,
}
}
}

/// [`DiversionFuture`] is a future wrapper that partially store and partially return the future result.
#[pin_project]
pub struct DiversionFuture<FU, T, S> {
#[pin]
inner: FU,
store: Option<S>,
_marker: PhantomData<T>,
}

impl<FU, T, S> DiversionFuture<FU, T, S> {
/// Create a new [`DiversionFuture`] wrapper.
pub fn new(future: FU) -> Self {
Self {
inner: future,
store: None,
_marker: PhantomData,
}
}

/// Get the stored state.
pub fn store(&self) -> &Option<S> {
&self.store
}
}

impl<FU, T, S> Deref for DiversionFuture<FU, T, S> {
type Target = FU;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<FU, T, S, I> Future for DiversionFuture<FU, T, S>
where
FU: Future<Output = I>,
I: Into<Diversion<T, S>>,
{
type Output = T;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let Diversion { target, store } = ready!(this.inner.poll(cx)).into();
*this.store = store;
Poll::Ready(target)
}
}

#[cfg(test)]
mod tests {
use std::pin::pin;

use futures::future::poll_fn;

use super::*;

#[tokio::test]
async fn test_diversion_future() {
let mut f = pin!(DiversionFuture::new(async move {
Diversion {
target: "The answer to life, the universe, and everything.".to_string(),
store: Some(42),
}
},));

let question: String = poll_fn(|cx| f.as_mut().poll(cx)).await;
let answer = f.store().unwrap();

assert_eq!(
(question.as_str(), answer),
("The answer to life, the universe, and everything.", 42)
);
}
}
2 changes: 2 additions & 0 deletions foyer-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub mod code;
pub mod countdown;
/// Components for monitoring internal events.
pub mod event;
/// Future extensions.
pub mod future;
/// The shared metrics for foyer.
pub mod metrics;
/// A concurrent object pool.
Expand Down
70 changes: 33 additions & 37 deletions foyer-memory/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
use std::{borrow::Borrow, fmt::Debug, hash::Hash, ops::Deref, sync::Arc};

use ahash::RandomState;
use futures::{Future, FutureExt};
use futures::Future;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;

use foyer_common::{
code::{HashBuilder, Key, Value},
event::EventListener,
future::Diversion,
};

use crate::{
Expand All @@ -33,7 +35,7 @@ use crate::{
s3fifo::{S3Fifo, S3FifoHandle},
sanity::SanityEviction,
},
generic::{GenericCache, GenericCacheConfig, GenericCacheEntry, GenericFetch, Weighter},
generic::{FetchMark, FetchState, GenericCache, GenericCacheConfig, GenericCacheEntry, GenericFetch, Weighter},
indexer::{hash_table::HashTableIndexer, sanity::SanityIndexer},
FifoConfig, LfuConfig, LruConfig, S3FifoConfig,
};
Expand Down Expand Up @@ -657,20 +659,21 @@ where
}

/// A future that is used to get entry value from the remote storage for the in-memory cache.
#[pin_project(project = FetchProj)]
pub enum Fetch<K, V, ER, S = RandomState>
where
K: Key,
V: Value,
S: HashBuilder,
{
/// A future that is used to get entry value from the remote storage for the in-memory FIFO cache.
Fifo(FifoFetch<K, V, ER, S>),
Fifo(#[pin] FifoFetch<K, V, ER, S>),
/// A future that is used to get entry value from the remote storage for the in-memory LRU cache.
Lru(LruFetch<K, V, ER, S>),
Lru(#[pin] LruFetch<K, V, ER, S>),
/// A future that is used to get entry value from the remote storage for the in-memory LFU cache.
Lfu(LfuFetch<K, V, ER, S>),
Lfu(#[pin] LfuFetch<K, V, ER, S>),
/// A future that is used to get entry value from the remote storage for the in-memory S3FIFO cache.
S3Fifo(S3FifoFetch<K, V, ER, S>),
S3Fifo(#[pin] S3FifoFetch<K, V, ER, S>),
}

impl<K, V, ER, S> From<FifoFetch<K, V, ER, S>> for Fetch<K, V, ER, S>
Expand Down Expand Up @@ -726,27 +729,16 @@ where
{
type Output = std::result::Result<CacheEntry<K, V, S>, ER>;

fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
match &mut *self {
Fetch::Fifo(entry) => entry.poll_unpin(cx).map(|res| res.map(CacheEntry::from)),
Fetch::Lru(entry) => entry.poll_unpin(cx).map(|res| res.map(CacheEntry::from)),
Fetch::Lfu(entry) => entry.poll_unpin(cx).map(|res| res.map(CacheEntry::from)),
Fetch::S3Fifo(entry) => entry.poll_unpin(cx).map(|res| res.map(CacheEntry::from)),
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
match self.project() {
FetchProj::Fifo(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
FetchProj::Lru(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
FetchProj::Lfu(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
FetchProj::S3Fifo(entry) => entry.poll(cx).map(|res| res.map(CacheEntry::from)),
}
}
}

/// The state of [`Fetch`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchState {
/// Cache hit.
Hit,
/// Cache miss, but wait in queue.
Wait,
/// Cache miss, and there is no other waiters at the moment.
Miss,
}

impl<K, V, ER, S> Fetch<K, V, ER, S>
where
K: Key,
Expand All @@ -756,18 +748,21 @@ where
/// Get the fetch state.
pub fn state(&self) -> FetchState {
match self {
Fetch::Fifo(FifoFetch::Hit(_))
| Fetch::Lru(LruFetch::Hit(_))
| Fetch::Lfu(LfuFetch::Hit(_))
| Fetch::S3Fifo(S3FifoFetch::Hit(_)) => FetchState::Hit,
Fetch::Fifo(FifoFetch::Wait(_))
| Fetch::Lru(LruFetch::Wait(_))
| Fetch::Lfu(LfuFetch::Wait(_))
| Fetch::S3Fifo(S3FifoFetch::Wait(_)) => FetchState::Wait,
Fetch::Fifo(FifoFetch::Miss(_))
| Fetch::Lru(LruFetch::Miss(_))
| Fetch::Lfu(LfuFetch::Miss(_))
| Fetch::S3Fifo(S3FifoFetch::Miss(_)) => FetchState::Miss,
Fetch::Fifo(fetch) => fetch.state(),
Fetch::Lru(fetch) => fetch.state(),
Fetch::Lfu(fetch) => fetch.state(),
Fetch::S3Fifo(fetch) => fetch.state(),
}
}

/// Get the ext of the fetch.
#[doc(hidden)]
pub fn store(&self) -> &Option<FetchMark> {
match self {
Fetch::Fifo(fetch) => fetch.store(),
Fetch::Lru(fetch) => fetch.store(),
Fetch::Lfu(fetch) => fetch.store(),
Fetch::S3Fifo(fetch) => fetch.store(),
}
}
}
Expand Down Expand Up @@ -826,7 +821,7 @@ where
///
/// The concurrent fetch requests will be deduplicated.
#[doc(hidden)]
pub fn fetch_inner<F, FU, ER>(
pub fn fetch_inner<F, FU, ER, ID>(
&self,
key: K,
context: CacheContext,
Expand All @@ -835,8 +830,9 @@ where
) -> Fetch<K, V, ER, S>
where
F: FnOnce() -> FU,
FU: Future<Output = std::result::Result<V, ER>> + Send + 'static,
FU: Future<Output = ID> + Send + 'static,
ER: Send + 'static + Debug,
ID: Into<Diversion<std::result::Result<V, ER>, FetchMark>>,
{
match self {
Cache::Fifo(cache) => Fetch::from(cache.fetch_inner(key, context, fetch, runtime)),
Expand Down
Loading

0 comments on commit c036ed7

Please sign in to comment.