Skip to content

Commit

Permalink
feat: cache metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Nov 16, 2023
1 parent 67cb823 commit 30610d0
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 21 deletions.
89 changes: 71 additions & 18 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,16 @@ use parquet::file::metadata::ParquetMetaData;
use store_api::storage::RegionId;

use crate::cache::cache_size::parquet_meta_size;
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;

/// Metrics type key for sst meta.
///
/// Passed as the label value to the cache metrics for the SST meta cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics type key for vector.
///
/// Passed as the label value to the cache metrics for the vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics type key for pages.
///
/// Passed as the label value to the cache metrics for the page cache.
const PAGE_TYPE: &str = "page";

/// Manages cached data for the engine.
pub struct CacheManager {
/// Cache for SST metadata.
Expand All @@ -55,9 +63,12 @@ impl CacheManager {
} else {
let cache = Cache::builder()
.max_capacity(sst_meta_cache_size)
.weigher(|k: &SstMetaKey, v: &Arc<ParquetMetaData>| {
// We ignore the size of `Arc`.
(k.estimated_size() + parquet_meta_size(v)) as u32
.weigher(meta_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = meta_cache_weight(&k, &v);
CACHE_BYTES
.with_label_values(&[SST_META_TYPE])
.sub(size.into());
})
.build();
Some(cache)
Expand All @@ -67,9 +78,12 @@ impl CacheManager {
} else {
let cache = Cache::builder()
.max_capacity(vector_cache_size)
.weigher(|_k, v: &VectorRef| {
// We ignore the heap size of `Value`.
(mem::size_of::<Value>() + v.memory_size()) as u32
.weigher(vector_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = vector_cache_weight(&k, &v);
CACHE_BYTES
.with_label_values(&[VECTOR_TYPE])
.sub(size.into());
})
.build();
Some(cache)
Expand All @@ -79,8 +93,10 @@ impl CacheManager {
} else {
let cache = Cache::builder()
.max_capacity(page_cache_size)
.weigher(|k: &PageKey, v: &Arc<PageValue>| {
(k.estimated_size() + v.estimated_size()) as u32
.weigher(page_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = page_cache_weight(&k, &v);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
})
.build();
Some(cache)
Expand All @@ -99,9 +115,10 @@ impl CacheManager {
region_id: RegionId,
file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
self.sst_meta_cache
.as_ref()
.and_then(|sst_meta_cache| sst_meta_cache.get(&SstMetaKey(region_id, file_id)))
self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id));
update_hit_miss(value, SST_META_TYPE)
})
}

/// Puts [ParquetMetaData] into the cache.
Expand All @@ -112,7 +129,11 @@ impl CacheManager {
metadata: Arc<ParquetMetaData>,
) {
if let Some(cache) = &self.sst_meta_cache {
cache.insert(SstMetaKey(region_id, file_id), metadata);
let key = SstMetaKey(region_id, file_id);
CACHE_BYTES
.with_label_values(&[SST_META_TYPE])
.add(meta_cache_weight(&key, &metadata).into());
cache.insert(key, metadata);
}
}

Expand All @@ -125,33 +146,65 @@ impl CacheManager {

/// Gets a vector with repeated value for specific `key`.
pub fn get_repeated_vector(&self, key: &Value) -> Option<VectorRef> {
self.vector_cache
.as_ref()
.and_then(|vector_cache| vector_cache.get(key))
self.vector_cache.as_ref().and_then(|vector_cache| {
let value = vector_cache.get(key);
update_hit_miss(value, VECTOR_TYPE)
})
}

/// Stores a repeated-value vector under `key` in the vector cache.
///
/// Does nothing when the vector cache is not configured. Otherwise the
/// entry's weight is added to the `CACHE_BYTES` gauge before the entry
/// is inserted.
pub fn put_repeated_vector(&self, key: Value, vector: VectorRef) {
    let vector_cache = match self.vector_cache.as_ref() {
        Some(vector_cache) => vector_cache,
        // No vector cache: nothing to record or store.
        None => return,
    };

    let weight = vector_cache_weight(&key, &vector);
    CACHE_BYTES
        .with_label_values(&[VECTOR_TYPE])
        .add(weight.into());
    vector_cache.insert(key, vector);
}

/// Gets pages for the row group.
pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
self.page_cache
.as_ref()
.and_then(|page_cache| page_cache.get(page_key))
self.page_cache.as_ref().and_then(|page_cache| {
let value = page_cache.get(page_key);
update_hit_miss(value, PAGE_TYPE)
})
}

/// Stores the pages of a row group under `page_key` in the page cache.
///
/// Does nothing when the page cache is not configured. Otherwise the
/// entry's weight is added to the `CACHE_BYTES` gauge before the entry
/// is inserted.
pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
    let page_cache = match self.page_cache.as_ref() {
        Some(page_cache) => page_cache,
        // No page cache: nothing to record or store.
        None => return,
    };

    let weight = page_cache_weight(&page_key, &pages);
    CACHE_BYTES
        .with_label_values(&[PAGE_TYPE])
        .add(weight.into());
    page_cache.insert(page_key, pages);
}
}

fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
// We ignore the size of `Arc`.
(k.estimated_size() + parquet_meta_size(v)) as u32
}

