From 9176b1eec3c1dc6a83449e4f5428750a5a64c528 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 27 Dec 2023 22:01:43 +0800 Subject: [PATCH] test: tests for file cache --- src/mito2/src/cache/file_cache.rs | 187 +++++++++++++++++++++++++++++- 1 file changed, 184 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 238790ae3d52..a501654f1694 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -22,13 +22,13 @@ use futures::{FutureExt, TryStreamExt}; use moka::future::Cache; use moka::notification::RemovalCause; use object_store::util::{join_dir, join_path}; -use object_store::{Metakey, ObjectStore}; +use object_store::{ErrorKind, Metakey, ObjectStore, Reader}; use snafu::ResultExt; use store_api::storage::RegionId; use crate::cache::FILE_TYPE; use crate::error::{OpenDalSnafu, Result}; -use crate::metrics::CACHE_BYTES; +use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS}; use crate::sst::file::FileId; /// Subdirectory of cached files. @@ -59,10 +59,12 @@ impl FileCache { let file_dir = cache_file_dir(&cache_home); let cache_store = local_store.clone(); let cache_file_dir = file_dir.clone(); - let memory_index = Cache::builder().max_capacity(capacity.as_bytes()).weigher(|_key, value: &IndexValue| -> u32 { + let memory_index = Cache::builder() + .weigher(|_key, value: &IndexValue| -> u32 { // We only measure space on local store. value.file_size }) + .max_capacity(capacity.as_bytes()) .async_eviction_listener(move |key, value, cause| { let store = cache_store.clone(); let file_path = cache_file_path(&cache_file_dir, *key); @@ -104,6 +106,39 @@ impl FileCache { self.memory_index.insert(key, value).await; } + /// Reads a file from the cache. 
+ pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> { + if !self.memory_index.contains_key(&key) { + CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + return None; + } + + let file_path = cache_file_path(&self.file_dir, key); + match self.local_store.reader(&file_path).await { + Ok(reader) => { + CACHE_HIT.with_label_values(&[FILE_TYPE]).inc(); + Some(reader) + } + Err(e) => { + if e.kind() != ErrorKind::NotFound { + warn!("Failed to get file for key {:?}, err: {}", key, e); + } + // We remove the file from the index. + self.memory_index.remove(&key).await; + CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + None + } + } + } + + /// Removes a file from the cache explicitly. + pub(crate) async fn remove(&self, key: IndexKey) { + let file_path = cache_file_path(&self.file_dir, key); + if let Err(e) = self.local_store.delete(&file_path).await { + warn!(e; "Failed to delete a cached file {}", file_path); + } + } + /// Recovers the index from local store. pub(crate) async fn recover(&self) -> Result<()> { let now = Instant::now(); @@ -181,3 +216,149 @@ fn parse_index_key(name: &str) -> Option<IndexKey> { Some((region_id, file_id)) } + +#[cfg(test)] +mod tests { + use common_test_util::temp_dir::create_temp_dir; + use futures::AsyncReadExt; + use object_store::services::Fs; + + use super::*; + + fn new_fs_store(path: &str) -> ObjectStore { + let mut builder = Fs::default(); + builder.root(path); + ObjectStore::new(builder).unwrap().finish() + } + + #[tokio::test] + async fn test_file_cache_basic() { + let dir = create_temp_dir(""); + let local_store = new_fs_store(dir.path().to_str().unwrap()); + let cache_home = "cache".to_string(); + + let cache = FileCache::new( + local_store.clone(), + cache_home.clone(), + ReadableSize::mb(10), + ); + let region_id = RegionId::new(2000, 0); + let file_id = FileId::random(); + let key = (region_id, file_id); + let file_path = cache_file_path(&cache.file_dir, key); + + // Get an empty file. 
+ assert!(cache.reader(key).await.is_none()); + + // Write a file. + local_store + .write(&file_path, b"hello".as_slice()) + .await + .unwrap(); + // Add to the cache. + cache + .put((region_id, file_id), IndexValue { file_size: 5 }) + .await; + + // Read file content. + let mut reader = cache.reader(key).await.unwrap(); + let mut buf = String::new(); + reader.read_to_string(&mut buf).await.unwrap(); + assert_eq!("hello", buf); + + // Remove the file. + cache.remove(key).await; + assert!(cache.reader(key).await.is_none()); + + // Ensure all pending tasks of the moka cache are done before assertion. + cache.memory_index.run_pending_tasks().await; + + // The file also does not exist. + assert!(!local_store.is_exist(&file_path).await.unwrap()); + } + + #[tokio::test] + async fn test_file_cache_file_removed() { + let dir = create_temp_dir(""); + let local_store = new_fs_store(dir.path().to_str().unwrap()); + let cache_home = "cache".to_string(); + + let cache = FileCache::new( + local_store.clone(), + cache_home.clone(), + ReadableSize::mb(10), + ); + let region_id = RegionId::new(2000, 0); + let file_id = FileId::random(); + let key = (region_id, file_id); + let file_path = cache_file_path(&cache.file_dir, key); + + // Write a file. + local_store + .write(&file_path, b"hello".as_slice()) + .await + .unwrap(); + // Add to the cache. + cache + .put((region_id, file_id), IndexValue { file_size: 5 }) + .await; + + // Remove the file but keep the index. + local_store.delete(&file_path).await.unwrap(); + + // Reader is none. + assert!(cache.reader(key).await.is_none()); + // Key is removed. 
+ assert!(!cache.memory_index.contains_key(&key)); + } + + #[tokio::test] + async fn test_file_cache_recover() { + let dir = create_temp_dir(""); + let local_store = new_fs_store(dir.path().to_str().unwrap()); + let cache_home = "cache".to_string(); + let cache = FileCache::new( + local_store.clone(), + cache_home.clone(), + ReadableSize::mb(10), + ); + + let region_id = RegionId::new(2000, 0); + // Write N files. + let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect(); + for (i, file_id) in file_ids.iter().enumerate() { + let key = (region_id, *file_id); + let file_path = cache_file_path(&cache.file_dir, key); + let bytes = i.to_string().into_bytes(); + local_store.write(&file_path, bytes.clone()).await.unwrap(); + + // Add to the cache. + cache + .put( + (region_id, *file_id), + IndexValue { + file_size: bytes.len() as u32, + }, + ) + .await; + } + + // Recover the cache. + let cache = FileCache::new( + local_store.clone(), + cache_home.clone(), + ReadableSize::mb(10), + ); + // No entry before recovery. + assert!(cache.reader((region_id, file_ids[0])).await.is_none()); + cache.recover().await.unwrap(); + + for (i, file_id) in file_ids.iter().enumerate() { + let key = (region_id, *file_id); + let mut reader = cache.reader(key).await.unwrap(); + let mut buf = String::new(); + reader.read_to_string(&mut buf).await.unwrap(); + assert_eq!(i.to_string(), buf); + } + } +}