Skip to content

Commit

Permalink
refactor(storage): adapt time travel cache to partial checkpoint (#18646
Browse files Browse the repository at this point in the history
)
  • Loading branch information
zwang28 authored Sep 24, 2024
1 parent 4401694 commit 29adeb9
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/storage/src/hummock/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use thiserror_ext::AsReport;
use tokio::sync::oneshot::error::RecvError;

// TODO(error-handling): should prefer use error types than strings.
#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Box)]
#[derive(Error, thiserror_ext::ReportDebug, thiserror_ext::Arc)]
#[thiserror_ext(newtype(name = HummockError, backtrace))]
pub enum HummockErrorInner {
#[error("Magic number mismatch: expected {expected}, found: {found}")]
Expand Down
8 changes: 4 additions & 4 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,9 @@ impl HummockStorage {
epoch: u64,
table_id: TableId,
) -> StorageResult<PinnedVersion> {
let fetch = async {
let pb_version = self
.hummock_meta_client
let meta_client = self.hummock_meta_client.clone();
let fetch = async move {
let pb_version = meta_client
.get_version_by_epoch(epoch, table_id.table_id())
.await
.inspect_err(|e| tracing::error!("{}", e.to_report_string()))
Expand All @@ -322,7 +322,7 @@ impl HummockStorage {
};
let version = self
.simple_time_travel_version_cache
.get_or_insert(epoch, fetch)
.get_or_insert(table_id.table_id, epoch, fetch)
.await?;
Ok(version)
}
Expand Down
60 changes: 26 additions & 34 deletions src/storage/src/hummock/time_travel_version_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,25 @@
// limitations under the License.

use std::future::Future;
use std::pin::Pin;

use futures::future::Shared;
use futures::FutureExt;
use moka::sync::Cache;
use risingwave_hummock_sdk::HummockEpoch;
use tokio::sync::Mutex;

use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::HummockResult;

type InflightResult = Shared<Pin<Box<dyn Future<Output = HummockResult<PinnedVersion>> + Send>>>;

/// A naive cache to reduce number of RPC sent to meta node.
pub struct SimpleTimeTravelVersionCache {
inner: Mutex<SimpleTimeTravelVersionCacheInner>,
cache: Cache<(u32, HummockEpoch), InflightResult>,
}

impl SimpleTimeTravelVersionCache {
pub fn new() -> Self {
Self {
inner: Mutex::new(SimpleTimeTravelVersionCacheInner::new()),
}
}

pub async fn get_or_insert(
&self,
epoch: HummockEpoch,
fetch: impl Future<Output = HummockResult<PinnedVersion>>,
) -> HummockResult<PinnedVersion> {
let mut guard = self.inner.lock().await;
if let Some(v) = guard.get(&epoch) {
return Ok(v);
}
let version = fetch.await?;
guard.add(epoch, version);
Ok(guard.get(&epoch).unwrap())
}
}

struct SimpleTimeTravelVersionCacheInner {
cache: Cache<HummockEpoch, PinnedVersion>,
}

impl SimpleTimeTravelVersionCacheInner {
fn new() -> Self {
let capacity = std::env::var("RW_HUMMOCK_TIME_TRAVEL_CACHE_SIZE")
.unwrap_or_else(|_| "10".into())
.parse()
Expand All @@ -62,11 +40,25 @@ impl SimpleTimeTravelVersionCacheInner {
Self { cache }
}

fn get(&self, epoch: &HummockEpoch) -> Option<PinnedVersion> {
self.cache.get(epoch)
}

fn add(&mut self, epoch: HummockEpoch, version: PinnedVersion) {
self.cache.insert(epoch, version);
pub async fn get_or_insert(
&self,
table_id: u32,
epoch: HummockEpoch,
fetch: impl Future<Output = HummockResult<PinnedVersion>> + Send + 'static,
) -> HummockResult<PinnedVersion> {
self.cache
.entry((table_id, epoch))
.or_insert_with_if(
|| fetch.boxed().shared(),
|inflight| {
if let Some(result) = inflight.peek() {
return result.is_err();
}
false
},
)
.value()
.clone()
.await
}
}

0 comments on commit 29adeb9

Please sign in to comment.