Skip to content

Commit

Permalink
fix: dirty last checkpoint metadata file when enable object store cac…
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Jul 24, 2023
1 parent 632cb26 commit 56f2311
Showing 1 changed file with 27 additions and 3 deletions.
30 changes: 27 additions & 3 deletions src/object-store/src/layers/lru_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use lru::LruCache;
use metrics::increment_counter;
use opendal::raw::oio::{Page, Read, Reader, Write};
use opendal::raw::oio::{Page, Read, ReadExt, Reader, Write};
use opendal::raw::{
Accessor, Layer, LayeredAccessor, OpAppend, OpDelete, OpList, OpRead, OpWrite, RpAppend,
RpDelete, RpList, RpRead, RpWrite,
Expand Down Expand Up @@ -89,6 +89,12 @@ pub struct LruCacheAccessor<I, C> {
lru_cache: Arc<Mutex<LruCache<String, ()>>>,
}

/// Returns true when the path of the file can be cached.
fn can_cache(path: &str) -> bool {
// TODO(dennis): find a better way
!path.ends_with("_last_checkpoint")
}

impl<I, C> LruCacheAccessor<I, C> {
fn cache_path(&self, path: &str, args: &OpRead) -> String {
format!(
Expand All @@ -99,8 +105,6 @@ impl<I, C> LruCacheAccessor<I, C> {
}
}

use opendal::raw::oio::ReadExt;

#[async_trait]
impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
type Inner = I;
Expand All @@ -117,6 +121,10 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
if !can_cache(path) {
return self.inner.read(path, args).await.map(to_output_reader);
}

let path = path.to_string();
let cache_path = self.cache_path(&path, &args);
let lru_cache = &self.lru_cache;
Expand Down Expand Up @@ -218,3 +226,19 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
(input.0, Box::new(input.1))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_can_cache() {
assert!(can_cache("test"));
assert!(can_cache("a/b/c.parquet"));
assert!(can_cache("1.json"));
assert!(can_cache("100.checkpoint"));
assert!(can_cache("test/last_checkpoint"));
assert!(!can_cache("test/__last_checkpoint"));
assert!(!can_cache("a/b/c/__last_checkpoint"));
}
}

0 comments on commit 56f2311

Please sign in to comment.