Skip to content

Commit

Permalink
refactor lock
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Sep 23, 2024
1 parent 02165ea commit a2dae5d
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/storage/src/hummock/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use thiserror_ext::AsReport;
use tokio::sync::oneshot::error::RecvError;

// TODO(error-handling): should prefer use error types than strings.
#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box)]
#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Arc)]
#[thiserror_ext(newtype(name = HummockError, backtrace))]
pub enum HummockErrorInner {
#[error("Magic number mismatch: expected {expected}, found: {found}")]
Expand Down
6 changes: 3 additions & 3 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,9 @@ impl HummockStorage {
epoch: u64,
table_id: TableId,
) -> StorageResult<PinnedVersion> {
let fetch = async {
let pb_version = self
.hummock_meta_client
let meta_client = self.hummock_meta_client.clone();
let fetch = async move {
let pb_version = meta_client
.get_version_by_epoch(epoch, table_id.table_id())
.await
.inspect_err(|e| tracing::error!("{}", e.to_report_string()))
Expand Down
47 changes: 36 additions & 11 deletions src/storage/src/hummock/time_travel_version_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,60 @@
// limitations under the License.

use std::future::Future;
use std::pin::Pin;

use ahash::HashMap;
use futures::future::Shared;
use futures::FutureExt;
use moka::sync::Cache;
use parking_lot::{Mutex, RwLock};
use risingwave_hummock_sdk::HummockEpoch;
use tokio::sync::Mutex;

use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::HummockResult;

type InflightRequest = Shared<Pin<Box<dyn Future<Output = HummockResult<PinnedVersion>> + Send>>>;

/// A naive cache to reduce number of RPC sent to meta node.
pub struct SimpleTimeTravelVersionCache {
inner: Mutex<SimpleTimeTravelVersionCacheInner>,
inner: RwLock<SimpleTimeTravelVersionCacheInner>,
request_registry: Mutex<HashMap<(u32, HummockEpoch), InflightRequest>>,
}

impl SimpleTimeTravelVersionCache {
pub fn new() -> Self {
Self {
inner: Mutex::new(SimpleTimeTravelVersionCacheInner::new()),
inner: RwLock::new(SimpleTimeTravelVersionCacheInner::new()),
request_registry: Default::default(),
}
}

pub async fn get_or_insert(
&self,
table_id: u32,
epoch: HummockEpoch,
fetch: impl Future<Output = HummockResult<PinnedVersion>>,
fetch: impl Future<Output = HummockResult<PinnedVersion>> + Send + 'static,
) -> HummockResult<PinnedVersion> {
let mut guard = self.inner.lock().await;
if let Some(v) = guard.get(table_id, epoch) {
// happy path: from cache
if let Some(v) = self.inner.read().get(table_id, epoch) {
return Ok(v);
}
let version = fetch.await?;
guard.add(table_id, epoch, version);
Ok(guard.get(table_id, epoch).unwrap())
// slow path: from RPC
let fut = {
let mut requests = self.request_registry.lock();
let inflight = requests.get(&(table_id, epoch)).cloned();
inflight.unwrap_or_else(|| {
let request = fetch.boxed().shared();
requests.insert((table_id, epoch), request.clone());
request
})
};
let result = fut.await;
if let Ok(ref v) = result {
self.inner.write().try_insert(table_id, epoch, v);
}
self.request_registry.lock().remove(&(table_id, epoch));
result
}
}

Expand All @@ -63,11 +84,15 @@ impl SimpleTimeTravelVersionCacheInner {
Self { cache }
}

/// Tries to get the value
fn get(&self, table_id: u32, epoch: HummockEpoch) -> Option<PinnedVersion> {
self.cache.get(&(table_id, epoch))
}

fn add(&mut self, table_id: u32, epoch: HummockEpoch, version: PinnedVersion) {
self.cache.insert((table_id, epoch), version);
/// Inserts entry if key is not present.
fn try_insert(&mut self, table_id: u32, epoch: HummockEpoch, version: &PinnedVersion) {
if !self.cache.contains_key(&(table_id, epoch)) {
self.cache.insert((table_id, epoch), version.clone())
}
}
}

0 comments on commit a2dae5d

Please sign in to comment.