Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add ttl to write_cache #4010

Merged
merged 5 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@
| `region_engine.mito.sst_meta_cache_size` | String | `128MB` | Cache size for SST metadata. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. |
| `region_engine.mito.vector_cache_size` | String | `512MB` | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | `512MB` | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/write_cache`. |
| `region_engine.mito.experimental_write_cache_size` | String | `512MB` | Capacity for write cache. |
| `region_engine.mito.experimental_write_cache_ttl` | String | `1h` | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
Expand Down Expand Up @@ -350,6 +354,10 @@
| `region_engine.mito.sst_meta_cache_size` | String | `128MB` | Cache size for SST metadata. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. |
| `region_engine.mito.vector_cache_size` | String | `512MB` | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | `512MB` | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/write_cache`. |
| `region_engine.mito.experimental_write_cache_size` | String | `512MB` | Capacity for write cache. |
| `region_engine.mito.experimental_write_cache_ttl` | String | `1h` | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
Expand Down
12 changes: 12 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,18 @@ vector_cache_size = "512MB"
## If not set, it's default to 1/16 of OS memory with a max limitation of 512MB.
page_cache_size = "512MB"

## Whether to enable the experimental write cache.
enable_experimental_write_cache = false

## File system path for write cache, defaults to `{data_home}/write_cache`.
experimental_write_cache_path = ""

## Capacity for write cache.
experimental_write_cache_size = "512MB"

## TTL for write cache.
experimental_write_cache_ttl = "1h"

## Buffer size for SST writing.
sst_write_buffer_size = "8MB"

Expand Down
12 changes: 12 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,18 @@ vector_cache_size = "512MB"
## If not set, it's default to 1/16 of OS memory with a max limitation of 512MB.
page_cache_size = "512MB"

## Whether to enable the experimental write cache.
enable_experimental_write_cache = false

## File system path for write cache, defaults to `{data_home}/write_cache`.
experimental_write_cache_path = ""

## Capacity for write cache.
experimental_write_cache_size = "512MB"

## TTL for write cache.
experimental_write_cache_ttl = "1h"

## Buffer size for SST writing.
sst_write_buffer_size = "8MB"

Expand Down
67 changes: 57 additions & 10 deletions src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use bytes::Bytes;
use common_base::readable_size::ReadableSize;
Expand Down Expand Up @@ -56,9 +56,13 @@ pub(crate) type FileCacheRef = Arc<FileCache>;

impl FileCache {
/// Creates a new file cache.
pub(crate) fn new(local_store: ObjectStore, capacity: ReadableSize) -> FileCache {
pub(crate) fn new(
local_store: ObjectStore,
capacity: ReadableSize,
ttl: Option<Duration>,
) -> FileCache {
let cache_store = local_store.clone();
let memory_index = Cache::builder()
let mut builder = Cache::builder()
.weigher(|_key, value: &IndexValue| -> u32 {
// We only measure space on local store.
value.file_size
Expand Down Expand Up @@ -87,8 +91,11 @@ impl FileCache {
}
}
.boxed()
})
.build();
});
if let Some(ttl) = ttl {
builder = builder.time_to_idle(ttl);
}
let memory_index = builder.build();
FileCache {
local_store,
memory_index,
Expand Down Expand Up @@ -376,12 +383,52 @@ mod tests {
ObjectStore::new(builder).unwrap().finish()
}

#[tokio::test]
async fn test_file_cache_ttl() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());

let cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
Some(Duration::from_millis(5)),
);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
let file_path = cache.cache_file_path(key);

// Get an empty file.
assert!(cache.reader(key).await.is_none());

// Write a file.
local_store
.write(&file_path, b"hello".as_slice())
.await
.unwrap();

// Add to the cache.
cache
.put(
IndexKey::new(region_id, file_id, FileType::Parquet),
IndexValue { file_size: 5 },
)
.await;

