Skip to content

Commit

Permalink
fix: correct write cache's metric labels (#5227)
Browse files Browse the repository at this point in the history
* refactor: remove unused field in WriteCache

Signed-off-by: Ruihang Xia <[email protected]>

* refactor: unify read and write cache path

Signed-off-by: Ruihang Xia <[email protected]>

* update config and fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

* remove unnecessary methods and adapt test

Signed-off-by: Ruihang Xia <[email protected]>

* change the default path

Signed-off-by: Ruihang Xia <[email protected]>

* remove remote-home

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Dec 25, 2024
1 parent f33b378 commit a23f269
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 67 deletions.
4 changes: 2 additions & 2 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
Expand Down Expand Up @@ -460,7 +460,7 @@
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 disables the cache.<br/>If not set, it defaults to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 disables the cache.<br/>If not set, it defaults to 1/16 of OS memory, with a maximum of 512MB. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/object_cache/write`. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
Expand Down
4 changes: 2 additions & 2 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ data_home = "/tmp/greptimedb/"
type = "File"

## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
## A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling.
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
## @toml2docs:none-default
#+ cache_path = ""

Expand Down Expand Up @@ -478,7 +478,7 @@ auto_flush_interval = "1h"
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_experimental_write_cache = false

## File system path for write cache, defaults to `{data_home}/object_cache/write`.
## File system path for write cache, defaults to `{data_home}`.
experimental_write_cache_path = ""

## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
Expand Down
8 changes: 3 additions & 5 deletions src/datanode/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_telemetry::{info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, OBJECT_CACHE_DIR};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;

use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
Expand Down Expand Up @@ -147,12 +147,10 @@ async fn build_cache_layer(
};

// Enable object cache by default
// Set the cache_path to be `${data_home}/object_cache/read/{name}` by default
// Set the cache_path to be `${data_home}` by default
// if it's not present
if cache_path.is_none() {
let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR);
let read_cache_path = join_dir(&object_cache_path, "read");
let read_cache_path = join_dir(&read_cache_path, &name.to_lowercase());
let read_cache_path = data_home.to_string();
tokio::fs::create_dir_all(Path::new(&read_cache_path))
.await
.context(CreateDirSnafu {
Expand Down
6 changes: 4 additions & 2 deletions src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ use crate::sst::file::FileId;
use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::metadata::MetadataLoader;

/// Subdirectory of cached files.
const FILE_DIR: &str = "files/";
/// Subdirectory of cached files for write.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const FILE_DIR: &str = "cache/object/write/";

/// A file cache manages files on the local store and evicts files
/// based on size.
Expand Down
9 changes: 0 additions & 9 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use futures::AsyncWriteExt;
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use snafu::ResultExt;

Expand All @@ -44,10 +43,6 @@ use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
pub struct WriteCache {
/// Local file cache.
file_cache: FileCacheRef,
/// Object store manager.
#[allow(unused)]
/// TODO: Remove unused after implementing async write cache
object_store_manager: ObjectStoreManagerRef,
/// Puffin manager factory for index.
puffin_manager_factory: PuffinManagerFactory,
/// Intermediate manager for index.
Expand All @@ -61,7 +56,6 @@ impl WriteCache {
/// `object_store_manager` for all object stores.
pub async fn new(
local_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
puffin_manager_factory: PuffinManagerFactory,
Expand All @@ -72,7 +66,6 @@ impl WriteCache {

Ok(Self {
file_cache,
object_store_manager,
puffin_manager_factory,
intermediate_manager,
})
Expand All @@ -81,7 +74,6 @@ impl WriteCache {
/// Creates a write cache based on local fs.
pub async fn new_fs(
cache_dir: &str,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
puffin_manager_factory: PuffinManagerFactory,
Expand All @@ -92,7 +84,6 @@ impl WriteCache {
let local_store = new_fs_cache_store(cache_dir).await?;
Self::new(
local_store,
object_store_manager,
cache_capacity,
ttl,
puffin_manager_factory,
Expand Down
7 changes: 2 additions & 5 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use object_store::util::join_dir;
use object_store::OBJECT_CACHE_DIR;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;

Expand Down Expand Up @@ -97,7 +95,7 @@ pub struct MitoConfig {
pub selector_result_cache_size: ReadableSize,
/// Whether to enable the experimental write cache.
pub enable_experimental_write_cache: bool,
/// File system path for write cache, defaults to `{data_home}/object_cache/write`.
/// File system path for write cache dir's root, defaults to `{data_home}`.
pub experimental_write_cache_path: String,
/// Capacity for write cache.
pub experimental_write_cache_size: ReadableSize,
Expand Down Expand Up @@ -234,8 +232,7 @@ impl MitoConfig {

// Sets write cache path if it is empty.
if self.experimental_write_cache_path.trim().is_empty() {
let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR);
self.experimental_write_cache_path = join_dir(&object_cache_path, "write");
self.experimental_write_cache_path = data_home.to_string();
}

self.index.sanitize(data_home, &self.inverted_index)?;
Expand Down
13 changes: 3 additions & 10 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,16 +644,9 @@ impl TestEnv {
.unwrap();

let object_store_manager = self.get_object_store_manager().unwrap();
let write_cache = WriteCache::new(
local_store,
object_store_manager,
capacity,
None,
puffin_mgr,
intm_mgr,
)
.await
.unwrap();
let write_cache = WriteCache::new(local_store, capacity, None, puffin_mgr, intm_mgr)
.await
.unwrap();

Arc::new(write_cache)
}
Expand Down
4 changes: 0 additions & 4 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ impl WorkerGroup {
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
puffin_manager_factory.clone(),
intermediate_manager.clone(),
)
Expand Down Expand Up @@ -303,7 +302,6 @@ impl WorkerGroup {
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
puffin_manager_factory.clone(),
intermediate_manager.clone(),
)
Expand Down Expand Up @@ -364,7 +362,6 @@ fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {

async fn write_cache_from_config(
config: &MitoConfig,
object_store_manager: ObjectStoreManagerRef,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
Expand All @@ -383,7 +380,6 @@ async fn write_cache_from_config(

let cache = WriteCache::new_fs(
&config.experimental_write_cache_path,
object_store_manager,
config.experimental_write_cache_size,
config.experimental_write_cache_ttl,
puffin_manager_factory,
Expand Down
10 changes: 5 additions & 5 deletions src/object-store/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ mod prometheus {

static PROMETHEUS_LAYER: OnceLock<Mutex<PrometheusLayer>> = OnceLock::new();

/// This logic tries to extract the parent path from the object storage operation.
/// It also relies on the assumption that the region path is built from the
/// pattern `<data|index>/catalog/schema/table_id/...` OR `greptimedb/object_cache/<read|write>/...`
///
/// We'll get the data/catalog/schema from path.
pub fn build_prometheus_metrics_layer(with_path_label: bool) -> PrometheusLayer {
PROMETHEUS_LAYER
.get_or_init(|| {
// This logical tries to extract parent path from the object storage operation
// the function also relies on assumption that the region path is built from
// pattern `<data|index>/catalog/schema/table_id/....`
//
// We'll get the data/catalog/schema from path.
let path_level = if with_path_label { 3 } else { 0 };

let layer = PrometheusLayer::builder()
Expand Down
11 changes: 3 additions & 8 deletions src/object-store/src/layers/lru_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,15 @@ impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let result = self.inner.write(path, args).await;

self.read_cache
.invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)))
.await;
self.read_cache.invalidate_entries_with_prefix(path);

result
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let result = self.inner.delete(path, args).await;

self.read_cache
.invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)))
.await;
self.read_cache.invalidate_entries_with_prefix(path);

result
}
Expand All @@ -146,8 +142,7 @@ impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let result = self.inner.blocking_write(path, args);

self.read_cache
.blocking_invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)));
self.read_cache.invalidate_entries_with_prefix(path);

result
}
Expand Down
34 changes: 20 additions & 14 deletions src/object-store/src/layers/lru_cache/read_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ use moka::future::Cache;
use moka::notification::ListenerFuture;
use opendal::raw::oio::{Read, Reader, Write};
use opendal::raw::{Access, OpDelete, OpRead, OpStat, OpWrite, RpRead};
use opendal::{Error as OpendalError, ErrorKind, Metakey, OperatorBuilder, Result};
use opendal::{EntryMode, Error as OpendalError, ErrorKind, Metakey, OperatorBuilder, Result};

use crate::metrics::{
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR,
};

const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;
/// Subdirectory of cached files for read.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const READ_CACHE_DIR: &str = "cache/object/read";

/// Cache value for read file
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
Expand Down Expand Up @@ -56,12 +60,20 @@ fn can_cache(path: &str) -> bool {
/// Generate a unique cache key for the read path and range.
fn read_cache_key(path: &str, args: &OpRead) -> String {
format!(
"{:x}.cache-{}",
"{READ_CACHE_DIR}/{:x}.cache-{}",
md5::compute(path),
args.range().to_header()
)
}

fn read_cache_root() -> String {
format!("/{READ_CACHE_DIR}/")
}

fn read_cache_key_prefix(path: &str) -> String {
format!("{READ_CACHE_DIR}/{:x}", md5::compute(path))
}

/// Local read cache for files in object storage
#[derive(Debug)]
pub(crate) struct ReadCache<C> {
Expand Down Expand Up @@ -125,16 +137,9 @@ impl<C: Access> ReadCache<C> {
(self.mem_cache.entry_count(), self.mem_cache.weighted_size())
}

/// Invalidate all cache items which key starts with `prefix`.
pub(crate) async fn invalidate_entries_with_prefix(&self, prefix: String) {
// Safety: always ok when building cache with `support_invalidation_closures`.
self.mem_cache
.invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
.ok();
}

/// Blocking version of `invalidate_entries_with_prefix`.
pub(crate) fn blocking_invalidate_entries_with_prefix(&self, prefix: String) {
/// Invalidate all cache items belonging to the specified path.
pub(crate) fn invalidate_entries_with_prefix(&self, path: &str) {
let prefix = read_cache_key_prefix(path);
// Safety: always ok when building cache with `support_invalidation_closures`.
self.mem_cache
.invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
Expand All @@ -145,8 +150,9 @@ impl<C: Access> ReadCache<C> {
/// Return entry count and total approximate entry size in bytes.
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
let op = OperatorBuilder::new(self.file_cache.clone()).finish();
let root = read_cache_root();
let mut entries = op
.list_with("/")
.list_with(&root)
.metakey(Metakey::ContentLength | Metakey::ContentType)
.concurrent(RECOVER_CACHE_LIST_CONCURRENT)
.await?;
Expand All @@ -157,7 +163,7 @@ impl<C: Access> ReadCache<C> {
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
// Only recover file entries; directory entries (including the root) are skipped.
if entry.path() != "/" {
if entry.metadata().mode() == EntryMode::FILE {
self.mem_cache
.insert(read_key.to_string(), ReadResult::Success(size as u32))
.await;
Expand Down
6 changes: 5 additions & 1 deletion src/object-store/tests/object_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ use opendal::raw::{Access, OpList, OpRead};
use opendal::services::{Azblob, Gcs, Oss};
use opendal::{EntryMode, OperatorBuilder};

/// Duplicate of the constant in `src/layers/lru_cache/read_cache.rs`
const READ_CACHE_DIR: &str = "cache/object/read";

async fn test_object_crud(store: &ObjectStore) -> Result<()> {
// Create object handler.
// Write data info object;
Expand Down Expand Up @@ -267,7 +270,8 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {

async fn assert_lru_cache<C: Access>(cache_layer: &LruCacheLayer<C>, file_names: &[&str]) {
for file_name in file_names {
assert!(cache_layer.contains_file(file_name).await, "{file_name}");
let file_path = format!("{READ_CACHE_DIR}/{file_name}");
assert!(cache_layer.contains_file(&file_path).await, "{file_path:?}");
}
}

Expand Down

0 comments on commit a23f269

Please sign in to comment.