feat(mito): implements row group level page cache #2688

Merged
merged 15 commits on Nov 20, 2023
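
This PR adds a third cache to `CacheManager`: a page cache for decoded parquet pages, keyed by (region id, file id, row group index, column index). Below is a minimal sketch of a plausible read-through pattern against the `CacheManager`, `PageKey`, and `PageValue` APIs added in this diff. It assumes code living inside the mito2 crate; `read_column_pages` is a hypothetical stand-in for the parquet row group reader, which is not part of this excerpt.

use std::sync::Arc;

use parquet::column::page::Page;

use crate::cache::{CacheManager, PageKey, PageValue};

/// Hypothetical stand-in for the parquet reader: decodes all pages of the
/// column chunk identified by `key` from the underlying SST file.
fn read_column_pages(_key: &PageKey) -> Vec<Page> {
    unimplemented!("not part of this excerpt")
}

/// Read-through lookup: serve pages from the cache and fall back to the SST on a miss.
fn column_pages(cache: &CacheManager, key: PageKey) -> Arc<PageValue> {
    if let Some(pages) = cache.get_pages(&key) {
        return pages; // cache hit; also increments the `page` CACHE_HIT counter
    }
    // Cache miss: decode the pages and populate the cache for later readers.
    let pages = Arc::new(PageValue::new(read_column_pages(&key)));
    cache.put_pages(key, pages.clone());
    pages
}

A caller would build the key from the SST it is scanning, e.g. `PageKey { region_id, file_id, row_group_idx, column_idx }`, exactly as the unit tests in this diff do.
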
3 changes: 2 additions & 1 deletion config/datanode.example.toml
@@ -105,10 +105,11 @@ global_write_buffer_reject_size = "2GB"
sst_meta_cache_size = "128MB"
# Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 disables the cache.
vector_cache_size = "512MB"
# Cache size for pages of SST row groups (default 512MB). Setting it to 0 disables the cache.
page_cache_size = "512MB"
# Buffer size for SST writing.
sst_write_buffer_size = "8MB"


# Log options, see `standalone.example.toml`
# [logging]
# dir = "/tmp/greptimedb/logs"
30 changes: 30 additions & 0 deletions config/standalone.example.toml
@@ -152,6 +152,36 @@ auto_flush_interval = "1h"
# Global write buffer size for all regions.
global_write_buffer_size = "1GB"

# Mito engine options
[[region_engine]]
[region_engine.mito]
# Number of region workers.
num_workers = 8
# Request channel size of each worker.
worker_channel_size = 128
# Max batch size for a worker to handle requests.
worker_request_batch_size = 64
# Number of meta actions applied before triggering a new checkpoint for the manifest.
manifest_checkpoint_distance = 10
# Manifest compression type.
manifest_compress_type = "Uncompressed"
# Max number of running background jobs.
max_background_jobs = 4
# Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
# Global write buffer size for all regions.
global_write_buffer_size = "1GB"
# Global write buffer size threshold to reject write requests (default 2GB).
global_write_buffer_reject_size = "2GB"
# Cache size for SST metadata (default 128MB). Setting it to 0 disables the cache.
sst_meta_cache_size = "128MB"
# Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 disables the cache.
vector_cache_size = "512MB"
# Cache size for pages of SST row groups (default 512MB). Setting it to 0 disables the cache.
page_cache_size = "512MB"
# Buffer size for SST writing.
sst_write_buffer_size = "8MB"

# Log options
# [logging]
# Specify logs directory.
192 changes: 174 additions & 18 deletions src/mito2/src/cache.rs
@@ -24,33 +24,51 @@ use std::sync::Arc;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use moka::sync::Cache;
use parquet::column::page::Page;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::RegionId;

use crate::cache::cache_size::parquet_meta_size;
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;

// Metrics type key for sst meta.
const SST_META_TYPE: &str = "sst_meta";
// Metrics type key for vector.
const VECTOR_TYPE: &str = "vector";
// Metrics type key for pages.
const PAGE_TYPE: &str = "page";

/// Manages cached data for the engine.
pub struct CacheManager {
/// Cache for SST metadata.
sst_meta_cache: Option<SstMetaCache>,
/// Cache for vectors.
vector_cache: Option<VectorCache>,
/// Cache for SST pages.
page_cache: Option<PageCache>,
}

pub type CacheManagerRef = Arc<CacheManager>;

impl CacheManager {
/// Creates a new manager with specific cache sizes in bytes.
pub fn new(sst_meta_cache_size: u64, vector_cache_size: u64) -> CacheManager {
pub fn new(
sst_meta_cache_size: u64,
vector_cache_size: u64,
page_cache_size: u64,
) -> CacheManager {
let sst_meta_cache = if sst_meta_cache_size == 0 {
None
} else {
let cache = Cache::builder()
.max_capacity(sst_meta_cache_size)
.weigher(|k: &SstMetaKey, v: &Arc<ParquetMetaData>| {
// We ignore the size of `Arc`.
(k.estimated_size() + parquet_meta_size(v)) as u32
.weigher(meta_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = meta_cache_weight(&k, &v);
CACHE_BYTES
.with_label_values(&[SST_META_TYPE])
.sub(size.into());
})
.build();
Some(cache)
@@ -60,9 +78,25 @@ impl CacheManager {
} else {
let cache = Cache::builder()
.max_capacity(vector_cache_size)
.weigher(|_k, v: &VectorRef| {
// We ignore the heap size of `Value`.
(mem::size_of::<Value>() + v.memory_size()) as u32
.weigher(vector_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = vector_cache_weight(&k, &v);
CACHE_BYTES
.with_label_values(&[VECTOR_TYPE])
.sub(size.into());
})
.build();
Some(cache)
};
let page_cache = if page_cache_size == 0 {
None
} else {
let cache = Cache::builder()
.max_capacity(page_cache_size)
.weigher(page_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = page_cache_weight(&k, &v);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
})
.build();
Some(cache)
@@ -71,6 +105,7 @@
CacheManager {
sst_meta_cache,
vector_cache,
page_cache,
}
}

@@ -80,9 +115,10 @@ impl CacheManager {
region_id: RegionId,
file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
self.sst_meta_cache
.as_ref()
.and_then(|sst_meta_cache| sst_meta_cache.get(&SstMetaKey(region_id, file_id)))
self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id));
update_hit_miss(value, SST_META_TYPE)
})
}

/// Puts [ParquetMetaData] into the cache.
@@ -93,7 +129,11 @@
metadata: Arc<ParquetMetaData>,
) {
if let Some(cache) = &self.sst_meta_cache {
cache.insert(SstMetaKey(region_id, file_id), metadata);
let key = SstMetaKey(region_id, file_id);
CACHE_BYTES
.with_label_values(&[SST_META_TYPE])
.add(meta_cache_weight(&key, &metadata).into());
cache.insert(key, metadata);
}
}

@@ -106,17 +146,63 @@

/// Gets a vector with repeated value for specific `key`.
pub fn get_repeated_vector(&self, key: &Value) -> Option<VectorRef> {
self.vector_cache
.as_ref()
.and_then(|vector_cache| vector_cache.get(key))
self.vector_cache.as_ref().and_then(|vector_cache| {
let value = vector_cache.get(key);
update_hit_miss(value, VECTOR_TYPE)
})
}

/// Puts a vector with repeated value into the cache.
pub fn put_repeated_vector(&self, key: Value, vector: VectorRef) {
if let Some(cache) = &self.vector_cache {
CACHE_BYTES
.with_label_values(&[VECTOR_TYPE])
.add(vector_cache_weight(&key, &vector).into());
cache.insert(key, vector);
}
}

/// Gets pages for the row group.
pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
self.page_cache.as_ref().and_then(|page_cache| {
let value = page_cache.get(page_key);
update_hit_miss(value, PAGE_TYPE)
})
}

/// Puts pages of the row group into the cache.
pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
if let Some(cache) = &self.page_cache {
CACHE_BYTES
.with_label_values(&[PAGE_TYPE])
.add(page_cache_weight(&page_key, &pages).into());
cache.insert(page_key, pages);
}
}
}

fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
// We ignore the size of `Arc`.
(k.estimated_size() + parquet_meta_size(v)) as u32
}

fn vector_cache_weight(_k: &Value, v: &VectorRef) -> u32 {
// We ignore the heap size of `Value`.
(mem::size_of::<Value>() + v.memory_size()) as u32
}

fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
(k.estimated_size() + v.estimated_size()) as u32
}

/// Updates cache hit/miss metrics.
fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
if value.is_some() {
CACHE_HIT.with_label_values(&[cache_type]).inc();
} else {
CACHE_MISS.with_label_values(&[cache_type]).inc();
}
value
}

/// Cache key (region id, file id) for SST meta.
@@ -126,7 +212,46 @@ struct SstMetaKey(RegionId, FileId);
impl SstMetaKey {
/// Returns memory used by the key (estimated).
fn estimated_size(&self) -> usize {
mem::size_of::<SstMetaKey>()
mem::size_of::<Self>()
}
}

/// Cache key for pages of an SST row group.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageKey {
/// Region id of the SST file to cache.
pub region_id: RegionId,
/// Id of the SST file to cache.
pub file_id: FileId,
/// Index of the row group.
pub row_group_idx: usize,
/// Index of the column in the row group.
pub column_idx: usize,
}

impl PageKey {
/// Returns memory used by the key (estimated).
fn estimated_size(&self) -> usize {
mem::size_of::<Self>()
}
}

/// Cached row group pages for a column.
pub struct PageValue {
/// All pages of the column in the row group.
pub pages: Vec<Page>,
}

impl PageValue {
/// Creates a new page value.
pub fn new(pages: Vec<Page>) -> PageValue {
PageValue { pages }
}

/// Returns memory used by the value (estimated).
fn estimated_size(&self) -> usize {
// We only consider heap size of all pages.
self.pages.iter().map(|page| page.buffer().len()).sum()
}
}

@@ -136,6 +261,8 @@ type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<Value, VectorRef>;
/// Maps (region, file, row group, column) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;

#[cfg(test)]
mod tests {
@@ -146,8 +273,10 @@ mod tests {

#[test]
fn test_disable_cache() {
let cache = CacheManager::new(0, 0);
let cache = CacheManager::new(0, 0, 0);
assert!(cache.sst_meta_cache.is_none());
assert!(cache.vector_cache.is_none());
assert!(cache.page_cache.is_none());

let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
@@ -159,11 +288,21 @@
let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
cache.put_repeated_vector(value.clone(), vector.clone());
assert!(cache.get_repeated_vector(&value).is_none());

let key = PageKey {
region_id,
file_id,
row_group_idx: 0,
column_idx: 0,
};
let pages = Arc::new(PageValue::new(Vec::new()));
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_none());
}

#[test]
fn test_parquet_meta_cache() {
let cache = CacheManager::new(2000, 0);
let cache = CacheManager::new(2000, 0, 0);
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
@@ -176,12 +315,29 @@

#[test]
fn test_repeated_vector_cache() {
let cache = CacheManager::new(0, 4096);
let cache = CacheManager::new(0, 4096, 0);
let value = Value::Int64(10);
assert!(cache.get_repeated_vector(&value).is_none());
let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
cache.put_repeated_vector(value.clone(), vector.clone());
let cached = cache.get_repeated_vector(&value).unwrap();
assert_eq!(vector, cached);
}

#[test]
fn test_page_cache() {
let cache = CacheManager::new(0, 0, 1000);
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
let key = PageKey {
region_id,
file_id,
row_group_idx: 0,
column_idx: 0,
};
assert!(cache.get_pages(&key).is_none());
let pages = Arc::new(PageValue::new(Vec::new()));
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_some());
}
}
5 changes: 5 additions & 0 deletions src/mito2/src/config.rs
@@ -63,6 +63,10 @@ pub struct MitoConfig {
pub sst_meta_cache_size: ReadableSize,
/// Cache size for vectors and arrow arrays (default 512MB). Setting it to 0 disables the cache.
pub vector_cache_size: ReadableSize,
/// Cache size for pages of SST row groups (default 512MB). Setting it to 0 disables the cache.
pub page_cache_size: ReadableSize,

// Other configs:
/// Buffer size for SST writing.
pub sst_write_buffer_size: ReadableSize,
}
@@ -81,6 +85,7 @@ impl Default for MitoConfig {
global_write_buffer_reject_size: ReadableSize::gb(2),
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
sst_write_buffer_size: ReadableSize::mb(8),
}
}
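
For completeness, a minimal sketch of how the new `page_cache_size` option could be threaded from `MitoConfig` into the cache manager. It assumes `ReadableSize` exposes the configured value in bytes (the `as_bytes()` call is an assumption), and the actual wiring inside the engine is not shown in this excerpt.

use std::sync::Arc;

use crate::cache::{CacheManager, CacheManagerRef};
use crate::config::MitoConfig;

/// Builds the shared cache manager from the engine configuration.
/// A size of 0 disables the corresponding cache, matching the config comments above.
fn build_cache_manager(config: &MitoConfig) -> CacheManagerRef {
    Arc::new(CacheManager::new(
        config.sst_meta_cache_size.as_bytes(),
        config.vector_cache_size.as_bytes(),
        config.page_cache_size.as_bytes(),
    ))
}
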