From a2dae5dbf680ac819819c705a070f1d8edc6e3cb Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Mon, 23 Sep 2024 22:40:50 +0800 Subject: [PATCH] refactor lock --- src/storage/src/hummock/error.rs | 2 +- .../src/hummock/store/hummock_storage.rs | 6 +-- .../src/hummock/time_travel_version_cache.rs | 47 ++++++++++++++----- 3 files changed, 40 insertions(+), 15 deletions(-) diff --git a/src/storage/src/hummock/error.rs b/src/storage/src/hummock/error.rs index 48f71b9199332..df843b3ac4e3d 100644 --- a/src/storage/src/hummock/error.rs +++ b/src/storage/src/hummock/error.rs @@ -19,7 +19,7 @@ use thiserror_ext::AsReport; use tokio::sync::oneshot::error::RecvError; // TODO(error-handling): should prefer use error types than strings. -#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box)] +#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Arc)] #[thiserror_ext(newtype(name = HummockError, backtrace))] pub enum HummockErrorInner { #[error("Magic number mismatch: expected {expected}, found: {found}")] diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 337e2648ab879..b832a8e865b68 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -309,9 +309,9 @@ impl HummockStorage { epoch: u64, table_id: TableId, ) -> StorageResult { - let fetch = async { - let pb_version = self - .hummock_meta_client + let meta_client = self.hummock_meta_client.clone(); + let fetch = async move { + let pb_version = meta_client .get_version_by_epoch(epoch, table_id.table_id()) .await .inspect_err(|e| tracing::error!("{}", e.to_report_string())) diff --git a/src/storage/src/hummock/time_travel_version_cache.rs b/src/storage/src/hummock/time_travel_version_cache.rs index 8a50721e09d52..0867aabf830a8 100644 --- a/src/storage/src/hummock/time_travel_version_cache.rs +++ b/src/storage/src/hummock/time_travel_version_cache.rs @@ -13,23 +13,31 @@ // limitations under the License. use std::future::Future; +use std::pin::Pin; +use ahash::HashMap; +use futures::future::Shared; +use futures::FutureExt; use moka::sync::Cache; +use parking_lot::{Mutex, RwLock}; use risingwave_hummock_sdk::HummockEpoch; -use tokio::sync::Mutex; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::HummockResult; +type InflightRequest = Shared> + Send>>>; + /// A naive cache to reduce number of RPC sent to meta node. pub struct SimpleTimeTravelVersionCache { - inner: Mutex, + inner: RwLock, + request_registry: Mutex>, } impl SimpleTimeTravelVersionCache { pub fn new() -> Self { Self { - inner: Mutex::new(SimpleTimeTravelVersionCacheInner::new()), + inner: RwLock::new(SimpleTimeTravelVersionCacheInner::new()), + request_registry: Default::default(), } } @@ -37,15 +45,28 @@ impl SimpleTimeTravelVersionCache { &self, table_id: u32, epoch: HummockEpoch, - fetch: impl Future>, + fetch: impl Future> + Send + 'static, ) -> HummockResult { - let mut guard = self.inner.lock().await; - if let Some(v) = guard.get(table_id, epoch) { + // happy path: from cache + if let Some(v) = self.inner.read().get(table_id, epoch) { return Ok(v); } - let version = fetch.await?; - guard.add(table_id, epoch, version); - Ok(guard.get(table_id, epoch).unwrap()) + // slow path: from RPC + let fut = { + let mut requests = self.request_registry.lock(); + let inflight = requests.get(&(table_id, epoch)).cloned(); + inflight.unwrap_or_else(|| { + let request = fetch.boxed().shared(); + requests.insert((table_id, epoch), request.clone()); + request + }) + }; + let result = fut.await; + if let Ok(ref v) = result { + self.inner.write().try_insert(table_id, epoch, v); + } + self.request_registry.lock().remove(&(table_id, epoch)); + result } } @@ -63,11 +84,15 @@ impl SimpleTimeTravelVersionCacheInner { Self { cache } } + /// Tries to get the value fn get(&self, table_id: u32, epoch: HummockEpoch) -> Option { self.cache.get(&(table_id, epoch)) } - fn add(&mut self, table_id: u32, epoch: HummockEpoch, version: PinnedVersion) { - self.cache.insert((table_id, epoch), version); + /// Inserts entry if key is not present. + fn try_insert(&mut self, table_id: u32, epoch: HummockEpoch, version: &PinnedVersion) { + if !self.cache.contains_key(&(table_id, epoch)) { + self.cache.insert((table_id, epoch), version.clone()) + } } }