From ff15bc41d6ebacaf9db62ee15043603cfbd8b679 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Sun, 8 Oct 2023 11:27:49 +0800 Subject: [PATCH] feat: improve object storage cache (#2522) * feat: refactor object storage cache with moka * chore: minor fixes * fix: concurrent issues and invalidate cache after write/delete * chore: minor changes * fix: cargo lock * refactor: rename * chore: change DEFAULT_OBJECT_STORE_CACHE_SIZE to 256Mib * fix: typo * chore: style * fix: toml format * chore: toml * fix: toml format * Update src/object-store/src/layers/lru_cache/read_cache.rs Co-authored-by: Ruihang Xia * chore: update Cargo.toml Co-authored-by: Yingwen * chore: update src/object-store/Cargo.toml Co-authored-by: Yingwen * chore: refactor and apply suggestions * fix: typo * feat: adds back allow list for caching * chore: cr suggestion Co-authored-by: Yingwen * chore: cr suggestion Co-authored-by: Yingwen * refactor: wrap inner Accessor with Arc * chore: remove run_pending_task in read and write path * chore: the arc is unnecessary --------- Co-authored-by: Ruihang Xia Co-authored-by: Yingwen --- Cargo.lock | 114 +------ Cargo.toml | 2 +- config/datanode.example.toml | 6 + config/standalone.example.toml | 4 + src/datanode/src/config.rs | 39 ++- src/datanode/src/datanode.rs | 3 - src/datanode/src/store.rs | 18 +- src/mito2/Cargo.toml | 2 +- src/object-store/Cargo.toml | 9 +- src/object-store/src/layers/lru_cache.rs | 207 ++++--------- .../src/layers/lru_cache/read_cache.rs | 290 ++++++++++++++++++ src/object-store/src/metrics.rs | 10 +- src/object-store/tests/object_store_test.rs | 98 +++++- tests-integration/src/test_util.rs | 2 +- tests-integration/tests/region_failover.rs | 1 + 15 files changed, 500 insertions(+), 305 deletions(-) create mode 100644 src/object-store/src/layers/lru_cache/read_cache.rs diff --git a/Cargo.lock b/Cargo.lock index e2ef70d9bb96..a94bd04889cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -579,26 +579,6 @@ dependencies = [ "zstd-safe 6.0.6", ] -[[package]] -name = "async-io" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" -dependencies = [ - "async-lock", - "autocfg", - "cfg-if 1.0.0", - "concurrent-queue", - "futures-lite", - "log", - "parking", - "polling", - "rustix 0.37.23", - "slab", - "socket2 0.4.9", - "waker-fn", -] - [[package]] name = "async-lock" version = "2.8.0" @@ -3526,21 +3506,6 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" -[[package]] -name = "futures-lite" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" -dependencies = [ - "fastrand 1.9.0", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.28" @@ -5012,12 +4977,6 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" -[[package]] -name = "linux-raw-sys" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" - [[package]] name = "linux-raw-sys" version = "0.4.5" @@ -5122,15 +5081,6 @@ dependencies = [ "vob", ] -[[package]] -name = "lru" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e7d46de488603ffdd5f30afbc64fbba2378214a2c3a2fb83abf3d33126df17" -dependencies = [ - "hashbrown 0.13.2", -] - [[package]] name = "lru" version = "0.10.1" @@ -5598,12 +5548,12 @@ dependencies = [ [[package]] name = "moka" -version = "0.11.3" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa6e72583bf6830c956235bff0d5afec8cf2952f579ebad18ae7821a917d950f" +checksum = "8dc65d4615c08c8a13d91fd404b5a2a4485ba35b4091e3315cf8798d280c2f29" dependencies = [ - "async-io", "async-lock", + "async-trait", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", @@ -5612,7 +5562,6 @@ dependencies = [ "parking_lot 0.12.1", "quanta 0.11.1", "rustc_version", - "scheduled-thread-pool", "skeptic", "smallvec", "tagptr", @@ -5666,7 +5615,7 @@ dependencies = [ "futures-sink", "futures-util", "lazy_static", - "lru 0.10.1", + "lru", "mio", "mysql_common", "once_cell", @@ -6009,14 +5958,14 @@ dependencies = [ "anyhow", "async-trait", "bytes", + "common-runtime", "common-telemetry", "common-test-util", "futures", - "lru 0.9.0", "md5", "metrics", + "moka", "opendal", - "pin-project", "tokio", "uuid", ] @@ -6397,12 +6346,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "parking" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" - [[package]] name = "parking_lot" version = "0.11.2" @@ -6834,22 +6777,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "polling" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" -dependencies = [ - "autocfg", - "bitflags 1.3.2", - "cfg-if 1.0.0", - "concurrent-queue", - "libc", - "log", - "pin-project-lite", - "windows-sys 0.48.0", -] - [[package]] name = "portable-atomic" version = "0.3.20" @@ -8058,20 +7985,6 @@ dependencies = [ "windows-sys 0.45.0", ] -[[package]] -name = "rustix" -version = "0.37.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" -dependencies = [ - "bitflags 1.3.2", - "errno 0.3.3", - "io-lifetimes", - "libc", - "linux-raw-sys 0.3.8", - "windows-sys 0.48.0", -] - [[package]] name = "rustix" version = "0.38.10" @@ -8577,15 +8490,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" -dependencies = [ - "parking_lot 0.12.1", -] - [[package]] name = "schemars" version = "0.8.13" @@ -11196,12 +11100,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8e76fae08f03f96e166d2dfda232190638c10e0383841252416f9cfe2ae60e6" -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - [[package]] name = "walkdir" version = "2.3.3" diff --git a/Cargo.toml b/Cargo.toml index e865c2b7ece6..6fdd77e2056e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,7 +82,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" -moka = { version = "0.11" } +moka = "0.12" once_cell = "1.18" opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] } parquet = "43.0" diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 4f52380d658f..20f69e73aa88 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -47,6 +47,12 @@ type = "File" # TTL for all tables. Disabled by default. # global_ttl = "7d" +# Cache configuration for object storage such as 'S3' etc. +# The local file cache directory +# cache_path = "/path/local_cache" +# The local file cache capacity in bytes. +# cache_capacity = "256Mib" + # Compaction options, see `standalone.example.toml`. [storage.compaction] max_inflight_tasks = 4 diff --git a/config/standalone.example.toml b/config/standalone.example.toml index cd098908a727..497b43f93462 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -115,6 +115,10 @@ data_home = "/tmp/greptimedb/" type = "File" # TTL for all tables. Disabled by default. # global_ttl = "7d" +# Cache configuration for object storage such as 'S3' etc. +# cache_path = "/path/local_cache" +# The local file cache capacity in bytes. +# cache_capacity = "256Mib" # Compaction options. [storage.compaction] diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 74c86c5ced53..11b30aea07fb 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -37,7 +37,7 @@ use storage::config::{ }; use storage::scheduler::SchedulerConfig; -pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024); +pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::mb(256); /// Default data home in file storage const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb"; @@ -90,6 +90,15 @@ impl Default for StorageConfig { #[serde(default)] pub struct FileConfig {} +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(default)] +pub struct ObjectStorageCacheConfig { + /// The local file cache directory + pub cache_path: Option, + /// The cache capacity in bytes + pub cache_capacity: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct S3Config { @@ -101,8 +110,8 @@ pub struct S3Config { pub secret_access_key: SecretString, pub endpoint: Option, pub region: Option, - pub cache_path: Option, - pub cache_capacity: Option, + #[serde(flatten)] + pub cache: ObjectStorageCacheConfig, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -115,8 +124,8 @@ pub struct OssConfig { #[serde(skip_serializing)] pub access_key_secret: SecretString, pub endpoint: String, - pub cache_path: Option, - pub cache_capacity: Option, + #[serde(flatten)] + pub cache: ObjectStorageCacheConfig, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -130,8 +139,8 @@ pub struct AzblobConfig { pub account_key: SecretString, pub endpoint: String, pub sas_token: Option, - pub cache_path: Option, - pub cache_capacity: Option, + #[serde(flatten)] + pub cache: ObjectStorageCacheConfig, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -143,8 +152,8 @@ pub struct GcsConfig { #[serde(skip_serializing)] pub credential_path: SecretString, pub endpoint: String, - pub cache_path: Option, - pub cache_capacity: Option, + #[serde(flatten)] + pub cache: ObjectStorageCacheConfig, } impl Default for S3Config { @@ -156,8 +165,7 @@ impl Default for S3Config { secret_access_key: SecretString::from(String::default()), endpoint: Option::default(), region: Option::default(), - cache_path: Option::default(), - cache_capacity: Option::default(), + cache: ObjectStorageCacheConfig::default(), } } } @@ -170,8 +178,7 @@ impl Default for OssConfig { access_key_id: SecretString::from(String::default()), access_key_secret: SecretString::from(String::default()), endpoint: String::default(), - cache_path: Option::default(), - cache_capacity: Option::default(), + cache: ObjectStorageCacheConfig::default(), } } } @@ -184,9 +191,8 @@ impl Default for AzblobConfig { account_name: SecretString::from(String::default()), account_key: SecretString::from(String::default()), endpoint: String::default(), - cache_path: Option::default(), - cache_capacity: Option::default(), sas_token: Option::default(), + cache: ObjectStorageCacheConfig::default(), } } } @@ -199,8 +205,7 @@ impl Default for GcsConfig { scope: String::default(), credential_path: SecretString::from(String::default()), endpoint: String::default(), - cache_path: Option::default(), - cache_capacity: Option::default(), + cache: ObjectStorageCacheConfig::default(), } } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 387a66426f24..3d127faf3322 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use catalog::kvbackend::MetaKvBackend; use catalog::memory::MemoryCatalogManager; -use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; @@ -62,8 +61,6 @@ use crate::region_server::RegionServer; use crate::server::Services; use crate::store; -pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024); - const OPEN_REGION_PARALLELISM: usize = 16; /// Datanode service. diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 937729671fb3..0a70c28e30a4 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -76,29 +76,33 @@ async fn create_object_store_with_cache( ) -> Result { let (cache_path, cache_capacity) = match store_config { ObjectStoreConfig::S3(s3_config) => { - let path = s3_config.cache_path.as_ref(); + let path = s3_config.cache.cache_path.as_ref(); let capacity = s3_config + .cache .cache_capacity .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); (path, capacity) } ObjectStoreConfig::Oss(oss_config) => { - let path = oss_config.cache_path.as_ref(); + let path = oss_config.cache.cache_path.as_ref(); let capacity = oss_config + .cache .cache_capacity .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); (path, capacity) } ObjectStoreConfig::Azblob(azblob_config) => { - let path = azblob_config.cache_path.as_ref(); + let path = azblob_config.cache.cache_path.as_ref(); let capacity = azblob_config + .cache .cache_capacity .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); (path, capacity) } ObjectStoreConfig::Gcs(gcs_config) => { - let path = gcs_config.cache_path.as_ref(); + let path = gcs_config.cache.cache_path.as_ref(); let capacity = gcs_config + .cache .cache_capacity .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); (path, capacity) @@ -119,6 +123,12 @@ async fn create_object_store_with_cache( let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) .await .context(error::InitBackendSnafu)?; + + info!( + "Enabled local object storage cache, path: {}, capacity: {}.", + path, cache_capacity + ); + Ok(object_store.layer(cache_layer)) } else { Ok(object_store) diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 94e569c8b014..55795df28bbe 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -40,7 +40,7 @@ humantime-serde = { workspace = true } lazy_static = "1.4" memcomparable = "0.2" metrics.workspace = true -moka.workspace = true +moka = { workspace = true, features = ["sync"] } object-store = { workspace = true } parquet = { workspace = true, features = ["async"] } paste.workspace = true diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index b3b192acf25e..49bf01464d4a 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -7,19 +7,20 @@ license.workspace = true [dependencies] async-trait = "0.1" bytes = "1.4" -futures = { version = "0.3" } -lru = "0.9" +common-runtime.workspace = true +common-telemetry.workspace = true +futures.workspace = true md5 = "0.7" metrics.workspace = true +moka = { workspace = true, features = ["future"] } opendal = { version = "0.40", features = [ "layers-tracing", "layers-metrics", ] } -pin-project = "1.0" -tokio.workspace = true uuid.workspace = true [dev-dependencies] anyhow = "1.0" common-telemetry = { workspace = true } common-test-util = { workspace = true } +tokio.workspace = true diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index 8987a210aa0a..c0958638b90d 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -12,101 +12,70 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::num::NonZeroUsize; -use std::ops::DerefMut; use std::sync::Arc; use async_trait::async_trait; -use lru::LruCache; -use metrics::increment_counter; -use opendal::raw::oio::{Page, Read, ReadExt, Reader, WriteExt}; +use opendal::raw::oio::Read; use opendal::raw::{ Accessor, Layer, LayeredAccessor, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite, }; -use opendal::{ErrorKind, Result}; -use tokio::sync::Mutex; - -use crate::metrics::{ - OBJECT_STORE_LRU_CACHE_ERROR, OBJECT_STORE_LRU_CACHE_ERROR_KIND, OBJECT_STORE_LRU_CACHE_HIT, - OBJECT_STORE_LRU_CACHE_MISS, -}; +use opendal::Result; +mod read_cache; +use common_telemetry::logging::info; +use read_cache::ReadCache; +/// An opendal layer with local LRU file cache supporting. #[derive(Clone)] -pub struct LruCacheLayer { - cache: Arc, - lru_cache: Arc>>, +pub struct LruCacheLayer { + // The read cache + read_cache: ReadCache, } impl LruCacheLayer { - pub async fn new(cache: Arc, capacity: usize) -> Result { - let layer = Self { - cache, - lru_cache: Arc::new(Mutex::new(LruCache::new( - NonZeroUsize::new(capacity).unwrap(), - ))), - }; - layer.recover_keys().await?; - - Ok(layer) - } + /// Create a `[LruCacheLayer]` with local file cache and capacity in bytes. + pub async fn new(file_cache: Arc, capacity: usize) -> Result { + let read_cache = ReadCache::new(file_cache, capacity); + let (entries, bytes) = read_cache.recover_cache().await?; - /// Recover existing keys from `cache` to `lru_cache`. - async fn recover_keys(&self) -> Result<()> { - let (_, mut pager) = self.cache.list("/", OpList::default()).await?; + info!( + "Recovered {} entries and total size {} in bytes for LruCacheLayer", + entries, bytes + ); - let mut lru_cache = self.lru_cache.lock().await; - while let Some(entries) = pager.next().await? { - for entry in entries { - let _ = lru_cache.push(entry.path().to_string(), ()); - } - } + Ok(Self { read_cache }) + } - Ok(()) + /// Returns true when the local cache contains the specific file + pub async fn contains_file(&self, path: &str) -> bool { + self.read_cache.contains_file(path).await } - pub async fn lru_contains_key(&self, key: &str) -> bool { - self.lru_cache.lock().await.contains(key) + /// Returns the read cache statistics info `(EntryCount, SizeInBytes)`. + pub async fn read_cache_stat(&self) -> (u64, u64) { + self.read_cache.stat().await } } -impl Layer for LruCacheLayer { +impl Layer for LruCacheLayer { type LayeredAccessor = LruCacheAccessor; fn layer(&self, inner: I) -> Self::LayeredAccessor { LruCacheAccessor { inner, - cache: self.cache.clone(), - lru_cache: self.lru_cache.clone(), + read_cache: self.read_cache.clone(), } } } #[derive(Debug)] -pub struct LruCacheAccessor { +pub struct LruCacheAccessor { inner: I, - cache: Arc, - lru_cache: Arc>>, -} - -/// Returns true when the path of the file can be cached. -fn can_cache(path: &str) -> bool { - // TODO(dennis): find a better way - !path.ends_with("_last_checkpoint") -} - -impl LruCacheAccessor { - fn cache_path(&self, path: &str, args: &OpRead) -> String { - format!( - "{:x}.cache-{}", - md5::compute(path), - args.range().to_header() - ) - } + read_cache: ReadCache, } #[async_trait] -impl LayeredAccessor for LruCacheAccessor { +impl LayeredAccessor for LruCacheAccessor { type Inner = I; type Reader = Box; type BlockingReader = I::BlockingReader; @@ -120,84 +89,27 @@ impl LayeredAccessor for LruCacheAccessor { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - if !can_cache(path) { - return self.inner.read(path, args).await.map(to_output_reader); - } - - let path = path.to_string(); - let cache_path = self.cache_path(&path, &args); - let lru_cache = &self.lru_cache; - - // the args is already in the cache path, so we must create a new OpRead. - match self.cache.read(&cache_path, OpRead::default()).await { - Ok((rp, r)) => { - increment_counter!(OBJECT_STORE_LRU_CACHE_HIT); - - // update lru when cache hit - let mut lru_cache = lru_cache.lock().await; - let _ = lru_cache.get_or_insert(cache_path.clone(), || ()); - Ok(to_output_reader((rp, r))) - } - Err(err) if err.kind() == ErrorKind::NotFound => { - increment_counter!(OBJECT_STORE_LRU_CACHE_MISS); - - let (_, mut reader) = self.inner.read(&path, args.clone()).await?; - let (_, mut writer) = self.cache.write(&cache_path, OpWrite::new()).await?; - - while let Some(bytes) = reader.next().await { - writer.write(&bytes?).await?; - } - - writer.close().await?; - - match self.cache.read(&cache_path, OpRead::default()).await { - Ok((rp, reader)) => { - let r = { - // push new cache file name to lru - let mut lru_cache = lru_cache.lock().await; - lru_cache.push(cache_path.clone(), ()) - }; - // delete the evicted cache file - if let Some((k, _v)) = r { - let _ = self.cache.delete(&k, OpDelete::new()).await; - } - return Ok(to_output_reader((rp, reader))); - } - Err(_) => return self.inner.read(&path, args).await.map(to_output_reader), - } - } - Err(err) => { - increment_counter!(OBJECT_STORE_LRU_CACHE_ERROR, OBJECT_STORE_LRU_CACHE_ERROR_KIND => format!("{}", err.kind())); - return self.inner.read(&path, args).await.map(to_output_reader); - } - } + self.read_cache.read(&self.inner, path, args).await } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - self.inner.write(path, args).await + let result = self.inner.write(path, args).await; + + self.read_cache + .invalidate_entries_with_prefix(format!("{:x}", md5::compute(path))) + .await; + + result } async fn delete(&self, path: &str, args: OpDelete) -> Result { - let cache_path = md5::compute(path); - let lru_cache = &self.lru_cache; - - let cache_files: Vec = { - let mut guard = lru_cache.lock().await; - let lru = guard.deref_mut(); - let cache_files = lru - .iter() - .filter(|(k, _v)| k.starts_with(format!("{:x}.cache-", cache_path).as_str())) - .map(|(k, _v)| k.clone()) - .collect::>(); - for k in &cache_files { - let _ = lru.pop(k); - } - cache_files - }; - for file in cache_files { - let _ = self.cache.delete(&file, OpDelete::new()).await; - } - self.inner.delete(path, args).await + let result = self.inner.delete(path, args).await; + + self.read_cache + .invalidate_entries_with_prefix(format!("{:x}", md5::compute(path))) + .await; + + result } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { @@ -205,35 +117,20 @@ impl LayeredAccessor for LruCacheAccessor { } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { + // TODO(dennis): support blocking read cache self.inner.blocking_read(path, args) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.inner.blocking_write(path, args) + let result = self.inner.blocking_write(path, args); + + self.read_cache + .blocking_invalidate_entries_with_prefix(format!("{:x}", md5::compute(path))); + + result } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { self.inner.blocking_list(path, args) } } - -#[inline] -fn to_output_reader(input: (RpRead, R)) -> (RpRead, Reader) { - (input.0, Box::new(input.1)) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_can_cache() { - assert!(can_cache("test")); - assert!(can_cache("a/b/c.parquet")); - assert!(can_cache("1.json")); - assert!(can_cache("100.checkpoint")); - assert!(can_cache("test/last_checkpoint")); - assert!(!can_cache("test/__last_checkpoint")); - assert!(!can_cache("a/b/c/__last_checkpoint")); - } -} diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs new file mode 100644 index 000000000000..586465b996ac --- /dev/null +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -0,0 +1,290 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use std::sync::Arc; + +use common_telemetry::logging::debug; +use futures::FutureExt; +use metrics::{decrement_gauge, increment_counter, increment_gauge}; +use moka::future::Cache; +use moka::notification::ListenerFuture; +use opendal::raw::oio::{Page, Read, ReadExt, Reader, WriteExt}; +use opendal::raw::{Accessor, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead}; +use opendal::{Error as OpendalError, ErrorKind, Result}; + +use crate::metrics::{ + OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT, + OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR, +}; + +/// Cache value for read file +#[derive(Debug, Clone, PartialEq, Eq, Copy)] +enum ReadResult { + // Read success with size + Success(u32), + // File not found + NotFound, +} + +impl ReadResult { + fn size_bytes(&self) -> u32 { + match self { + ReadResult::NotFound => 0, + ReadResult::Success(size) => *size, + } + } +} + +/// Returns true when the path of the file can be cached. +fn can_cache(path: &str) -> bool { + // TODO(dennis): find a better way + !path.ends_with("_last_checkpoint") +} + +/// Generate an unique cache key for the read path and range. +fn read_cache_key(path: &str, args: &OpRead) -> String { + format!( + "{:x}.cache-{}", + md5::compute(path), + args.range().to_header() + ) +} + +/// Local read cache for files in object storage +#[derive(Clone, Debug)] +pub(crate) struct ReadCache { + /// Local file cache backend + file_cache: Arc, + /// Local memory cache to track local cache files + mem_cache: Cache, +} + +impl ReadCache { + /// Create a [`ReadCache`] with capacity in bytes. + pub(crate) fn new(file_cache: Arc, capacity: usize) -> Self { + let file_cache_cloned = file_cache.clone(); + let eviction_listener = + move |read_key: Arc, read_result: ReadResult, cause| -> ListenerFuture { + // Delete the file from local file cache when it's purged from mem_cache. + decrement_gauge!(OBJECT_STORE_LRU_CACHE_ENTRIES, 1.0); + let file_cache_cloned = file_cache_cloned.clone(); + + async move { + if let ReadResult::Success(size) = read_result { + decrement_gauge!(OBJECT_STORE_LRU_CACHE_BYTES, size as f64); + + let result = file_cache_cloned.delete(&read_key, OpDelete::new()).await; + debug!( + "Deleted local cache file `{}`, result: {:?}, cause: {:?}.", + read_key, result, cause + ); + } + } + .boxed() + }; + + Self { + file_cache, + mem_cache: Cache::builder() + .max_capacity(capacity as u64) + .weigher(|_key, value: &ReadResult| -> u32 { + // TODO(dennis): add key's length to weight? + value.size_bytes() + }) + .async_eviction_listener(eviction_listener) + .support_invalidation_closures() + .build(), + } + } + + /// Returns the cache's entry count and total approximate entry size in bytes. + pub(crate) async fn stat(&self) -> (u64, u64) { + self.mem_cache.run_pending_tasks().await; + + (self.mem_cache.entry_count(), self.mem_cache.weighted_size()) + } + + /// Invalidte all cache items which key starts with `prefix`. + pub(crate) async fn invalidate_entries_with_prefix(&self, prefix: String) { + // Safety: always ok when building cache with `support_invalidation_closures`. + self.mem_cache + .invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix)) + .ok(); + } + + /// Blocking version of `invalidate_entries_with_prefix`. + pub(crate) fn blocking_invalidate_entries_with_prefix(&self, prefix: String) { + // Safety: always ok when building cache with `support_invalidation_closures`. + self.mem_cache + .invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix)) + .ok(); + } + + /// Recover existing cache items from `file_cache` to `mem_cache`. + /// Return entry count and total approximate entry size in bytes. + pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> { + let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?; + + while let Some(entries) = pager.next().await? { + for entry in entries { + let read_key = entry.path(); + + // We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly, + // because it's private field. + let size = { + let stat = self.file_cache.stat(read_key, OpStat::default()).await?; + + stat.into_metadata().content_length() + }; + + increment_gauge!(OBJECT_STORE_LRU_CACHE_ENTRIES, 1.0); + increment_gauge!(OBJECT_STORE_LRU_CACHE_BYTES, size as f64); + self.mem_cache + .insert(read_key.to_string(), ReadResult::Success(size as u32)) + .await; + } + } + + Ok(self.stat().await) + } + + /// Returns true when the read cache contains the specific file. + pub(crate) async fn contains_file(&self, path: &str) -> bool { + self.mem_cache.run_pending_tasks().await; + self.mem_cache.contains_key(path) + && self.file_cache.stat(path, OpStat::default()).await.is_ok() + } + + /// Read from a specific path using the OpRead operation. + /// It will attempt to retrieve the data from the local cache. + /// If the data is not found in the local cache, + /// it will fallback to retrieving it from remote object storage + /// and cache the result locally. + pub(crate) async fn read( + &self, + inner: &I, + path: &str, + args: OpRead, + ) -> Result<(RpRead, Box)> + where + I: Accessor, + { + if !can_cache(path) { + return inner.read(path, args).await.map(to_output_reader); + } + + let read_key = read_cache_key(path, &args); + + let read_result = self + .mem_cache + .try_get_with( + read_key.clone(), + self.read_remote(inner, &read_key, path, args.clone()), + ) + .await + .map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?; + + match read_result { + ReadResult::Success(_) => { + // There is a concurrent issue here, the local cache may be purged + // while reading, we have to fallback to remote read + match self.file_cache.read(&read_key, OpRead::default()).await { + Ok(ret) => { + increment_counter!(OBJECT_STORE_LRU_CACHE_HIT, "result" => "success"); + Ok(to_output_reader(ret)) + } + Err(_) => { + increment_counter!(OBJECT_STORE_LRU_CACHE_MISS); + inner.read(path, args).await.map(to_output_reader) + } + } + } + ReadResult::NotFound => { + increment_counter!(OBJECT_STORE_LRU_CACHE_HIT, "result" => "not_found"); + + Err(OpendalError::new( + ErrorKind::NotFound, + &format!("File not found: {path}"), + )) + } + } + } + + /// Read the file from remote storage. If success, write the content into local cache. + async fn read_remote( + &self, + inner: &I, + read_key: &str, + path: &str, + args: OpRead, + ) -> Result + where + I: Accessor, + { + increment_counter!(OBJECT_STORE_LRU_CACHE_MISS); + + let inner_result = inner.read(path, args).await; + + match inner_result { + Ok((rp, mut reader)) => { + let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?; + + while let Some(bytes) = reader.next().await { + writer.write(&bytes?).await?; + } + + // Call `close` to ensure data is written. + writer.close().await?; + + let read_bytes = rp.metadata().content_length() as u32; + increment_gauge!(OBJECT_STORE_LRU_CACHE_ENTRIES, 1.0); + increment_gauge!(OBJECT_STORE_LRU_CACHE_BYTES, read_bytes as f64); + + Ok(ReadResult::Success(read_bytes)) + } + + Err(e) if e.kind() == ErrorKind::NotFound => { + increment_counter!(OBJECT_STORE_READ_ERROR, "kind" => format!("{}", e.kind())); + increment_gauge!(OBJECT_STORE_LRU_CACHE_ENTRIES, 1.0); + + Ok(ReadResult::NotFound) + } + + Err(e) => { + increment_counter!(OBJECT_STORE_READ_ERROR, "kind" => format!("{}", e.kind())); + + Err(e) + } + } + } +} + +fn to_output_reader(input: (RpRead, R)) -> (RpRead, Reader) { + (input.0, Box::new(input.1)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_can_cache() { + assert!(can_cache("test")); + assert!(can_cache("a/b/c.parquet")); + assert!(can_cache("1.json")); + assert!(can_cache("100.checkpoint")); + assert!(can_cache("test/last_checkpoint")); + assert!(!can_cache("test/__last_checkpoint")); + assert!(!can_cache("a/b/c/__last_checkpoint")); + } +} diff --git a/src/object-store/src/metrics.rs b/src/object-store/src/metrics.rs index 6a8f0ee18994..e46862b734be 100644 --- a/src/object-store/src/metrics.rs +++ b/src/object-store/src/metrics.rs @@ -14,7 +14,13 @@ //! object-store metrics +/// Cache hit counter, no matter what the cache result is. pub const OBJECT_STORE_LRU_CACHE_HIT: &str = "object_store.lru_cache.hit"; +/// Cache miss counter pub const OBJECT_STORE_LRU_CACHE_MISS: &str = "object_store.lru_cache.miss"; -pub const OBJECT_STORE_LRU_CACHE_ERROR: &str = "object_store.lru_cache.error"; -pub const OBJECT_STORE_LRU_CACHE_ERROR_KIND: &str = "error"; +/// Object store read error counter +pub const OBJECT_STORE_READ_ERROR: &str = "object_store.read.errors"; +/// Cache entry number +pub const OBJECT_STORE_LRU_CACHE_ENTRIES: &str = "object_store.lru_cache.entries"; +/// Cache size in bytes +pub const OBJECT_STORE_LRU_CACHE_BYTES: &str = "object_store.lru_cache.bytes"; diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 38cb2b775dd7..d43a3af6f2b1 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -30,6 +30,8 @@ async fn test_object_crud(store: &ObjectStore) -> Result<()> { // Create object handler. // Write data info object; let file_name = "test_file"; + assert!(store.read(file_name).await.is_err()); + store.write(file_name, "Hello, World!").await?; // Read data from object; @@ -80,6 +82,11 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> { store.delete(p2).await?; let entries = store.list("/").await?; assert!(entries.is_empty()); + + assert!(store.read(p1).await.is_err()); + assert!(store.read(p2).await.is_err()); + assert!(store.read(p3).await.is_err()); + Ok(()) } @@ -210,12 +217,48 @@ async fn test_gcs_backend() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_file_backend_with_lru_cache() -> Result<()> { + logging::init_default_ut_logging(); + + let data_dir = create_temp_dir("test_file_backend_with_lru_cache"); + let tmp_dir = create_temp_dir("test_file_backend_with_lru_cache"); + let mut builder = Fs::default(); + let _ = builder + .root(&data_dir.path().to_string_lossy()) + .atomic_write_dir(&tmp_dir.path().to_string_lossy()); + + let store = ObjectStore::new(builder).unwrap().finish(); + + let cache_dir = create_temp_dir("test_file_backend_with_lru_cache"); + let cache_layer = { + let mut builder = Fs::default(); + let _ = builder + .root(&cache_dir.path().to_string_lossy()) + .atomic_write_dir(&cache_dir.path().to_string_lossy()); + let file_cache = Arc::new(builder.build().unwrap()); + + LruCacheLayer::new(Arc::new(file_cache.clone()), 32) + .await + .unwrap() + }; + + let store = store.layer(cache_layer.clone()); + + test_object_crud(&store).await?; + test_object_list(&store).await?; + + assert_eq!(cache_layer.read_cache_stat().await, (4, 0)); + + Ok(()) +} + async fn assert_lru_cache( cache_layer: &LruCacheLayer, file_names: &[&str], ) { for file_name in file_names { - assert!(cache_layer.lru_contains_key(file_name).await); + assert!(cache_layer.contains_file(file_name).await); } } @@ -265,11 +308,11 @@ async fn test_object_store_cache_policy() -> Result<()> { let _ = builder .root(&cache_dir.path().to_string_lossy()) .atomic_write_dir(&cache_dir.path().to_string_lossy()); - let cache_accessor = Arc::new(builder.build().unwrap()); - let cache_store = OperatorBuilder::new(cache_accessor.clone()).finish(); + let file_cache = Arc::new(builder.build().unwrap()); + let cache_store = OperatorBuilder::new(file_cache.clone()).finish(); // create operator for cache dir to verify cache file - let cache_layer = LruCacheLayer::new(Arc::new(cache_accessor.clone()), 3) + let cache_layer = LruCacheLayer::new(Arc::new(file_cache.clone()), 38) .await .unwrap(); let store = store.layer(cache_layer.clone()); @@ -281,13 +324,14 @@ async fn test_object_store_cache_policy() -> Result<()> { store.write(p1, "Hello, object1!").await.unwrap(); store.write(p2, "Hello, object2!").await.unwrap(); - // create cache by read object + // Try to read p1 and p2 let _ = store.read_with(p1).range(0..).await?; let _ = store.read(p1).await?; let _ = store.read_with(p2).range(0..).await?; let _ = store.read_with(p2).range(7..).await?; let _ = store.read(p2).await?; + assert_eq!(cache_layer.read_cache_stat().await, (3, 38)); assert_cache_files( &cache_store, &[ @@ -302,13 +346,16 @@ async fn test_object_store_cache_policy() -> Result<()> { &cache_layer, &[ "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-", "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-", ], ) .await; + // Delete p2 file store.delete(p2).await.unwrap(); + assert_eq!(cache_layer.read_cache_stat().await, (1, 15)); assert_cache_files( &cache_store, &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"], @@ -321,12 +368,17 @@ async fn test_object_store_cache_policy() -> Result<()> { ) .await; + assert!(store.read(p2).await.is_err()); + let p3 = "test_file3"; store.write(p3, "Hello, object3!").await.unwrap(); + // Try to read p3 let _ = store.read(p3).await.unwrap(); let _ = store.read_with(p3).range(0..5).await.unwrap(); + // The entry count is 4, because we have the p2 `NotFound` cache. + assert_eq!(cache_layer.read_cache_stat().await, (4, 35)); assert_cache_files( &cache_store, &[ @@ -347,6 +399,33 @@ async fn test_object_store_cache_policy() -> Result<()> { ) .await; + // try to read p1, p2, p3 + let _ = store.read(p3).await.unwrap(); + let _ = store.read_with(p3).range(0..5).await.unwrap(); + assert!(store.read(p2).await.is_err()); + // Read p1 with range `1..` , the existing p1 with range `0..` must be evicted. + let _ = store.read_with(p1).range(1..15).await.unwrap(); + assert_eq!(cache_layer.read_cache_stat().await, (4, 34)); + assert_cache_files( + &cache_store, + &[ + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", + ], + &["ello, object1!", "Hello, object3!", "Hello"], + ) + .await?; + assert_lru_cache( + &cache_layer, + &[ + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", + ], + ) + .await; + let handle = metric::try_handle().unwrap(); let metric_text = handle.render(); @@ -354,14 +433,15 @@ async fn test_object_store_cache_policy() -> Result<()> { assert!(metric_text.contains("object_store_lru_cache_miss")); drop(cache_layer); - let cache_layer = LruCacheLayer::new(Arc::new(cache_accessor), 3) - .await - .unwrap(); + // Test recover + let cache_layer = LruCacheLayer::new(Arc::new(file_cache), 38).await.unwrap(); + // The p2 `NotFound` cache will not be recovered + assert_eq!(cache_layer.read_cache_stat().await, (3, 34)); assert_lru_cache( &cache_layer, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index e3370836eb2f..fb72430806ad 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -206,7 +206,7 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te let mut s3_config = s3_test_config(); if *store_type == StorageType::S3WithCache { - s3_config.cache_path = Some("/tmp/greptimedb_cache".to_string()); + s3_config.cache.cache_path = Some("/tmp/greptimedb_cache".to_string()); } let mut builder = S3::default(); diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index 6941abbce580..2405b0a62943 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -174,6 +174,7 @@ async fn has_route_cache(instance: &Arc, table_id: TableId) -> bool { cache .get(TableRouteKey::new(table_id).as_raw_key().as_slice()) + .await .is_some() }