From 29cdb5ae944d86a441aa3784d2dd31b5e0012269 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 27 Dec 2023 14:28:23 +0800 Subject: [PATCH 1/8] feat: recover cache --- src/mito2/src/cache.rs | 3 + src/mito2/src/cache/file_cache.rs | 183 ++++++++++++++++++++++++++++++ 2 files changed, 186 insertions(+) create mode 100644 src/mito2/src/cache/file_cache.rs diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 529e5d3d4eee..5ea9c257380c 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -15,6 +15,7 @@ //! Cache for the engine. mod cache_size; +mod file_cache; #[cfg(test)] pub(crate) mod test_util; @@ -38,6 +39,8 @@ const SST_META_TYPE: &str = "sst_meta"; const VECTOR_TYPE: &str = "vector"; // Metrics type key for pages. const PAGE_TYPE: &str = "page"; +// Metrics type key for files on the local store. +const FILE_TYPE: &str = "file"; /// Manages cached data for the engine. pub struct CacheManager { diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs new file mode 100644 index 000000000000..238790ae3d52 --- /dev/null +++ b/src/mito2/src/cache/file_cache.rs @@ -0,0 +1,183 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! A cache for files. + +use std::time::Instant; + +use common_base::readable_size::ReadableSize; +use common_telemetry::{info, warn}; +use futures::{FutureExt, TryStreamExt}; +use moka::future::Cache; +use moka::notification::RemovalCause; +use object_store::util::{join_dir, join_path}; +use object_store::{Metakey, ObjectStore}; +use snafu::ResultExt; +use store_api::storage::RegionId; + +use crate::cache::FILE_TYPE; +use crate::error::{OpenDalSnafu, Result}; +use crate::metrics::CACHE_BYTES; +use crate::sst::file::FileId; + +/// Subdirectory of cached files. +const FILE_DIR: &str = "files"; + +/// A file cache manages files on local store and evict files based +/// on size. +#[derive(Debug)] +pub(crate) struct FileCache { + /// Local store to cache files. + local_store: ObjectStore, + /// Cached file directory under cache home. + file_dir: String, + /// Index to track cached files. + /// + /// File id is enough to identity a file uniquely. + memory_index: Cache, +} + +impl FileCache { + /// Creates a new file cache. + pub(crate) fn new( + local_store: ObjectStore, + cache_home: String, + capacity: ReadableSize, + ) -> FileCache { + // Stores files under `cache_home/{FILE_DIR}`. + let file_dir = cache_file_dir(&cache_home); + let cache_store = local_store.clone(); + let cache_file_dir = file_dir.clone(); + let memory_index = Cache::builder().max_capacity(capacity.as_bytes()).weigher(|_key, value: &IndexValue| -> u32 { + // We only measure space on local store. + value.file_size + }) + .async_eviction_listener(move |key, value, cause| { + let store = cache_store.clone(); + let file_path = cache_file_path(&cache_file_dir, *key); + async move { + if let RemovalCause::Replaced = cause { + // The cache is replaced by another file. This is unexpected, we don't remove the same + // file but updates the metrics as the file is already replaced by users. + CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into()); + warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.0); + return; + } + + match store.delete(&file_path).await { + Ok(()) => { + CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into()); + } + Err(e) => { + warn!(e; "Failed to delete cached file {} for region {}", file_path, key.0); + } + } + } + .boxed() + }) + .build(); + FileCache { + local_store, + file_dir, + memory_index, + } + } + + /// Puts a file into the cache. + /// + /// Callers should ensure the file is in the correct path. + pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) { + CACHE_BYTES + .with_label_values(&[FILE_TYPE]) + .add(value.file_size.into()); + self.memory_index.insert(key, value).await; + } + + /// Recovers the index from local store. + pub(crate) async fn recover(&self) -> Result<()> { + let now = Instant::now(); + + let mut lister = self + .local_store + .lister_with(&self.file_dir) + .metakey(Metakey::ContentLength) + .await + .context(OpenDalSnafu)?; + let (mut total_size, mut total_keys) = (0, 0); + while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? { + let meta = entry.metadata(); + if !meta.is_file() { + continue; + } + let Some(key) = parse_index_key(entry.name()) else { + continue; + }; + let file_size = meta.content_length() as u32; + self.memory_index + .insert(key, IndexValue { file_size }) + .await; + total_size += file_size; + total_keys += 1; + } + // The metrics is a signed int gauge so we can updates it finally. + CACHE_BYTES + .with_label_values(&[FILE_TYPE]) + .add(total_size.into()); + + info!( + "Recovered file cache, num_keys: {}, num_bytes: {}, cost: {:?}", + total_keys, + total_size, + now.elapsed() + ); + + Ok(()) + } +} + +/// Key of file cache index. +pub(crate) type IndexKey = (RegionId, FileId); + +/// An entity that describes the file in the file cache. +/// +/// It should only keep minimal information needed by the cache. +#[derive(Debug, Clone)] +pub(crate) struct IndexValue { + /// Size of the file in bytes. + file_size: u32, +} + +/// Returns the directory to store files. +fn cache_file_dir(cache_home: &str) -> String { + join_dir(cache_home, FILE_DIR) +} + +/// Generates the path to the cached file. +/// +/// The file name format is `{region_id}.{file_id}` +fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String { + join_path(cache_file_dir, &format!("{}.{}", key.0.as_u64(), key.1)) +} + +/// Parse index key from the file name. +fn parse_index_key(name: &str) -> Option { + let mut splited = name.split('.'); + let region_id = splited.next().and_then(|s| { + let id = s.parse::().ok()?; + Some(RegionId::from_u64(id)) + })?; + let file_id = splited.next().and_then(|s| FileId::parse_str(s).ok())?; + + Some((region_id, file_id)) +} From 36aac41d639ee0a2c5890a4c553fc56de8f85fde Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 27 Dec 2023 22:01:00 +0800 Subject: [PATCH 2/8] feat: moka features --- src/mito2/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 8c3ef50ec2c7..046bc14db639 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -42,7 +42,7 @@ humantime-serde.workspace = true lazy_static = "1.4" log-store = { workspace = true, optional = true } memcomparable = "0.2" -moka = { workspace = true, features = ["sync"] } +moka = { workspace = true, features = ["sync", "future"] } num_cpus = "1.13" object-store.workspace = true parquet = { workspace = true, features = ["async"] } From 9176b1eec3c1dc6a83449e4f5428750a5a64c528 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 27 Dec 2023 22:01:43 +0800 Subject: [PATCH 3/8] test: tests for file cache --- src/mito2/src/cache/file_cache.rs | 187 +++++++++++++++++++++++++++++- 1 file changed, 184 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 238790ae3d52..a501654f1694 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -22,13 +22,13 @@ use futures::{FutureExt, TryStreamExt}; use moka::future::Cache; use moka::notification::RemovalCause; use object_store::util::{join_dir, join_path}; -use object_store::{Metakey, ObjectStore}; +use object_store::{ErrorKind, Metakey, ObjectStore, Reader}; use snafu::ResultExt; use store_api::storage::RegionId; use crate::cache::FILE_TYPE; use crate::error::{OpenDalSnafu, Result}; -use crate::metrics::CACHE_BYTES; +use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS}; use crate::sst::file::FileId; /// Subdirectory of cached files. @@ -59,10 +59,12 @@ impl FileCache { let file_dir = cache_file_dir(&cache_home); let cache_store = local_store.clone(); let cache_file_dir = file_dir.clone(); - let memory_index = Cache::builder().max_capacity(capacity.as_bytes()).weigher(|_key, value: &IndexValue| -> u32 { + let memory_index = Cache::builder() + .weigher(|_key, value: &IndexValue| -> u32 { // We only measure space on local store. value.file_size }) + .max_capacity(capacity.as_bytes()) .async_eviction_listener(move |key, value, cause| { let store = cache_store.clone(); let file_path = cache_file_path(&cache_file_dir, *key); @@ -104,6 +106,39 @@ impl FileCache { self.memory_index.insert(key, value).await; } + /// Reads a file from the cache. + pub(crate) async fn reader(&self, key: IndexKey) -> Option { + if !self.memory_index.contains_key(&key) { + CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + return None; + } + + let file_path = cache_file_path(&self.file_dir, key); + match self.local_store.reader(&file_path).await { + Ok(reader) => { + CACHE_HIT.with_label_values(&[FILE_TYPE]).inc(); + Some(reader) + } + Err(e) => { + if e.kind() != ErrorKind::NotFound { + warn!("Failed to get file for key {:?}, err: {}", key, e); + } + // We removes the file from the index. + self.memory_index.remove(&key).await; + CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + None + } + } + } + + /// Removes a file from the cache explicitly. + pub(crate) async fn remove(&self, key: IndexKey) { + let file_path = cache_file_path(&self.file_dir, key); + if let Err(e) = self.local_store.delete(&file_path).await { + warn!(e; "Failed to delete a cached file {}", file_path); + } + } + /// Recovers the index from local store. pub(crate) async fn recover(&self) -> Result<()> { let now = Instant::now(); @@ -181,3 +216,149 @@ fn parse_index_key(name: &str) -> Option { Some((region_id, file_id)) } + +#[cfg(test)] +mod tests { + use common_test_util::temp_dir::create_temp_dir; + use futures::AsyncReadExt; + use object_store::services::Fs; + + use super::*; + + fn new_fs_store(path: &str) -> ObjectStore { + let mut builder = Fs::default(); + builder.root(path); + ObjectStore::new(builder).unwrap().finish() + } + + #[tokio::test] + async fn test_file_cache_basic() { + let dir = create_temp_dir(""); + let local_store = new_fs_store(dir.path().to_str().unwrap()); + let cache_home = "cache".to_string(); + + let cache = FileCache::new( + local_store.clone(), + cache_home.clone(), + ReadableSize::mb(10), + ); + let region_id = RegionId::new(2000, 0); + let file_id = FileId::random(); + let key = (region_id, file_id); + let file_path = cache_file_path(&cache.file_dir, key); + + // Get an empty file. + assert!(cache.reader(key).await.is_none()); + + // Write a file. + local_store + .write(&file_path, b"hello".as_slice()) + .await + .unwrap(); + // Add to the cache. + cache + .put((region_id, file_id), IndexValue { file_size: 5 }) + .await; + + // Read file content. + let mut reader = cache.reader(key).await.unwrap(); + let mut buf = String::new(); + reader.read_to_string(&mut buf).await.unwrap(); + assert_eq!("hello", buf); + + // Remove the file. + cache.remove(key).await; + assert!(cache.reader(key).await.is_none()); + + // Ensure all pending tasks of the moka cache is done before assertion. + cache.memory_index.run_pending_tasks().await; + + // The file also not exists. + assert!(!local_store.is_exist(&file_path).await.unwrap()); + } + + #[tokio::test] + async fn test_file_cache_file_removed() { + let dir = create_temp_dir(""); + let local_store = new_fs_store(dir.path().to_str().unwrap()); + let cache_home = "cache".to_string(); + + let cache = FileCache::new( + local_store.clone(), + cache_home.clone(), + ReadableSize::mb(10), + ); + let region_id = RegionId::new(2000, 0); + let file_id = FileId::random(); + let key = (region_id, file_id); + let file_path = cache_file_path(&cache.file_dir, key); + + // Write a file. + local_store + .write(&file_path, b"hello".as_slice()) + .await + .unwrap(); + // Add to the cache. + cache + .put((region_id, file_id), IndexValue { file_size: 5 }) + .await; + + // Remove the file but keep the index. + local_store.delete(&file_path).await.unwrap(); + + // Reader is none. + assert!(cache.reader(key).await.is_none()); + // Key is removed. + assert!(!cache.memory_index.contains_key(&key)); + } + + #[tokio::test] + async fn test_file_cache_recover() { + let dir = create_temp_dir(""); + let local_store = new_fs_store(dir.path().to_str().unwrap()); + let cache_home = "cache".to_string(); + let cache = FileCache::new( + local_store.clone(), + cache_home.clone(), + ReadableSize::mb(10), + ); + + let region_id = RegionId::new(2000, 0); + // Write N files. + let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect(); + for (i, file_id) in file_ids.iter().enumerate() { + let key = (region_id, *file_id); + let file_path = cache_file_path(&cache.file_dir, key); + let bytes = i.to_string().into_bytes(); + local_store.write(&file_path, bytes.clone()).await.unwrap(); + + // Add to the cache. + cache + .put( + (region_id, *file_id), + IndexValue { + file_size: bytes.len() as u32, + }, + ) + .await; + } + + // Recover the cache. + let cache = FileCache::new( + local_store.clone(), + cache_home.clone(), + ReadableSize::mb(10), + ); + // No entry before recovery. + assert!(cache.reader((region_id, file_ids[0])).await.is_none()); + cache.recover().await.unwrap(); + + for (i, file_id) in file_ids.iter().enumerate() { + let key = (region_id, *file_id); + let mut reader = cache.reader(key).await.unwrap(); + let mut buf = String::new(); + reader.read_to_string(&mut buf).await.unwrap(); + assert_eq!(i.to_string(), buf); + } + } +} From 08c3fb251a7fd23c2bd357dd898cb3052f6cdb09 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 28 Dec 2023 14:00:17 +0800 Subject: [PATCH 4/8] chore: suppress warninig --- src/mito2/src/cache.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 5ea9c257380c..cc02a2d037ce 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -15,7 +15,9 @@ //! Cache for the engine. mod cache_size; -mod file_cache; +// TODO(yingwen): Remove this after the write cache is ready. +#[allow(unused)] +pub(crate) mod file_cache; #[cfg(test)] pub(crate) mod test_util; From f04b7bb82d25c49dc1a9d3f9ed2dbe0d76e423c4 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 28 Dec 2023 15:15:38 +0800 Subject: [PATCH 5/8] fix: parse_inde_key consider suffix --- src/mito2/src/cache/file_cache.rs | 35 ++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index a501654f1694..2ead0e744742 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -207,7 +207,7 @@ fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String { /// Parse index key from the file name. fn parse_index_key(name: &str) -> Option { - let mut splited = name.split('.'); + let mut splited = name.splitn(2, '.'); let region_id = splited.next().and_then(|s| { let id = s.parse::().ok()?; Some(RegionId::from_u64(id)) @@ -361,4 +361,37 @@ mod tests { assert_eq!(i.to_string(), buf); } } + + #[test] + fn test_cache_file_path() { + let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap(); + assert_eq!( + "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095", + cache_file_path("test_dir", (RegionId::new(1234, 5), file_id)) + ); + assert_eq!( + "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095", + cache_file_path("test_dir/", (RegionId::new(1234, 5), file_id)) + ); + } + + #[test] + fn test_parse_file_name() { + let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap(); + let region_id = RegionId::new(1234, 5); + assert_eq!( + (region_id, file_id), + parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").unwrap() + ); + assert!(parse_index_key("").is_none()); + assert!(parse_index_key(".").is_none()); + assert!(parse_index_key("5299989643269").is_none()); + assert!(parse_index_key("5299989643269.").is_none()); + assert!(parse_index_key(".5299989643269").is_none()); + assert!(parse_index_key("5299989643269.").is_none()); + assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none()); + assert!( + parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").is_none() + ); + } } From df87709d1f6560d78849366a74251c480bb33899 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 29 Dec 2023 00:27:34 +0800 Subject: [PATCH 6/8] feat: update cache --- src/mito2/src/cache/file_cache.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 2ead0e744742..a0beb83bcb1d 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -113,7 +113,7 @@ impl FileCache { return None; } - let file_path = cache_file_path(&self.file_dir, key); + let file_path = self.cache_file_path(key); match self.local_store.reader(&file_path).await { Ok(reader) => { CACHE_HIT.with_label_values(&[FILE_TYPE]).inc(); @@ -133,7 +133,7 @@ impl FileCache { /// Removes a file from the cache explicitly. pub(crate) async fn remove(&self, key: IndexKey) { - let file_path = cache_file_path(&self.file_dir, key); + let file_path = self.cache_file_path(key); if let Err(e) = self.local_store.delete(&file_path).await { warn!(e; "Failed to delete a cached file {}", file_path); } @@ -179,6 +179,16 @@ impl FileCache { Ok(()) } + + /// Returns the cache file path for the key. + pub(crate) fn cache_file_path(&self, key: IndexKey) -> String { + cache_file_path(&self.file_dir, key) + } + + /// Returns the local store of the file cache. + pub(crate) fn local_store(&self) -> ObjectStore { + self.local_store.clone() + } } /// Key of file cache index. From 1408487ada4bd7cae87ed80bda8afccb77e12d0d Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 29 Dec 2023 00:31:36 +0800 Subject: [PATCH 7/8] feat: expose cache file path --- src/mito2/src/cache/file_cache.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index a0beb83bcb1d..843cb63eaa2e 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -96,9 +96,9 @@ impl FileCache { } } - /// Puts a file into the cache. + /// Puts a file into the cache index. /// - /// Callers should ensure the file is in the correct path. + /// The `WriteCache` should ensure the file is in the correct path. pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) { CACHE_BYTES .with_label_values(&[FILE_TYPE]) From 2ae9523c237a014db1dc409217ba2f462bfb34f6 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 29 Dec 2023 11:11:37 +0800 Subject: [PATCH 8/8] feat: use cache_path in test --- src/mito2/src/cache/file_cache.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 843cb63eaa2e..25fd5d6d62bf 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -255,7 +255,7 @@ mod tests { let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); let key = (region_id, file_id); - let file_path = cache_file_path(&cache.file_dir, key); + let file_path = cache.cache_file_path(key); // Get an empty file. assert!(cache.reader(key).await.is_none()); @@ -301,7 +301,7 @@ mod tests { let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); let key = (region_id, file_id); - let file_path = cache_file_path(&cache.file_dir, key); + let file_path = cache.cache_file_path(key); // Write a file. local_store @@ -338,7 +338,7 @@ mod tests { let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect(); for (i, file_id) in file_ids.iter().enumerate() { let key = (region_id, *file_id); - let file_path = cache_file_path(&cache.file_dir, key); + let file_path = cache.cache_file_path(key); let bytes = i.to_string().into_bytes(); local_store.write(&file_path, bytes.clone()).await.unwrap();