From 65e9de32701ba5107b1c938daadc0a2431cca97e Mon Sep 17 00:00:00 2001 From: Xiliang Chen Date: Mon, 2 Oct 2023 16:41:40 +1300 Subject: [PATCH] Fix race condition (#103) * fix a panic * improve tests --- src/extensions/client/mod.rs | 2 +- src/utils/cache.rs | 117 ++++++++++++++++++++++++++++------- 2 files changed, 96 insertions(+), 23 deletions(-) diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index 2880f17..1692595 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -141,7 +141,7 @@ impl Client { Err((e, url)) => { tracing::warn!("Unable to connect to endpoint: '{url}' error: {e}"); // TODO: use a backoff strategy - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; } } } diff --git a/src/utils/cache.rs b/src/utils/cache.rs index fa8aba0..3e16886 100644 --- a/src/utils/cache.rs +++ b/src/utils/cache.rs @@ -102,38 +102,52 @@ impl Cache { where F: FnOnce() -> BoxFuture<'static, Result>, { + let fetch = || async { + let (tx, rx) = watch::channel(None); + self.cache + .insert(key.clone(), CacheValue::Pending(rx)) + .await; + let value = f().await; + let _ = tx.send(Some(value.clone())); + match &value { + Ok(value) => { + self.cache + .insert(key.clone(), CacheValue::Value(value.clone())) + .await; + } + Err(_) => { + self.cache.remove(&key).await; + } + }; + value + }; + match self.cache.get(&key) { Some(CacheValue::Value(value)) => Ok(value), Some(CacheValue::Pending(mut rx)) => { { + // limit the scope of value let value = rx.borrow(); if value.is_some() { return value.clone().unwrap(); } } + let _ = rx.changed().await; - let value = rx.borrow(); - value.clone().expect("Cache: should always be Some") - } - None => { - let (tx, rx) = watch::channel(None); - self.cache - .insert(key.clone(), CacheValue::Pending(rx)) - .await; - let value = f().await; - let _ = tx.send(Some(value.clone())); - match &value { - Ok(value) => { - self.cache - .insert(key.clone(), CacheValue::Value(value.clone())) - .await; - } - Err(_) => { - self.cache.remove(&key).await; + + { + // limit the scope of value + let value = rx.borrow(); + if let Some(value) = &*value { + return value.clone(); } - }; - value + } + + // this only happens when initial fetch request got canceled for some reason + // in that case we need to fetch again + fetch().await } + None => fetch().await, } } @@ -152,6 +166,7 @@ impl Cache { mod tests { use super::*; use futures::FutureExt as _; + use jsonrpsee::types::error::reject_too_big_request; use serde_json::json; #[tokio::test] @@ -199,8 +214,6 @@ mod tests { let cache2 = cache.clone(); let key2 = key.clone(); let h2 = tokio::spawn(async move { - println!("5"); - let value = cache2 .get_or_insert_with(key2, || { async { @@ -221,4 +234,64 @@ mod tests { assert_eq!(cache.get(&key).await, Some(json!("value"))); } + + #[tokio::test] + async fn get_or_insert_with_handle_canceled_request() { + let cache = Cache::::new(NonZeroUsize::new(1).unwrap(), None); + + let key = CacheKey::::new(&"key".to_string(), &[]); + + let (_tx, rx) = tokio::sync::oneshot::channel::<()>(); + + let cache2 = cache.clone(); + let key2 = key.clone(); + let h1 = tokio::spawn(async move { + let _ = cache2 + .get_or_insert_with(key2.clone(), || { + async move { + let _ = rx.await; + panic!(); + } + .boxed() + }) + .await; + unreachable!(); + }); + + tokio::task::yield_now().await; + + let cache2 = cache.clone(); + let key2 = key.clone(); + let h2 = tokio::spawn(async move { + let value = cache2 + .get_or_insert_with(key2, || async { Ok(json!("value")) }.boxed()) + .await; + assert_eq!(value, Ok(json!("value"))); + }); + + tokio::task::yield_now().await; + + h1.abort(); // first request failed for whatever reason + + h1.await.unwrap_err(); + h2.await.unwrap(); // second request should still work + + assert_eq!(cache.get(&key).await, Some(json!("value"))); + } + + #[tokio::test] + async fn get_or_insert_with_error() { + let cache = Cache::::new(NonZeroUsize::new(1).unwrap(), None); + + let key = CacheKey::::new(&"key".to_string(), &[]); + + let value = cache + .get_or_insert_with(key.clone(), || { + async move { Err(reject_too_big_request(100)) }.boxed() + }) + .await; + assert_eq!(value, Err(reject_too_big_request(100))); + + assert_eq!(cache.get(&key).await, None); + } }