Skip to content

Commit

Permalink
Fix race condition (#103)
Browse files Browse the repository at this point in the history
* fix a panic

* improve tests
  • Loading branch information
xlc authored Oct 2, 2023
1 parent 93f1d18 commit 65e9de3
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl Client {
Err((e, url)) => {
tracing::warn!("Unable to connect to endpoint: '{url}' error: {e}");
// TODO: use a backoff strategy
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}
}
}
Expand Down
117 changes: 95 additions & 22 deletions src/utils/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,38 +102,52 @@ impl<D: Digest + 'static> Cache<D> {
where
F: FnOnce() -> BoxFuture<'static, Result<JsonValue, ErrorObjectOwned>>,
{
let fetch = || async {
let (tx, rx) = watch::channel(None);
self.cache
.insert(key.clone(), CacheValue::Pending(rx))
.await;
let value = f().await;
let _ = tx.send(Some(value.clone()));
match &value {
Ok(value) => {
self.cache
.insert(key.clone(), CacheValue::Value(value.clone()))
.await;
}
Err(_) => {
self.cache.remove(&key).await;
}
};
value
};

match self.cache.get(&key) {
Some(CacheValue::Value(value)) => Ok(value),
Some(CacheValue::Pending(mut rx)) => {
{
// limit the scope of value
let value = rx.borrow();
if value.is_some() {
return value.clone().unwrap();
}
}

let _ = rx.changed().await;
let value = rx.borrow();
value.clone().expect("Cache: should always be Some")
}
None => {
let (tx, rx) = watch::channel(None);
self.cache
.insert(key.clone(), CacheValue::Pending(rx))
.await;
let value = f().await;
let _ = tx.send(Some(value.clone()));
match &value {
Ok(value) => {
self.cache
.insert(key.clone(), CacheValue::Value(value.clone()))
.await;
}
Err(_) => {
self.cache.remove(&key).await;

{
// limit the scope of value
let value = rx.borrow();
if let Some(value) = &*value {
return value.clone();
}
};
value
}

// this only happens when initial fetch request got canceled for some reason
// in that case we need to fetch again
fetch().await
}
None => fetch().await,
}
}

Expand All @@ -152,6 +166,7 @@ impl<D: Digest + 'static> Cache<D> {
mod tests {
use super::*;
use futures::FutureExt as _;
use jsonrpsee::types::error::reject_too_big_request;
use serde_json::json;

#[tokio::test]
Expand Down Expand Up @@ -199,8 +214,6 @@ mod tests {
let cache2 = cache.clone();
let key2 = key.clone();
let h2 = tokio::spawn(async move {
println!("5");

let value = cache2
.get_or_insert_with(key2, || {
async {
Expand All @@ -221,4 +234,64 @@ mod tests {

assert_eq!(cache.get(&key).await, Some(json!("value")));
}

#[tokio::test]
async fn get_or_insert_with_handle_canceled_request() {
let cache = Cache::<blake2::Blake2b512>::new(NonZeroUsize::new(1).unwrap(), None);

let key = CacheKey::<blake2::Blake2b512>::new(&"key".to_string(), &[]);

let (_tx, rx) = tokio::sync::oneshot::channel::<()>();

let cache2 = cache.clone();
let key2 = key.clone();
let h1 = tokio::spawn(async move {
let _ = cache2
.get_or_insert_with(key2.clone(), || {
async move {
let _ = rx.await;
panic!();
}
.boxed()
})
.await;
unreachable!();
});

tokio::task::yield_now().await;

let cache2 = cache.clone();
let key2 = key.clone();
let h2 = tokio::spawn(async move {
let value = cache2
.get_or_insert_with(key2, || async { Ok(json!("value")) }.boxed())
.await;
assert_eq!(value, Ok(json!("value")));
});

tokio::task::yield_now().await;

h1.abort(); // first request failed for whatever reason

h1.await.unwrap_err();
h2.await.unwrap(); // second request should still work

assert_eq!(cache.get(&key).await, Some(json!("value")));
}

#[tokio::test]
async fn get_or_insert_with_error() {
let cache = Cache::<blake2::Blake2b512>::new(NonZeroUsize::new(1).unwrap(), None);

let key = CacheKey::<blake2::Blake2b512>::new(&"key".to_string(), &[]);

let value = cache
.get_or_insert_with(key.clone(), || {
async move { Err(reject_too_big_request(100)) }.boxed()
})
.await;
assert_eq!(value, Err(reject_too_big_request(100)));

assert_eq!(cache.get(&key).await, None);
}
}

0 comments on commit 65e9de3

Please sign in to comment.