diff --git a/Cargo.lock b/Cargo.lock index 098968da0dfb..580db0079e78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -738,9 +738,12 @@ dependencies = [ [[package]] name = "atomic" -version = "0.5.3" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" +checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" +dependencies = [ + "bytemuck", +] [[package]] name = "atomic-waker" @@ -4993,6 +4996,7 @@ dependencies = [ "tempfile", "tokio", "tokio-util", + "uuid", ] [[package]] @@ -12896,9 +12900,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.8.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439" dependencies = [ "atomic", "getrandom", @@ -12909,9 +12913,9 @@ dependencies = [ [[package]] name = "uuid-macro-internal" -version = "1.8.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2" +checksum = "a3ff64d5cde1e2cb5268bdb497235b6bd255ba8244f910dbc3574e59593de68c" dependencies = [ "proc-macro2", "quote", diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index c1e76e4488a2..8e2c05c6ad05 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -29,6 +29,7 @@ snafu.workspace = true tantivy = { version = "0.22", features = ["zstd-compression"] } tantivy-jieba = "0.11.0" tokio.workspace = true +uuid.workspace = true [dev-dependencies] common-test-util.workspace = true diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index 683a56561663..a6fb0cecbfcd 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -12,27 +12,41 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod blob; -mod footer; +use std::sync::Arc; use async_trait::async_trait; use common_base::BitVec; use greptime_proto::v1::index::InvertedIndexMetas; +use snafu::ResultExt; -use crate::inverted_index::error::Result; +use crate::inverted_index::error::{DecodeFstSnafu, Result}; pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader; use crate::inverted_index::FstMap; +mod blob; +mod footer; + /// InvertedIndexReader defines an asynchronous reader of inverted index data #[mockall::automock] #[async_trait] pub trait InvertedIndexReader: Send { - /// Retrieve metadata of all inverted indices stored within the blob. - async fn metadata(&mut self) -> Result; + /// Reads all data to dest. + async fn read_all(&mut self, dest: &mut Vec) -> Result; + + /// Seeks to given offset and reads data with exact size as provided. + async fn seek_read(&mut self, offset: u64, size: u32) -> Result>; + + /// Retrieves metadata of all inverted indices stored within the blob. + async fn metadata(&mut self) -> Result>; - /// Retrieve the finite state transducer (FST) map from the given offset and size. - async fn fst(&mut self, offset: u64, size: u32) -> Result; + /// Retrieves the finite state transducer (FST) map from the given offset and size. + async fn fst(&mut self, offset: u64, size: u32) -> Result { + let fst_data = self.seek_read(offset, size).await?; + FstMap::new(fst_data).context(DecodeFstSnafu) + } - /// Retrieve the bitmap from the given offset and size. - async fn bitmap(&mut self, offset: u64, size: u32) -> Result; + /// Retrieves the bitmap from the given offset and size. + async fn bitmap(&mut self, offset: u64, size: u32) -> Result { + self.seek_read(offset, size).await.map(BitVec::from_vec) + } } diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index 99f2f93239a3..3a6274f5f90b 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -13,18 +13,16 @@ // limitations under the License. use std::io::SeekFrom; +use std::sync::Arc; use async_trait::async_trait; -use common_base::BitVec; use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use greptime_proto::v1::index::InvertedIndexMetas; use snafu::{ensure, ResultExt}; -use crate::inverted_index::error::{ - DecodeFstSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedBlobSizeSnafu, -}; +use crate::inverted_index::error::{ReadSnafu, Result, SeekSnafu, UnexpectedBlobSizeSnafu}; use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader; -use crate::inverted_index::format::reader::{FstMap, InvertedIndexReader}; +use crate::inverted_index::format::reader::InvertedIndexReader; use crate::inverted_index::format::MIN_BLOB_SIZE; /// Inverted index blob reader, implements [`InvertedIndexReader`] @@ -52,35 +50,31 @@ impl InvertedIndexBlobReader { #[async_trait] impl InvertedIndexReader for InvertedIndexBlobReader { - async fn metadata(&mut self) -> Result { - let end = SeekFrom::End(0); - let blob_size = self.source.seek(end).await.context(SeekSnafu)?; - Self::validate_blob_size(blob_size)?; - - let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size); - footer_reader.metadata().await - } - - async fn fst(&mut self, offset: u64, size: u32) -> Result { + async fn read_all(&mut self, dest: &mut Vec) -> Result { self.source - .seek(SeekFrom::Start(offset)) + .seek(SeekFrom::Start(0)) .await .context(SeekSnafu)?; - let mut buf = vec![0u8; size as usize]; - self.source.read_exact(&mut buf).await.context(ReadSnafu)?; - - FstMap::new(buf).context(DecodeFstSnafu) + self.source.read_to_end(dest).await.context(ReadSnafu) } - async fn bitmap(&mut self, offset: u64, size: u32) -> Result { + async fn seek_read(&mut self, offset: u64, size: u32) -> Result> { self.source .seek(SeekFrom::Start(offset)) .await .context(SeekSnafu)?; let mut buf = vec![0u8; size as usize]; - self.source.read_exact(&mut buf).await.context(ReadSnafu)?; + self.source.read(&mut buf).await.context(ReadSnafu)?; + Ok(buf) + } + + async fn metadata(&mut self) -> Result> { + let end = SeekFrom::End(0); + let blob_size = self.source.seek(end).await.context(SeekSnafu)?; + Self::validate_blob_size(blob_size)?; - Ok(BitVec::from_vec(buf)) + let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size); + footer_reader.metadata().await.map(Arc::new) } } diff --git a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs index 85928d9183c2..3b21e21dc194 100644 --- a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs +++ b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs @@ -148,6 +148,8 @@ impl TryFrom)>> for PredicatesIndexApplier { #[cfg(test)] mod tests { + use std::sync::Arc; + use common_base::bit_vec::prelude::*; use greptime_proto::v1::index::InvertedIndexMeta; @@ -161,7 +163,7 @@ mod tests { s.to_owned() } - fn mock_metas(tags: impl IntoIterator) -> InvertedIndexMetas { + fn mock_metas(tags: impl IntoIterator) -> Arc { let mut metas = InvertedIndexMetas { total_row_count: 8, segment_row_count: 1, @@ -175,7 +177,7 @@ mod tests { }; metas.metas.insert(s(tag), meta); } - metas + Arc::new(metas) } fn key_fst_applier(value: &'static str) -> Box { @@ -300,11 +302,11 @@ mod tests { async fn test_index_applier_with_empty_index() { let mut mock_reader = MockInvertedIndexReader::new(); mock_reader.expect_metadata().returning(move || { - Ok(InvertedIndexMetas { + Ok(Arc::new(InvertedIndexMetas { total_row_count: 0, // No rows segment_row_count: 1, ..Default::default() - }) + })) }); let mut mock_fst_applier = MockFstApplier::new(); diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 892ee4fb9274..c36bcdbb83fb 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -17,6 +17,7 @@ mod cache_size; pub(crate) mod file_cache; +pub(crate) mod index; #[cfg(test)] pub(crate) mod test_util; pub(crate) mod write_cache; @@ -33,6 +34,7 @@ use store_api::storage::{ConcreteDataType, RegionId}; use crate::cache::cache_size::parquet_meta_size; use crate::cache::file_cache::{FileType, IndexKey}; +use crate::cache::index::{InvertedIndexCache, InvertedIndexCacheRef}; use crate::cache::write_cache::WriteCacheRef; use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS}; use crate::sst::file::FileId; @@ -59,6 +61,8 @@ pub struct CacheManager { page_cache: Option, /// A Cache for writing files to object stores. write_cache: Option, + /// Cache for inverted index. + index_cache: Option, } pub type CacheManagerRef = Arc; @@ -167,6 +171,10 @@ impl CacheManager { pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> { self.write_cache.as_ref() } + + pub(crate) fn index_cache(&self) -> Option<&InvertedIndexCacheRef> { + self.index_cache.as_ref() + } } /// Builder to construct a [CacheManager]. @@ -175,6 +183,8 @@ pub struct CacheManagerBuilder { sst_meta_cache_size: u64, vector_cache_size: u64, page_cache_size: u64, + index_metadata_size: u64, + index_content_size: u64, write_cache: Option, } @@ -203,6 +213,18 @@ impl CacheManagerBuilder { self } + /// Sets cache size for index metadata. + pub fn index_metadata_size(mut self, bytes: u64) -> Self { + self.index_metadata_size = bytes; + self + } + + /// Sets cache size for index content. + pub fn index_content_size(mut self, bytes: u64) -> Self { + self.index_content_size = bytes; + self + } + /// Builds the [CacheManager]. pub fn build(self) -> CacheManager { let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| { @@ -240,11 +262,14 @@ impl CacheManagerBuilder { .build() }); + let inverted_index_cache = + InvertedIndexCache::new(self.index_metadata_size, self.index_content_size); CacheManager { sst_meta_cache, vector_cache, page_cache, write_cache: self.write_cache, + index_cache: Some(Arc::new(inverted_index_cache)), } } } diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs new file mode 100644 index 000000000000..4e6e4deee260 --- /dev/null +++ b/src/mito2/src/cache/index.rs @@ -0,0 +1,211 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use api::v1::index::InvertedIndexMetas; +use async_trait::async_trait; +use common_base::BitVec; +use index::inverted_index::error::DecodeFstSnafu; +use index::inverted_index::format::reader::InvertedIndexReader; +use index::inverted_index::FstMap; +use prost::Message; +use snafu::ResultExt; + +use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS}; +use crate::sst::file::FileId; + +/// Metrics for index metadata. +const INDEX_METADATA_TYPE: &str = "index_metadata"; +/// Metrics for index content. +const INDEX_CONTENT_TYPE: &str = "index_content"; + +/// Inverted index blob reader with cache. +pub struct CachedInvertedIndexBlobReader { + file_id: FileId, + inner: R, + cache: InvertedIndexCacheRef, +} + +impl CachedInvertedIndexBlobReader { + pub fn new(file_id: FileId, inner: R, cache: InvertedIndexCacheRef) -> Self { + Self { + file_id, + inner, + cache, + } + } +} + +impl CachedInvertedIndexBlobReader +where + R: InvertedIndexReader, +{ + /// Gets given range of index data from cache, and loads from source if the file + /// is not already cached. + async fn get_or_load( + &mut self, + offset: u64, + size: u32, + ) -> index::inverted_index::error::Result> { + let range = offset as usize..(offset + size as u64) as usize; + if let Some(cached) = self.cache.get_index(IndexKey { + file_id: self.file_id, + }) { + CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); + Ok(cached[range].to_vec()) + } else { + let mut all_data = Vec::with_capacity(1024 * 1024); + self.inner.read_all(&mut all_data).await?; + let result = all_data[range].to_vec(); + self.cache.put_index( + IndexKey { + file_id: self.file_id, + }, + Arc::new(all_data), + ); + CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); + Ok(result) + } + } +} + +#[async_trait] +impl InvertedIndexReader for CachedInvertedIndexBlobReader { + async fn read_all( + &mut self, + dest: &mut Vec, + ) -> index::inverted_index::error::Result { + self.inner.read_all(dest).await + } + + async fn seek_read( + &mut self, + offset: u64, + size: u32, + ) -> index::inverted_index::error::Result> { + self.inner.seek_read(offset, size).await + } + + async fn metadata(&mut self) -> index::inverted_index::error::Result> { + if let Some(cached) = self.cache.get_index_metadata(self.file_id) { + CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); + Ok(cached) + } else { + let meta = self.inner.metadata().await?; + self.cache.put_index_metadata(self.file_id, meta.clone()); + CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); + Ok(meta) + } + } + + async fn fst( + &mut self, + offset: u64, + size: u32, + ) -> index::inverted_index::error::Result { + self.get_or_load(offset, size) + .await + .and_then(|r| FstMap::new(r).context(DecodeFstSnafu)) + } + + async fn bitmap( + &mut self, + offset: u64, + size: u32, + ) -> index::inverted_index::error::Result { + self.get_or_load(offset, size).await.map(BitVec::from_vec) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct IndexKey { + file_id: FileId, +} + +pub type InvertedIndexCacheRef = Arc; + +pub struct InvertedIndexCache { + /// Cache for inverted index metadata + index_metadata: moka::sync::Cache>, + /// Cache for inverted index content. + index: moka::sync::Cache>>, +} + +impl InvertedIndexCache { + /// Creates `InvertedIndexCache` with provided `index_metadata_cap` and `index_content_cap`. + pub fn new(index_metadata_cap: u64, index_content_cap: u64) -> Self { + common_telemetry::debug!("Building InvertedIndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}"); + let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap) + .name("inverted_index_metadata") + .weigher(index_metadata_weight) + .eviction_listener(|k, v, _cause| { + let size = index_metadata_weight(&k, &v); + CACHE_BYTES + .with_label_values(&[INDEX_METADATA_TYPE]) + .sub(size.into()); + }) + .build(); + let index_cache = moka::sync::CacheBuilder::new(index_content_cap) + .name("inverted_index_content") + .weigher(index_content_weight) + .eviction_listener(|k, v, _cause| { + let size = index_content_weight(&k, &v); + CACHE_BYTES + .with_label_values(&[INDEX_CONTENT_TYPE]) + .sub(size.into()); + }) + .build(); + Self { + index_metadata, + index: index_cache, + } + } +} + +impl InvertedIndexCache { + pub fn get_index_metadata(&self, file_id: FileId) -> Option> { + self.index_metadata.get(&IndexKey { file_id }) + } + + pub fn put_index_metadata(&self, file_id: FileId, metadata: Arc) { + let key = IndexKey { file_id }; + CACHE_BYTES + .with_label_values(&[INDEX_METADATA_TYPE]) + .add(index_metadata_weight(&key, &metadata).into()); + self.index_metadata.insert(key, metadata) + } + + // todo(hl): align index file content to pages with size like 4096 bytes. + pub fn get_index(&self, key: IndexKey) -> Option>> { + self.index.get(&key) + } + + pub fn put_index(&self, key: IndexKey, value: Arc>) { + CACHE_BYTES + .with_label_values(&[INDEX_CONTENT_TYPE]) + .add(index_content_weight(&key, &value).into()); + self.index.insert(key, value); + } +} + +/// Calculates weight for index metadata. +fn index_metadata_weight(k: &IndexKey, v: &Arc) -> u32 { + (k.file_id.as_bytes().len() + v.encoded_len()) as u32 +} + +/// Calculates weight for index content. +fn index_content_weight(k: &IndexKey, v: &Arc>) -> u32 { + (k.file_id.as_bytes().len() + v.len()) as u32 +} diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index ca44342a6d4c..7919aeb4ca5e 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -381,6 +381,11 @@ pub struct InvertedIndexConfig { #[deprecated = "use [IndexConfig::write_buffer_size] instead"] #[serde(skip_serializing)] pub write_buffer_size: ReadableSize, + + /// Cache size for metadata of inverted index. Setting it to 0 to disable the cache. + pub metadata_cache_size: ReadableSize, + /// Cache size for inverted index content. Setting it to 0 to disable the cache. + pub content_cache_size: ReadableSize, } impl Default for InvertedIndexConfig { @@ -392,9 +397,10 @@ impl Default for InvertedIndexConfig { apply_on_query: Mode::Auto, mem_threshold_on_create: MemoryThreshold::Auto, compress: true, - write_buffer_size: ReadableSize::mb(8), intermediate_path: String::new(), + metadata_cache_size: ReadableSize::mb(32), + content_cache_size: ReadableSize::mb(32), } } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index ce4f789b9cf4..d5c128c6bf5d 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -330,10 +330,17 @@ impl ScanRegion { Some(file_cache) }(); + let index_cache = self + .cache_manager + .as_ref() + .and_then(|c| c.index_cache()) + .cloned(); + SstIndexApplierBuilder::new( self.access_layer.region_dir().to_string(), self.access_layer.object_store().clone(), file_cache, + index_cache, self.version.metadata.as_ref(), self.version .options diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 61e6a5537691..4852a3f32049 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -63,6 +63,17 @@ impl FileId { pub fn as_puffin(&self) -> String { format!("{}{}", self, ".puffin") } + + /// Converts [FileId] as byte slice. + pub fn as_bytes(&self) -> &[u8] { + self.0.as_bytes() + } +} + +impl From for Uuid { + fn from(value: FileId) -> Self { + value.0 + } } impl fmt::Display for FileId { diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index 7463f6011fca..566af65d2213 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -27,6 +27,7 @@ use snafu::ResultExt; use store_api::storage::RegionId; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; +use crate::cache::index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef}; use crate::error::{ApplyIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result}; use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE}; use crate::sst::file::FileId; @@ -55,6 +56,9 @@ pub(crate) struct SstIndexApplier { /// The puffin manager factory. puffin_manager_factory: PuffinManagerFactory, + + /// In-memory cache for inverted index. + inverted_index_cache: Option, } pub(crate) type SstIndexApplierRef = Arc; @@ -66,6 +70,7 @@ impl SstIndexApplier { region_id: RegionId, store: ObjectStore, file_cache: Option, + index_cache: Option, index_applier: Box, puffin_manager_factory: PuffinManagerFactory, ) -> Self { @@ -78,6 +83,7 @@ impl SstIndexApplier { file_cache, index_applier, puffin_manager_factory, + inverted_index_cache: index_cache, } } @@ -99,13 +105,24 @@ impl SstIndexApplier { self.remote_blob_reader(file_id).await? } }; - let mut blob_reader = InvertedIndexBlobReader::new(blob); - let output = self - .index_applier - .apply(context, &mut blob_reader) - .await - .context(ApplyIndexSnafu)?; - Ok(output) + + if let Some(index_cache) = &self.inverted_index_cache { + let mut index_reader = CachedInvertedIndexBlobReader::new( + file_id, + InvertedIndexBlobReader::new(blob), + index_cache.clone(), + ); + self.index_applier + .apply(context, &mut index_reader) + .await + .context(ApplyIndexSnafu) + } else { + let mut index_reader = InvertedIndexBlobReader::new(blob); + self.index_applier + .apply(context, &mut index_reader) + .await + .context(ApplyIndexSnafu) + } } /// Creates a blob reader from the cached index file. @@ -200,6 +217,7 @@ mod tests { RegionId::new(0, 0), object_store, None, + None, Box::new(mock_index_applier), puffin_manager_factory, ); @@ -241,6 +259,7 @@ mod tests { RegionId::new(0, 0), object_store, None, + None, Box::new(mock_index_applier), puffin_manager_factory, ); diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder.rs b/src/mito2/src/sst/index/inverted_index/applier/builder.rs index 3dcb5c0ec8a3..0736b07fb6b9 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder.rs @@ -34,6 +34,7 @@ use store_api::metadata::RegionMetadata; use store_api::storage::ColumnId; use crate::cache::file_cache::FileCacheRef; +use crate::cache::index::InvertedIndexCacheRef; use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; use crate::row_converter::SortField; use crate::sst::index::inverted_index::applier::SstIndexApplier; @@ -62,6 +63,9 @@ pub(crate) struct SstIndexApplierBuilder<'a> { /// The puffin manager factory. puffin_manager_factory: PuffinManagerFactory, + + /// Cache for inverted index. + index_cache: Option, } impl<'a> SstIndexApplierBuilder<'a> { @@ -70,6 +74,7 @@ impl<'a> SstIndexApplierBuilder<'a> { region_dir: String, object_store: ObjectStore, file_cache: Option, + index_cache: Option, metadata: &'a RegionMetadata, ignore_column_ids: HashSet, puffin_manager_factory: PuffinManagerFactory, @@ -81,6 +86,7 @@ impl<'a> SstIndexApplierBuilder<'a> { metadata, ignore_column_ids, output: HashMap::default(), + index_cache, puffin_manager_factory, } } @@ -102,11 +108,13 @@ impl<'a> SstIndexApplierBuilder<'a> { .map(|(column_id, predicates)| (column_id.to_string(), predicates)) .collect(); let applier = PredicatesIndexApplier::try_from(predicates); + Ok(Some(SstIndexApplier::new( self.region_dir, self.metadata.region_id, self.object_store, self.file_cache, + self.index_cache, Box::new(applier.context(BuildIndexApplierSnafu)?), self.puffin_manager_factory, ))) @@ -320,6 +328,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs index c35736d42bad..c7f1f90cf0ce 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs @@ -76,6 +76,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -118,6 +119,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -143,6 +145,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -168,6 +171,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -194,6 +198,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs index 450e39ad7aee..b7870039115d 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs @@ -232,6 +232,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -260,6 +261,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -279,6 +281,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -299,6 +302,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs index 24f677db1d78..c776cdb74361 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs @@ -138,6 +138,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -175,6 +176,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -195,6 +197,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -214,6 +217,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -233,6 +237,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -291,6 +296,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -328,6 +334,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs index 146b58aeec04..1f23d3fa0cee 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs @@ -69,6 +69,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -101,6 +102,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -125,6 +127,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -149,6 +152,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -175,6 +179,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs index 3c2122f4c028..ae19b5ef7ce8 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs @@ -63,6 +63,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -91,6 +92,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -112,6 +114,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, @@ -133,6 +136,7 @@ mod tests { "test".to_string(), test_object_store(), None, + None, &metadata, HashSet::default(), facotry, diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 00aafad1595d..380661d60db9 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -304,6 +304,7 @@ mod tests { use store_api::storage::RegionId; use super::*; + use crate::cache::index::InvertedIndexCache; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -414,10 +415,12 @@ mod tests { move |expr| { let _d = &d; + let cache = Arc::new(InvertedIndexCache::new(10, 10)); let applier = SstIndexApplierBuilder::new( region_dir.clone(), object_store.clone(), None, + Some(cache), ®ion_metadata, Default::default(), factory.clone(), diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 2ffcc65fbe46..617950cd063b 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -158,6 +158,8 @@ impl WorkerGroup { .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes()) .vector_cache_size(config.vector_cache_size.as_bytes()) .page_cache_size(config.page_cache_size.as_bytes()) + .index_metadata_size(config.inverted_index.metadata_cache_size.as_bytes()) + .index_content_size(config.inverted_index.content_cache_size.as_bytes()) .write_cache(write_cache) .build(), ); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7cebae847e26..96d9316f5549 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -839,6 +839,8 @@ create_on_compaction = "auto" apply_on_query = "auto" mem_threshold_on_create = "auto" compress = true +metadata_cache_size = "32MiB" +content_cache_size = "32MiB" [region_engine.mito.fulltext_index] create_on_flush = "auto"