let exist = cache.reader(key).await;
assert!(exist.is_some());
tokio::time::sleep(Duration::from_millis(10)).await;
cache.memory_index.run_pending_tasks().await;
let non = cache.reader(key).await;
assert!(non.is_none());
}

#[tokio::test]
async fn test_file_cache_basic() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());

let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
Expand Down Expand Up @@ -430,7 +477,7 @@ mod tests {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());

let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
Expand Down Expand Up @@ -462,7 +509,7 @@ mod tests {
async fn test_file_cache_recover() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);

let region_id = RegionId::new(2000, 0);
let file_type = FileType::Parquet;
Expand All @@ -488,7 +535,7 @@ mod tests {
}

// Recover the cache.
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
// No entry before recovery.
assert!(cache
.reader(IndexKey::new(region_id, file_ids[0], file_type))
Expand All @@ -513,7 +560,7 @@ mod tests {
async fn test_file_cache_read_ranges() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! A write-through cache for remote object stores.

use std::sync::Arc;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
Expand Down Expand Up @@ -55,9 +56,10 @@ impl WriteCache {
local_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
let file_cache = FileCache::new(local_store, cache_capacity);
let file_cache = FileCache::new(local_store, cache_capacity, ttl);
file_cache.recover().await?;

Ok(Self {
Expand All @@ -72,6 +74,7 @@ impl WriteCache {
cache_dir: &str,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
Expand All @@ -81,6 +84,7 @@ impl WriteCache {
local_store,
object_store_manager,
cache_capacity,
ttl,
intermediate_manager,
)
.await
Expand Down
12 changes: 11 additions & 1 deletion src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ pub struct MitoConfig {
pub experimental_write_cache_path: String,
/// Capacity for write cache.
pub experimental_write_cache_size: ReadableSize,
/// TTL for write cache.
#[serde(with = "humantime_serde")]
pub experimental_write_cache_ttl: Option<Duration>,

// Other configs:
/// Buffer size for SST writing.
Expand Down Expand Up @@ -126,6 +129,7 @@ impl Default for MitoConfig {
enable_experimental_write_cache: false,
experimental_write_cache_path: String::new(),
experimental_write_cache_size: ReadableSize::mb(512),
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60)),
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
Expand Down Expand Up @@ -228,10 +232,16 @@ impl MitoConfig {

/// Enable experimental write cache.
#[cfg(test)]
pub fn enable_write_cache(mut self, path: String, size: ReadableSize) -> Self {
pub fn enable_write_cache(
mut self,
path: String,
size: ReadableSize,
ttl: Option<Duration>,
) -> Self {
self.enable_experimental_write_cache = true;
self.experimental_write_cache_path = path;
self.experimental_write_cache_size = size;
self.experimental_write_cache_ttl = ttl;
self
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ async fn test_engine_with_write_cache() {

let mut env = TestEnv::new();
let path = env.data_home().to_str().unwrap().to_string();
let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512));
let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512), None);
let engine = env.create_engine(mito_config).await;

let region_id = RegionId::new(1, 1);
Expand Down
7 changes: 4 additions & 3 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,10 @@ impl TestEnv {
.unwrap();

let object_store_manager = self.get_object_store_manager().unwrap();
let write_cache = WriteCache::new(local_store, object_store_manager, capacity, intm_mgr)
.await
.unwrap();
let write_cache =
WriteCache::new(local_store, object_store_manager, capacity, None, intm_mgr)
.await
.unwrap();

Arc::new(write_cache)
}
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ async fn write_cache_from_config(
&config.experimental_write_cache_path,
object_store_manager,
config.experimental_write_cache_size,
config.experimental_write_cache_ttl,
intermediate_manager,
)
.await?;
Expand Down
1 change: 1 addition & 0 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,7 @@ auto_flush_interval = "30m"
enable_experimental_write_cache = false
experimental_write_cache_path = ""
experimental_write_cache_size = "512MiB"
experimental_write_cache_ttl = "1h"
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false
Expand Down