diff --git a/src/storage/src/hummock/error.rs b/src/storage/src/hummock/error.rs index 48f71b9199332..df843b3ac4e3d 100644 --- a/src/storage/src/hummock/error.rs +++ b/src/storage/src/hummock/error.rs @@ -19,7 +19,7 @@ use thiserror_ext::AsReport; use tokio::sync::oneshot::error::RecvError; // TODO(error-handling): should prefer use error types than strings. -#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box)] +#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Arc)] #[thiserror_ext(newtype(name = HummockError, backtrace))] pub enum HummockErrorInner { #[error("Magic number mismatch: expected {expected}, found: {found}")] diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 3e88851feb456..b832a8e865b68 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -309,9 +309,9 @@ impl HummockStorage { epoch: u64, table_id: TableId, ) -> StorageResult { - let fetch = async { - let pb_version = self - .hummock_meta_client + let meta_client = self.hummock_meta_client.clone(); + let fetch = async move { + let pb_version = meta_client .get_version_by_epoch(epoch, table_id.table_id()) .await .inspect_err(|e| tracing::error!("{}", e.to_report_string())) @@ -322,7 +322,7 @@ impl HummockStorage { }; let version = self .simple_time_travel_version_cache - .get_or_insert(epoch, fetch) + .get_or_insert(table_id.table_id, epoch, fetch) .await?; Ok(version) } diff --git a/src/storage/src/hummock/time_travel_version_cache.rs b/src/storage/src/hummock/time_travel_version_cache.rs index 08ad70ab44fa4..8ee77c7386d27 100644 --- a/src/storage/src/hummock/time_travel_version_cache.rs +++ b/src/storage/src/hummock/time_travel_version_cache.rs @@ -13,47 +13,25 @@ // limitations under the License. use std::future::Future; +use std::pin::Pin; +use futures::future::Shared; +use futures::FutureExt; use moka::sync::Cache; use risingwave_hummock_sdk::HummockEpoch; -use tokio::sync::Mutex; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::HummockResult; +type InflightResult = Shared> + Send>>>; + /// A naive cache to reduce number of RPC sent to meta node. pub struct SimpleTimeTravelVersionCache { - inner: Mutex, + cache: Cache<(u32, HummockEpoch), InflightResult>, } impl SimpleTimeTravelVersionCache { pub fn new() -> Self { - Self { - inner: Mutex::new(SimpleTimeTravelVersionCacheInner::new()), - } - } - - pub async fn get_or_insert( - &self, - epoch: HummockEpoch, - fetch: impl Future>, - ) -> HummockResult { - let mut guard = self.inner.lock().await; - if let Some(v) = guard.get(&epoch) { - return Ok(v); - } - let version = fetch.await?; - guard.add(epoch, version); - Ok(guard.get(&epoch).unwrap()) - } -} - -struct SimpleTimeTravelVersionCacheInner { - cache: Cache, -} - -impl SimpleTimeTravelVersionCacheInner { - fn new() -> Self { let capacity = std::env::var("RW_HUMMOCK_TIME_TRAVEL_CACHE_SIZE") .unwrap_or_else(|_| "10".into()) .parse() @@ -62,11 +40,25 @@ impl SimpleTimeTravelVersionCacheInner { Self { cache } } - fn get(&self, epoch: &HummockEpoch) -> Option { - self.cache.get(epoch) - } - - fn add(&mut self, epoch: HummockEpoch, version: PinnedVersion) { - self.cache.insert(epoch, version); + pub async fn get_or_insert( + &self, + table_id: u32, + epoch: HummockEpoch, + fetch: impl Future> + Send + 'static, + ) -> HummockResult { + self.cache + .entry((table_id, epoch)) + .or_insert_with_if( + || fetch.boxed().shared(), + |inflight| { + if let Some(result) = inflight.peek() { + return result.is_err(); + } + false + }, + ) + .value() + .clone() + .await } }