Skip to content

Commit

Permalink
test: tests for file cache
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Dec 28, 2023
1 parent 36aac41 commit 9176b1e
Showing 1 changed file with 184 additions and 3 deletions.
187 changes: 184 additions & 3 deletions src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::RemovalCause;
use object_store::util::{join_dir, join_path};
use object_store::{Metakey, ObjectStore};
use object_store::{ErrorKind, Metakey, ObjectStore, Reader};
use snafu::ResultExt;
use store_api::storage::RegionId;

use crate::cache::FILE_TYPE;
use crate::error::{OpenDalSnafu, Result};
use crate::metrics::CACHE_BYTES;
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;

/// Subdirectory of cached files.
Expand Down Expand Up @@ -59,10 +59,12 @@ impl FileCache {
let file_dir = cache_file_dir(&cache_home);
let cache_store = local_store.clone();
let cache_file_dir = file_dir.clone();
let memory_index = Cache::builder().max_capacity(capacity.as_bytes()).weigher(|_key, value: &IndexValue| -> u32 {
let memory_index = Cache::builder()
.weigher(|_key, value: &IndexValue| -> u32 {
// We only measure space on local store.
value.file_size
})
.max_capacity(capacity.as_bytes())
.async_eviction_listener(move |key, value, cause| {
let store = cache_store.clone();
let file_path = cache_file_path(&cache_file_dir, *key);
Expand Down Expand Up @@ -104,6 +106,39 @@ impl FileCache {
self.memory_index.insert(key, value).await;
}

/// Reads a file from the cache.
pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
if !self.memory_index.contains_key(&key) {
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
return None;
}

let file_path = cache_file_path(&self.file_dir, key);
match self.local_store.reader(&file_path).await {
Ok(reader) => {
CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
Some(reader)
}
Err(e) => {
if e.kind() != ErrorKind::NotFound {
warn!("Failed to get file for key {:?}, err: {}", key, e);
}
// We removes the file from the index.
self.memory_index.remove(&key).await;
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
None
}
}
}

/// Removes a file from the cache explicitly.
pub(crate) async fn remove(&self, key: IndexKey) {
let file_path = cache_file_path(&self.file_dir, key);
if let Err(e) = self.local_store.delete(&file_path).await {
warn!(e; "Failed to delete a cached file {}", file_path);
}
}

/// Recovers the index from local store.
pub(crate) async fn recover(&self) -> Result<()> {
let now = Instant::now();
Expand Down Expand Up @@ -181,3 +216,149 @@ fn parse_index_key(name: &str) -> Option<IndexKey> {

Some((region_id, file_id))
}

#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use futures::AsyncReadExt;
use object_store::services::Fs;

use super::*;

fn new_fs_store(path: &str) -> ObjectStore {
let mut builder = Fs::default();
builder.root(path);
ObjectStore::new(builder).unwrap().finish()
}

#[tokio::test]
async fn test_file_cache_basic() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache_home = "cache".to_string();

let cache = FileCache::new(
local_store.clone(),
cache_home.clone(),
ReadableSize::mb(10),
);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = (region_id, file_id);
let file_path = cache_file_path(&cache.file_dir, key);

// Get an empty file.
assert!(cache.reader(key).await.is_none());

// Write a file.
local_store
.write(&file_path, b"hello".as_slice())
.await
.unwrap();
// Add to the cache.
cache
.put((region_id, file_id), IndexValue { file_size: 5 })
.await;

// Read file content.
let mut reader = cache.reader(key).await.unwrap();
let mut buf = String::new();
reader.read_to_string(&mut buf).await.unwrap();
assert_eq!("hello", buf);

// Remove the file.
cache.remove(key).await;
assert!(cache.reader(key).await.is_none());

// Ensure all pending tasks of the moka cache is done before assertion.
cache.memory_index.run_pending_tasks().await;

// The file also not exists.
assert!(!local_store.is_exist(&file_path).await.unwrap());
}

#[tokio::test]
async fn test_file_cache_file_removed() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache_home = "cache".to_string();

let cache = FileCache::new(
local_store.clone(),
cache_home.clone(),
ReadableSize::mb(10),
);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = (region_id, file_id);
let file_path = cache_file_path(&cache.file_dir, key);

// Write a file.
local_store
.write(&file_path, b"hello".as_slice())
.await
.unwrap();
// Add to the cache.
cache
.put((region_id, file_id), IndexValue { file_size: 5 })
.await;

// Remove the file but keep the index.
local_store.delete(&file_path).await.unwrap();

// Reader is none.
assert!(cache.reader(key).await.is_none());
// Key is removed.
assert!(!cache.memory_index.contains_key(&key));
}

#[tokio::test]
async fn test_file_cache_recover() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache_home = "cache".to_string();
let cache = FileCache::new(
local_store.clone(),
cache_home.clone(),
ReadableSize::mb(10),
);

let region_id = RegionId::new(2000, 0);
// Write N files.
let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
for (i, file_id) in file_ids.iter().enumerate() {
let key = (region_id, *file_id);
let file_path = cache_file_path(&cache.file_dir, key);
let bytes = i.to_string().into_bytes();
local_store.write(&file_path, bytes.clone()).await.unwrap();

// Add to the cache.
cache
.put(
(region_id, *file_id),
IndexValue {
file_size: bytes.len() as u32,
},
)
.await;
}

// Recover the cache.
let cache = FileCache::new(
local_store.clone(),
cache_home.clone(),
ReadableSize::mb(10),
);
// No entry before recovery.
assert!(cache.reader((region_id, file_ids[0])).await.is_none());
cache.recover().await.unwrap();

for (i, file_id) in file_ids.iter().enumerate() {
let key = (region_id, *file_id);
let mut reader = cache.reader(key).await.unwrap();
let mut buf = String::new();
reader.read_to_string(&mut buf).await.unwrap();
assert_eq!(i.to_string(), buf);
}
}
}

0 comments on commit 9176b1e

Please sign in to comment.