fn vector_cache_weight(_k: &Value, v: &VectorRef) -> u32 {
// We ignore the heap size of `Value`.
(mem::size_of::<Value>() + v.memory_size()) as u32
}

fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
(k.estimated_size() + v.estimated_size()) as u32
}

/// Records the outcome of a cache lookup and returns the looked-up value.
///
/// Increments `CACHE_HIT` when `value` is `Some` and `CACHE_MISS` when it
/// is `None`, using `cache_type` as the label value. `value` is returned
/// unchanged.
fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
    let labels = [cache_type];
    match value {
        // `_` does not bind, so `value` is still owned afterwards.
        Some(_) => CACHE_HIT.with_label_values(&labels).inc(),
        None => CACHE_MISS.with_label_values(&labels).inc(),
    }
    value
}

/// Cache key (region id, file id) for SST meta.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);
Expand Down
25 changes: 23 additions & 2 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ lazy_static! {
)
.unwrap();



// ------ Flush related metrics
/// Counter of scheduled flush requests.
/// Note that the flush scheduler may merge some flush requests.
Expand Down Expand Up @@ -122,4 +120,27 @@ lazy_static! {
pub static ref MERGE_FILTER_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("mito_merge_filter_rows_total", "mito merge filter rows total", &[TYPE_LABEL]).unwrap();
// ------- End of query metrics.

// ------ Cache related metrics.
/// Cache hit counter, labelled by `TYPE_LABEL`.
pub static ref CACHE_HIT: IntCounterVec = register_int_counter_vec!(
"mito_cache_hit",
"mito cache hit",
&[TYPE_LABEL]
)
.unwrap();
/// Cache miss counter, labelled by `TYPE_LABEL`.
pub static ref CACHE_MISS: IntCounterVec = register_int_counter_vec!(
"mito_cache_miss",
"mito cache miss",
&[TYPE_LABEL]
)
.unwrap();
/// Cache size in bytes, labelled by `TYPE_LABEL`.
///
/// This is a gauge rather than a counter, so it can go down as well as up.
pub static ref CACHE_BYTES: IntGaugeVec = register_int_gauge_vec!(
"mito_cache_bytes",
"mito cache bytes",
&[TYPE_LABEL]
)
.unwrap();
}
2 changes: 1 addition & 1 deletion src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ pub struct ParquetReader {
read_format: ReadFormat,
/// Builder to build row group readers.
///
/// The builder contains the file handle so don't drop the builder while using
/// The builder contains the file handle, so don't drop the builder while using
/// the [ParquetReader].
reader_builder: RowGroupReaderBuilder,
/// Reader of current row group.
Expand Down

0 comments on commit 30610d0

Please sign in to comment.