Commit

feat(mito): Add cache manager (GreptimeTeam#2488)
* feat: add cache manager

* feat: add cache to reader builder

* feat: add AsyncFileReaderCache

* feat: Impl AsyncFileReaderCache

* chore: move moka dep to workspace

* feat: add moka cache to the manager

* feat: implement parquet meta cache

* test: test cache manager

* feat: consider vec size

* style: fix clippy

* test: fix config api test

* feat: divide cache

* test: test disabling meta cache

* test: fix config api test

* feat: remove meta cache if file is purged
evenyag authored and paomian committed Oct 19, 2023
1 parent 99d2e5e commit 3e441b2
Showing 23 changed files with 516 additions and 52 deletions.
35 changes: 5 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -82,6 +82,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
moka = { version = "0.11" }
once_cell = "1.18"
opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] }
parquet = "43.0"
2 changes: 1 addition & 1 deletion src/catalog/Cargo.toml
@@ -31,7 +31,7 @@ futures-util.workspace = true
lazy_static.workspace = true
meta-client = { workspace = true }
metrics.workspace = true
moka = { version = "0.11", features = ["future"] }
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
partition.workspace = true
regex.workspace = true
2 changes: 1 addition & 1 deletion src/client/Cargo.toml
@@ -27,7 +27,7 @@ datatypes = { workspace = true }
derive_builder.workspace = true
enum_dispatch = "0.3"
futures-util.workspace = true
moka = { version = "0.9", features = ["future"] }
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
prost.workspace = true
rand.workspace = true
2 changes: 1 addition & 1 deletion src/frontend/Cargo.toml
@@ -51,7 +51,7 @@ meta-client = { workspace = true }
raft-engine = { workspace = true }
# Although it is not used, please do not delete it.
metrics.workspace = true
moka = { version = "0.9", features = ["future"] }
moka = { workspace = true, features = ["future"] }
object-store = { workspace = true }
openmetrics-parser = "0.4"
opentelemetry-proto.workspace = true
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
@@ -40,6 +40,7 @@ humantime-serde = { workspace = true }
lazy_static = "1.4"
memcomparable = "0.2"
metrics.workspace = true
moka.workspace = true
object-store = { workspace = true }
parquet = { workspace = true, features = ["async"] }
paste.workspace = true
131 changes: 131 additions & 0 deletions src/mito2/src/cache.rs
@@ -0,0 +1,131 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Cache for the engine.
mod cache_size;
#[cfg(test)]
pub(crate) mod test_util;

use std::mem;
use std::sync::Arc;

use moka::sync::Cache;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::RegionId;

use crate::cache::cache_size::parquet_meta_size;
use crate::sst::file::FileId;

/// Manages cached data for the engine.
pub struct CacheManager {
    /// Cache for SST metadata.
    sst_meta_cache: Option<SstMetaCache>,
}

pub type CacheManagerRef = Arc<CacheManager>;

impl CacheManager {
    /// Creates a new manager with a specific cache size in bytes.
    pub fn new(sst_meta_cache_size: u64) -> CacheManager {
        let sst_meta_cache = if sst_meta_cache_size == 0 {
            None
        } else {
            let cache = Cache::builder()
                .max_capacity(sst_meta_cache_size)
                .weigher(|k: &SstMetaKey, v: &Arc<ParquetMetaData>| {
                    // We ignore the size of `Arc`.
                    (k.estimated_size() + parquet_meta_size(v)) as u32
                })
                .build();
            Some(cache)
        };

        CacheManager { sst_meta_cache }
    }

    /// Gets cached [ParquetMetaData].
    pub fn get_parquet_meta_data(
        &self,
        region_id: RegionId,
        file_id: FileId,
    ) -> Option<Arc<ParquetMetaData>> {
        self.sst_meta_cache
            .as_ref()
            .and_then(|sst_meta_cache| sst_meta_cache.get(&SstMetaKey(region_id, file_id)))
    }

    /// Puts [ParquetMetaData] into the cache.
    pub fn put_parquet_meta_data(
        &self,
        region_id: RegionId,
        file_id: FileId,
        metadata: Arc<ParquetMetaData>,
    ) {
        if let Some(cache) = &self.sst_meta_cache {
            cache.insert(SstMetaKey(region_id, file_id), metadata);
        }
    }

    /// Removes [ParquetMetaData] from the cache.
    pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) {
        if let Some(cache) = &self.sst_meta_cache {
            cache.remove(&SstMetaKey(region_id, file_id));
        }
    }
}

/// Cache key for SST meta.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);

impl SstMetaKey {
    /// Returns memory used by the key (estimated).
    fn estimated_size(&self) -> usize {
        mem::size_of::<SstMetaKey>()
    }
}

type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::test_util::parquet_meta;

    #[test]
    fn test_disable_meta_cache() {
        let cache = CacheManager::new(0);
        assert!(cache.sst_meta_cache.is_none());

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();
        let metadata = parquet_meta();
        cache.put_parquet_meta_data(region_id, file_id, metadata);
        assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
    }

    #[test]
    fn test_parquet_meta_cache() {
        let cache = CacheManager::new(2000);
        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();
        assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
        let metadata = parquet_meta();
        cache.put_parquet_meta_data(region_id, file_id, metadata);
        assert!(cache.get_parquet_meta_data(region_id, file_id).is_some());
        cache.remove_parquet_meta_data(region_id, file_id);
        assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
    }
}
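
A minimal usage sketch of the new CacheManager (not part of the diff above): a read path checks the cache before decoding a parquet footer and fills it on a miss. The `get_or_load_meta` helper and the `decode_footer` closure are hypothetical stand-ins for the reader-builder integration mentioned in the commit message; only `CacheManager`, `CacheManagerRef`, `RegionId`, and `FileId` come from the code above.

use std::sync::Arc;

use parquet::file::metadata::ParquetMetaData;
use store_api::storage::RegionId;

use crate::cache::{CacheManager, CacheManagerRef};
use crate::sst::file::FileId;

/// Hypothetical read-path helper: consult the cache first, decode the
/// parquet footer only on a miss, then populate the cache for later reads.
fn get_or_load_meta(
    cache: &CacheManagerRef,
    region_id: RegionId,
    file_id: FileId,
    decode_footer: impl FnOnce() -> Arc<ParquetMetaData>,
) -> Arc<ParquetMetaData> {
    if let Some(meta) = cache.get_parquet_meta_data(region_id, file_id) {
        return meta;
    }
    let meta = decode_footer();
    cache.put_parquet_meta_data(region_id, file_id, meta.clone());
    meta
}

fn build_cache() -> CacheManagerRef {
    // 1 MiB budget for SST metadata; a size of 0 disables the cache entirely.
    Arc::new(CacheManager::new(1024 * 1024))
}

When an SST file is purged, `remove_parquet_meta_data` evicts the stale entry, which is what the last commit in the message ("remove meta cache if file is purged") refers to.
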
142 changes: 142 additions & 0 deletions src/mito2/src/cache/cache_size.rs
@@ -0,0 +1,142 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Cache size of different cache values.
use std::mem;

use parquet::file::metadata::{
    FileMetaData, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex, RowGroupMetaData,
};
use parquet::file::page_index::index::Index;
use parquet::format::{ColumnOrder, KeyValue, PageLocation};
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};

/// Returns estimated size of [ParquetMetaData].
pub fn parquet_meta_size(meta: &ParquetMetaData) -> usize {
    // struct size
    let mut size = mem::size_of::<ParquetMetaData>();
    // file_metadata
    size += file_meta_heap_size(meta.file_metadata());
    // row_groups
    size += meta
        .row_groups()
        .iter()
        .map(row_group_meta_heap_size)
        .sum::<usize>();
    // column_index
    size += meta
        .column_index()
        .map(parquet_column_index_heap_size)
        .unwrap_or(0);
    // offset_index
    size += meta
        .offset_index()
        .map(parquet_offset_index_heap_size)
        .unwrap_or(0);

    size
}

/// Returns estimated size of [FileMetaData] allocated from heap.
fn file_meta_heap_size(meta: &FileMetaData) -> usize {
    // created_by
    let mut size = meta.created_by().map(|s| s.len()).unwrap_or(0);
    // key_value_metadata
    size += meta
        .key_value_metadata()
        .map(|kvs| {
            kvs.iter()
                .map(|kv| {
                    kv.key.len()
                        + kv.value.as_ref().map(|v| v.len()).unwrap_or(0)
                        + mem::size_of::<KeyValue>()
                })
                .sum()
        })
        .unwrap_or(0);
    // schema_descr (It's a ptr so we also add size of SchemaDescriptor).
    size += mem::size_of::<SchemaDescriptor>();
    size += schema_descr_heap_size(meta.schema_descr());
    // column_orders
    size += meta
        .column_orders()
        .map(|orders| orders.len() * mem::size_of::<ColumnOrder>())
        .unwrap_or(0);

    size
}

/// Returns estimated size of [SchemaDescriptor] allocated from heap.
fn schema_descr_heap_size(descr: &SchemaDescriptor) -> usize {
    // schema
    let mut size = mem::size_of::<Type>();
    // leaves
    size += descr
        .columns()
        .iter()
        .map(|descr| mem::size_of::<ColumnDescriptor>() + column_descr_heap_size(descr))
        .sum::<usize>();
    // leaf_to_base
    size += descr.num_columns() * mem::size_of::<usize>();

    size
}

/// Returns estimated size of [ColumnDescriptor] allocated from heap.
fn column_descr_heap_size(descr: &ColumnDescriptor) -> usize {
    descr.path().parts().iter().map(|s| s.len()).sum()
}

/// Returns estimated size of [RowGroupMetaData] allocated from heap.
fn row_group_meta_heap_size(meta: &RowGroupMetaData) -> usize {
    mem::size_of_val(meta.columns())
}

/// Returns estimated size of [ParquetColumnIndex] allocated from heap.
fn parquet_column_index_heap_size(column_index: &ParquetColumnIndex) -> usize {
    column_index
        .iter()
        .map(|row_group| row_group.len() * mem::size_of::<Index>() + mem::size_of_val(row_group))
        .sum()
}

/// Returns estimated size of [ParquetOffsetIndex] allocated from heap.
fn parquet_offset_index_heap_size(offset_index: &ParquetOffsetIndex) -> usize {
    offset_index
        .iter()
        .map(|row_group| {
            row_group
                .iter()
                .map(|column| {
                    column.len() * mem::size_of::<PageLocation>() + mem::size_of_val(column)
                })
                .sum::<usize>()
                + mem::size_of_val(row_group)
        })
        .sum()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::test_util::parquet_meta;

    #[test]
    fn test_parquet_meta_size() {
        let metadata = parquet_meta();

        assert_eq!(948, parquet_meta_size(&metadata));
    }
}
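
The weigher in CacheManager charges each entry its estimated in-memory footprint, so `parquet_meta_size` follows the usual pattern: the inline struct size from `mem::size_of` plus the heap bytes its fields own. Below is a standalone sketch of that pattern on a hypothetical type (the 948-byte figure in the test above is specific to the parquet crate version and the fixture in `test_util`, not something this sketch reproduces).

use std::mem;

/// Hypothetical cached value, used only to illustrate the estimation pattern.
struct CachedBlob {
    name: String,
    payload: Vec<u8>,
}

/// Inline struct size plus heap-owned bytes. `mem::size_of_val` on a slice
/// yields `len * size_of::<elem>()`, the same trick the functions above use
/// for column chunks and page locations.
fn cached_blob_size(blob: &CachedBlob) -> usize {
    mem::size_of::<CachedBlob>() + blob.name.len() + mem::size_of_val(blob.payload.as_slice())
}

fn main() {
    let blob = CachedBlob {
        name: "part-0".to_string(),
        payload: vec![0u8; 128],
    };
    // 6 bytes of name + 128 bytes of payload + the inline struct size.
    assert_eq!(
        mem::size_of::<CachedBlob>() + 6 + 128,
        cached_blob_size(&blob)
    );
}
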