From 56f2311ce23e6d5cc4265155412191a7d7dcb429 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Mon, 24 Jul 2023 15:55:11 +0800 Subject: [PATCH] fix: dirty last checkpoint metadata file when enable object store caching, #2013 --- src/object-store/src/layers/lru_cache.rs | 30 +++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index 34ba6557e7ea..b22805890655 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use async_trait::async_trait; use lru::LruCache; use metrics::increment_counter; -use opendal::raw::oio::{Page, Read, Reader, Write}; +use opendal::raw::oio::{Page, Read, ReadExt, Reader, Write}; use opendal::raw::{ Accessor, Layer, LayeredAccessor, OpAppend, OpDelete, OpList, OpRead, OpWrite, RpAppend, RpDelete, RpList, RpRead, RpWrite, @@ -89,6 +89,12 @@ pub struct LruCacheAccessor { lru_cache: Arc>>, } +/// Returns true when the path of the file can be cached. +fn can_cache(path: &str) -> bool { + // TODO(dennis): find a better way + !path.ends_with("_last_checkpoint") +} + impl LruCacheAccessor { fn cache_path(&self, path: &str, args: &OpRead) -> String { format!( @@ -99,8 +105,6 @@ impl LruCacheAccessor { } } -use opendal::raw::oio::ReadExt; - #[async_trait] impl LayeredAccessor for LruCacheAccessor { type Inner = I; @@ -117,6 +121,10 @@ impl LayeredAccessor for LruCacheAccessor { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + if !can_cache(path) { + return self.inner.read(path, args).await.map(to_output_reader); + } + let path = path.to_string(); let cache_path = self.cache_path(&path, &args); let lru_cache = &self.lru_cache; @@ -218,3 +226,19 @@ impl LayeredAccessor for LruCacheAccessor { fn to_output_reader(input: (RpRead, R)) -> (RpRead, Reader) { (input.0, Box::new(input.1)) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_can_cache() { + assert!(can_cache("test")); + assert!(can_cache("a/b/c.parquet")); + assert!(can_cache("1.json")); + assert!(can_cache("100.checkpoint")); + assert!(can_cache("test/last_checkpoint")); + assert!(!can_cache("test/__last_checkpoint")); + assert!(!can_cache("a/b/c/__last_checkpoint")); + } +}