diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 9160cc32dc96..ecbeea2f0b0b 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -102,7 +102,8 @@ impl AccessLayer { request: SstWriteRequest, write_opts: &WriteOptions, ) -> Result> { - let path = location::sst_file_path(&self.region_dir, request.file_id); + let file_path = location::sst_file_path(&self.region_dir, request.file_id); + let index_file_path = location::index_file_path(&self.region_dir, request.file_id); let region_id = request.metadata.region_id; let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() { @@ -114,7 +115,8 @@ impl AccessLayer { metadata: request.metadata, source: request.source, storage: request.storage, - upload_path: path, + upload_path: file_path, + index_upload_path: index_file_path, remote_store: self.object_store.clone(), }, write_opts, @@ -122,7 +124,8 @@ impl AccessLayer { .await? } else { // Write cache is disabled. - let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone()); + let mut writer = + ParquetWriter::new(file_path, request.metadata, self.object_store.clone()); writer.write_all(request.source, write_opts).await? }; diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 35dc5ef59525..5fa836e15c16 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -71,7 +71,7 @@ impl FileCache { // The cache is replaced by another file. This is unexpected, we don't remove the same // file but updates the metrics as the file is already replaced by users. CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into()); - warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.0); + warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id); return; } @@ -80,7 +80,7 @@ impl FileCache { CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into()); } Err(e) => { - warn!(e; "Failed to delete cached file {} for region {}", file_path, key.0); + warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id); } } } @@ -241,7 +241,51 @@ impl FileCache { } /// Key of file cache index. -pub(crate) type IndexKey = (RegionId, FileId); +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct IndexKey { + pub region_id: RegionId, + pub file_id: FileId, + pub file_type: FileType, +} + +impl IndexKey { + /// Creates a new index key. + pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey { + IndexKey { + region_id, + file_id, + file_type, + } + } +} + +/// Type of the file. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum FileType { + /// Parquet file. + Parquet, + /// Puffin file. + Puffin, +} + +impl FileType { + /// Parses the file type from string. + fn parse(s: &str) -> Option { + match s { + "parquet" => Some(FileType::Parquet), + "puffin" => Some(FileType::Puffin), + _ => None, + } + } + + /// Converts the file type to string. + fn as_str(&self) -> &'static str { + match self { + FileType::Parquet => "parquet", + FileType::Puffin => "puffin", + } + } +} /// An entity that describes the file in the file cache. /// @@ -254,21 +298,30 @@ pub(crate) struct IndexValue { /// Generates the path to the cached file. /// -/// The file name format is `{region_id}.{file_id}` +/// The file name format is `{region_id}.{file_id}.{file_type}` fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String { - join_path(cache_file_dir, &format!("{}.{}", key.0.as_u64(), key.1)) + join_path( + cache_file_dir, + &format!( + "{}.{}.{}", + key.region_id.as_u64(), + key.file_id, + key.file_type.as_str() + ), + ) } /// Parse index key from the file name. fn parse_index_key(name: &str) -> Option { - let mut splited = name.splitn(2, '.'); - let region_id = splited.next().and_then(|s| { + let mut split = name.splitn(3, '.'); + let region_id = split.next().and_then(|s| { let id = s.parse::().ok()?; Some(RegionId::from_u64(id)) })?; - let file_id = splited.next().and_then(|s| FileId::parse_str(s).ok())?; + let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?; + let file_type = split.next().and_then(FileType::parse)?; - Some((region_id, file_id)) + Some(IndexKey::new(region_id, file_id, file_type)) } #[cfg(test)] @@ -293,7 +346,7 @@ mod tests { let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); - let key = (region_id, file_id); + let key = IndexKey::new(region_id, file_id, FileType::Parquet); let file_path = cache.cache_file_path(key); // Get an empty file. @@ -306,7 +359,10 @@ mod tests { .unwrap(); // Add to the cache. cache - .put((region_id, file_id), IndexValue { file_size: 5 }) + .put( + IndexKey::new(region_id, file_id, FileType::Parquet), + IndexValue { file_size: 5 }, + ) .await; // Read file content. @@ -339,7 +395,7 @@ mod tests { let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); - let key = (region_id, file_id); + let key = IndexKey::new(region_id, file_id, FileType::Parquet); let file_path = cache.cache_file_path(key); // Write a file. @@ -349,7 +405,10 @@ mod tests { .unwrap(); // Add to the cache. cache - .put((region_id, file_id), IndexValue { file_size: 5 }) + .put( + IndexKey::new(region_id, file_id, FileType::Parquet), + IndexValue { file_size: 5 }, + ) .await; // Remove the file but keep the index. @@ -368,11 +427,12 @@ mod tests { let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); let region_id = RegionId::new(2000, 0); + let file_type = FileType::Parquet; // Write N files. let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect(); let mut total_size = 0; for (i, file_id) in file_ids.iter().enumerate() { - let key = (region_id, *file_id); + let key = IndexKey::new(region_id, *file_id, file_type); let file_path = cache.cache_file_path(key); let bytes = i.to_string().into_bytes(); local_store.write(&file_path, bytes.clone()).await.unwrap(); @@ -380,7 +440,7 @@ mod tests { // Add to the cache. cache .put( - (region_id, *file_id), + IndexKey::new(region_id, *file_id, file_type), IndexValue { file_size: bytes.len() as u32, }, @@ -392,7 +452,10 @@ mod tests { // Recover the cache. let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); // No entry before recovery. - assert!(cache.reader((region_id, file_ids[0])).await.is_none()); + assert!(cache + .reader(IndexKey::new(region_id, file_ids[0], file_type)) + .await + .is_none()); cache.recover().await.unwrap(); // Check size. @@ -400,7 +463,7 @@ mod tests { assert_eq!(total_size, cache.memory_index.weighted_size() as usize); for (i, file_id) in file_ids.iter().enumerate() { - let key = (region_id, *file_id); + let key = IndexKey::new(region_id, *file_id, file_type); let mut reader = cache.reader(key).await.unwrap(); let mut buf = String::new(); reader.read_to_string(&mut buf).await.unwrap(); @@ -415,7 +478,7 @@ mod tests { let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); - let key = (region_id, file_id); + let key = IndexKey::new(region_id, file_id, FileType::Parquet); let file_path = file_cache.cache_file_path(key); // Write a file. let data = b"hello greptime database"; @@ -424,9 +487,7 @@ mod tests { .await .unwrap(); // Add to the cache. - file_cache - .put((region_id, file_id), IndexValue { file_size: 5 }) - .await; + file_cache.put(key, IndexValue { file_size: 5 }).await; // Ranges let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64]; let bytes = file_cache.read_ranges(key, &ranges).await.unwrap(); @@ -442,12 +503,18 @@ mod tests { fn test_cache_file_path() { let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap(); assert_eq!( - "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095", - cache_file_path("test_dir", (RegionId::new(1234, 5), file_id)) + "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet", + cache_file_path( + "test_dir", + IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet) + ) ); assert_eq!( - "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095", - cache_file_path("test_dir/", (RegionId::new(1234, 5), file_id)) + "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet", + cache_file_path( + "test_dir/", + IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet) + ) ); } @@ -456,8 +523,8 @@ mod tests { let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap(); let region_id = RegionId::new(1234, 5); assert_eq!( - (region_id, file_id), - parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").unwrap() + IndexKey::new(region_id, file_id, FileType::Parquet), + parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap() ); assert!(parse_index_key("").is_none()); assert!(parse_index_key(".").is_none()); @@ -466,8 +533,13 @@ mod tests { assert!(parse_index_key(".5299989643269").is_none()); assert!(parse_index_key("5299989643269.").is_none()); assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none()); + assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none()); assert!( - parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").is_none() + parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none() ); + assert!(parse_index_key( + "5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin" + ) + .is_none()); } } diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index c450ef6b735c..5871853b99f6 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -25,10 +25,10 @@ use object_store::manager::ObjectStoreManagerRef; use object_store::ObjectStore; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; +use store_api::storage::RegionId; -use super::file_cache::IndexKey; use crate::access_layer::new_fs_object_store; -use crate::cache::file_cache::{FileCache, FileCacheRef, IndexValue}; +use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL}; use crate::read::Source; @@ -78,6 +78,11 @@ impl WriteCache { Self::new(local_store, object_store_manager, cache_capacity).await } + /// Returns the file cache of the write cache. + pub(crate) fn file_cache(&self) -> FileCacheRef { + self.file_cache.clone() + } + /// Writes SST to the cache and then uploads it to the remote object store. pub async fn write_and_upload_sst( &self, @@ -90,11 +95,11 @@ impl WriteCache { let region_id = request.metadata.region_id; let file_id = request.file_id; + let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet); - let cache_path = self.file_cache.cache_file_path((region_id, file_id)); // Write to FileCache. let mut writer = ParquetWriter::new( - cache_path.clone(), + self.file_cache.cache_file_path(parquet_key), request.metadata, self.file_cache.local_store(), ); @@ -109,8 +114,36 @@ impl WriteCache { return Ok(None); }; + let parquet_path = &request.upload_path; + let remote_store = &request.remote_store; + self.upload(parquet_key, parquet_path, remote_store).await?; + + if sst_info.inverted_index_available { + let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); + let puffin_path = &request.index_upload_path; + self.upload(puffin_key, puffin_path, remote_store).await?; + } + + Ok(Some(sst_info)) + } + + /// Uploads a Parquet file or a Puffin file to the remote object store. + async fn upload( + &self, + index_key: IndexKey, + upload_path: &str, + remote_store: &ObjectStore, + ) -> Result<()> { + let region_id = index_key.region_id; + let file_id = index_key.file_id; + let file_type = index_key.file_type; + let cache_path = self.file_cache.cache_file_path(index_key); + let timer = FLUSH_ELAPSED - .with_label_values(&["upload_sst"]) + .with_label_values(&[match file_type { + FileType::Parquet => "upload_parquet", + FileType::Puffin => "upload_puffin", + }]) .start_timer(); let reader = self @@ -120,17 +153,20 @@ impl WriteCache { .await .context(error::OpenDalSnafu)?; - let upload_path = request.upload_path; - let mut writer = request - .remote_store - .writer_with(&upload_path) + let mut writer = remote_store + .writer_with(upload_path) .buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) .await .context(error::OpenDalSnafu)?; - let bytes_written = futures::io::copy(reader, &mut writer) - .await - .context(error::UploadSstSnafu { region_id, file_id })?; + let bytes_written = + futures::io::copy(reader, &mut writer) + .await + .context(error::UploadSnafu { + region_id, + file_id, + file_type, + })?; // Must close to upload all data. writer.close().await.context(error::OpenDalSnafu)?; @@ -145,18 +181,13 @@ impl WriteCache { timer.stop_and_record() ); + let index_value = IndexValue { + file_size: bytes_written as _, + }; // Register to file cache - let file_size = sst_info.file_size as u32; - self.file_cache - .put((region_id, file_id), IndexValue { file_size }) - .await; - - Ok(Some(sst_info)) - } + self.file_cache.put(index_key, index_value).await; - /// Returns the file cache of the write cache. - pub(crate) fn file_cache(&self) -> FileCacheRef { - self.file_cache.clone() + Ok(()) } } @@ -168,6 +199,8 @@ pub struct SstUploadRequest { pub storage: Option, /// Path to upload the file. pub upload_path: String, + /// Path to upload the index file. + pub index_upload_path: String, /// Remote object store to upload. pub remote_store: ObjectStore, } @@ -186,7 +219,7 @@ mod tests { use crate::cache::file_cache::{self, FileCache}; use crate::cache::test_util::new_fs_store; use crate::sst::file::FileId; - use crate::sst::location::sst_file_path; + use crate::sst::location::{index_file_path, sst_file_path}; use crate::test_util::sst_util::{ new_batch_by_range, new_source, sst_file_handle, sst_region_metadata, }; @@ -200,6 +233,7 @@ mod tests { let mock_store = env.init_object_store_manager(); let file_id = FileId::random(); let upload_path = sst_file_path("test", file_id); + let index_upload_path = index_file_path("test", file_id); // Create WriteCache let local_dir = create_temp_dir(""); @@ -228,6 +262,7 @@ mod tests { source, storage: None, upload_path: upload_path.clone(), + index_upload_path, remote_store: mock_store.clone(), }; @@ -244,7 +279,7 @@ mod tests { .unwrap(); // Check write cache contains the key - let key = (region_id, file_id); + let key = IndexKey::new(region_id, file_id, FileType::Parquet); assert!(write_cache.file_cache.contains_key(&key)); // Check file data diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index d06e7b0f6aaa..5e1a983b9a2f 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -28,6 +28,7 @@ use snafu::{Location, Snafu}; use store_api::manifest::ManifestVersion; use store_api::storage::RegionId; +use crate::cache::file_cache::FileType; use crate::sst::file::FileId; use crate::worker::WorkerId; @@ -522,13 +523,15 @@ pub enum Error { }, #[snafu(display( - "Failed to upload sst file, region_id: {}, file_id: {}", + "Failed to upload file, region_id: {}, file_id: {}, file_type: {:?}", region_id, - file_id + file_id, + file_type, ))] - UploadSst { + Upload { region_id: RegionId, file_id: FileId, + file_type: FileType, #[snafu(source)] error: std::io::Error, location: Location, @@ -629,7 +632,7 @@ impl ErrorExt for Error { CleanDir { .. } => StatusCode::Unexpected, InvalidConfig { .. } => StatusCode::InvalidArguments, StaleLogEntry { .. } => StatusCode::Unexpected, - UploadSst { .. } => StatusCode::StorageUnavailable, + Upload { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index a0bcec31b3b4..c2b0939d9b2a 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -23,6 +23,7 @@ use store_api::storage::ScanRequest; use table::predicate::{Predicate, TimeRangePredicateBuilder}; use crate::access_layer::AccessLayerRef; +use crate::cache::file_cache::FileCacheRef; use crate::cache::CacheManagerRef; use crate::error::Result; use crate::read::projection::ProjectionMapper; @@ -233,9 +234,17 @@ impl ScanRegion { /// Use the latest schema to build the index applier. fn build_index_applier(&self) -> Option { + let file_cache = || -> Option { + let cache_manager = self.cache_manager.as_ref()?; + let write_cache = cache_manager.write_cache()?; + let file_cache = write_cache.file_cache(); + Some(file_cache) + }(); + SstIndexApplierBuilder::new( self.access_layer.region_dir().to_string(), self.access_layer.object_store().clone(), + file_cache, self.version.metadata.as_ref(), ) .build(&self.request.filters) diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index a17a67fbd349..d9595673f483 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -224,6 +224,7 @@ impl SeqScan { .time_range(self.time_range) .projection(Some(self.mapper.column_ids().to_vec())) .cache(self.cache_manager.clone()) + .index_applier(self.index_applier.clone()) .build() .await; let reader = match maybe_reader { diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index 5ae287f383a4..3355f5c2d902 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -25,7 +25,9 @@ use index::inverted_index::search::index_apply::{ use object_store::ObjectStore; use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; +use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::error::{ ApplyIndexSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, PuffinReadMetadataSnafu, Result, @@ -41,34 +43,42 @@ use crate::sst::location; /// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files /// and returning the relevant row group ids for further scan. -pub struct SstIndexApplier { +pub(crate) struct SstIndexApplier { /// The root directory of the region. region_dir: String, - /// Store responsible for accessing SST files. + /// Region ID. + region_id: RegionId, + + /// Store responsible for accessing remote index files. store: InstrumentedStore, + /// The cache of index files. + file_cache: Option, + /// Predefined index applier used to apply predicates to index files /// and return the relevant row group ids for further scan. index_applier: Box, } -pub type SstIndexApplierRef = Arc; +pub(crate) type SstIndexApplierRef = Arc; impl SstIndexApplier { /// Creates a new [`SstIndexApplier`]. - /// - /// TODO(zhongzc): leverage `WriteCache` pub fn new( region_dir: String, + region_id: RegionId, object_store: ObjectStore, + file_cache: Option, index_applier: Box, ) -> Self { INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64); Self { region_dir, + region_id, store: InstrumentedStore::new(object_store), + file_cache, index_applier, } } @@ -77,22 +87,49 @@ impl SstIndexApplier { pub async fn apply(&self, file_id: FileId) -> Result> { let _timer = INDEX_APPLY_ELAPSED.start_timer(); - let mut puffin_reader = self.puffin_reader(file_id).await?; - let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?; - let mut index_reader = InvertedIndexBlobReader::new(blob_reader); - let context = SearchContext { // Encountering a non-existing column indicates that it doesn't match predicates. index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, }; - self.index_applier - .apply(context, &mut index_reader) + + match self.cached_puffin_reader(file_id).await? { + Some(mut puffin_reader) => { + let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?; + let mut index_reader = InvertedIndexBlobReader::new(blob_reader); + self.index_applier + .apply(context, &mut index_reader) + .await + .context(ApplyIndexSnafu) + } + None => { + let mut puffin_reader = self.remote_puffin_reader(file_id).await?; + let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?; + let mut index_reader = InvertedIndexBlobReader::new(blob_reader); + self.index_applier + .apply(context, &mut index_reader) + .await + .context(ApplyIndexSnafu) + } + } + } + + /// Helper function to create a [`PuffinFileReader`] from the cached index file. + async fn cached_puffin_reader( + &self, + file_id: FileId, + ) -> Result>> { + let Some(file_cache) = &self.file_cache else { + return Ok(None); + }; + + Ok(file_cache + .reader(IndexKey::new(self.region_id, file_id, FileType::Puffin)) .await - .context(ApplyIndexSnafu) + .map(PuffinFileReader::new)) } - /// Helper function to create a [`PuffinFileReader`] for the provided SST file id. - async fn puffin_reader( + /// Helper function to create a [`PuffinFileReader`] from the remote index file. + async fn remote_puffin_reader( &self, file_id: FileId, ) -> Result> { @@ -172,7 +209,9 @@ mod tests { let sst_index_applier = SstIndexApplier::new( region_dir.clone(), + RegionId::new(0, 0), object_store, + None, Box::new(mock_index_applier), ); let ids = sst_index_applier.apply(file_id).await.unwrap(); @@ -203,7 +242,9 @@ mod tests { let sst_index_applier = SstIndexApplier::new( region_dir.clone(), + RegionId::new(0, 0), object_store, + None, Box::new(mock_index_applier), ); let res = sst_index_applier.apply(file_id).await; diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index 6779e817fd61..166c490eb36c 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -34,19 +34,23 @@ use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; use store_api::storage::ColumnId; +use crate::cache::file_cache::FileCacheRef; use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; use crate::row_converter::SortField; use crate::sst::index::applier::SstIndexApplier; use crate::sst::index::codec::IndexValueCodec; /// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan. -pub struct SstIndexApplierBuilder<'a> { +pub(crate) struct SstIndexApplierBuilder<'a> { /// Directory of the region, required argument for constructing [`SstIndexApplier`]. region_dir: String, /// Object store, required argument for constructing [`SstIndexApplier`]. object_store: ObjectStore, + /// File cache, required argument for constructing [`SstIndexApplier`]. + file_cache: Option, + /// Metadata of the region, used to get metadata like column type. metadata: &'a RegionMetadata, @@ -59,11 +63,13 @@ impl<'a> SstIndexApplierBuilder<'a> { pub fn new( region_dir: String, object_store: ObjectStore, + file_cache: Option, metadata: &'a RegionMetadata, ) -> Self { Self { region_dir, object_store, + file_cache, metadata, output: HashMap::default(), } @@ -88,7 +94,9 @@ impl<'a> SstIndexApplierBuilder<'a> { let applier = PredicatesIndexApplier::try_from(predicates); Ok(Some(SstIndexApplier::new( self.region_dir, + self.metadata.region_id, self.object_store, + self.file_cache, Box::new(applier.context(BuildIndexApplierSnafu)?), ))) } @@ -286,7 +294,7 @@ mod tests { fn test_collect_and_basic() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(DfExpr::BinaryExpr(BinaryExpr { diff --git a/src/mito2/src/sst/index/applier/builder/between.rs b/src/mito2/src/sst/index/applier/builder/between.rs index 5fed21dcc292..6edeaf689b9f 100644 --- a/src/mito2/src/sst/index/applier/builder/between.rs +++ b/src/mito2/src/sst/index/applier/builder/between.rs @@ -69,7 +69,7 @@ mod tests { fn test_collect_between_basic() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let between = Between { negated: false, @@ -103,7 +103,7 @@ mod tests { fn test_collect_between_negated() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let between = Between { negated: true, @@ -120,7 +120,7 @@ mod tests { fn test_collect_between_field_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let between = Between { negated: false, @@ -137,7 +137,7 @@ mod tests { fn test_collect_between_type_mismatch() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let between = Between { negated: false, @@ -155,7 +155,7 @@ mod tests { fn test_collect_between_nonexistent_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let between = Between { negated: false, diff --git a/src/mito2/src/sst/index/applier/builder/comparison.rs b/src/mito2/src/sst/index/applier/builder/comparison.rs index 31381c78715e..76973620aec6 100644 --- a/src/mito2/src/sst/index/applier/builder/comparison.rs +++ b/src/mito2/src/sst/index/applier/builder/comparison.rs @@ -224,7 +224,7 @@ mod tests { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); for ((left, op, right), _) in &cases { builder.collect_comparison_expr(left, op, right).unwrap(); @@ -244,7 +244,7 @@ mod tests { fn test_collect_comparison_type_mismatch() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10)); assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); @@ -255,7 +255,7 @@ mod tests { fn test_collect_comparison_field_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); builder .collect_comparison_expr(&field_column(), &Operator::Lt, &string_lit("abc")) @@ -267,7 +267,7 @@ mod tests { fn test_collect_comparison_nonexistent_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let res = builder.collect_comparison_expr( &nonexistent_column(), diff --git a/src/mito2/src/sst/index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/applier/builder/eq_list.rs index 6e28920e38bc..d67c048ad105 100644 --- a/src/mito2/src/sst/index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/applier/builder/eq_list.rs @@ -133,7 +133,7 @@ mod tests { fn test_collect_eq_basic() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); builder .collect_eq(&tag_column(), &string_lit("foo")) @@ -162,7 +162,7 @@ mod tests { fn test_collect_eq_field_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); builder .collect_eq(&field_column(), &string_lit("abc")) @@ -174,7 +174,7 @@ mod tests { fn test_collect_eq_nonexistent_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc")); assert!(matches!(res, Err(Error::ColumnNotFound { .. }))); @@ -185,7 +185,7 @@ mod tests { fn test_collect_eq_type_mismatch() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let res = builder.collect_eq(&tag_column(), &int64_lit(1)); assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); @@ -196,7 +196,7 @@ mod tests { fn test_collect_or_eq_list_basic() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(tag_column()), @@ -246,7 +246,7 @@ mod tests { fn test_collect_or_eq_list_invalid_op() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(tag_column()), @@ -275,7 +275,7 @@ mod tests { fn test_collect_or_eq_list_multiple_columns() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(tag_column()), diff --git a/src/mito2/src/sst/index/applier/builder/in_list.rs b/src/mito2/src/sst/index/applier/builder/in_list.rs index 0d3081f1c0f0..294d3ab0b317 100644 --- a/src/mito2/src/sst/index/applier/builder/in_list.rs +++ b/src/mito2/src/sst/index/applier/builder/in_list.rs @@ -64,7 +64,7 @@ mod tests { fn test_collect_in_list_basic() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let in_list = InList { expr: Box::new(tag_column()), @@ -88,7 +88,7 @@ mod tests { fn test_collect_in_list_negated() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let in_list = InList { expr: Box::new(tag_column()), @@ -104,7 +104,7 @@ mod tests { fn test_collect_in_list_field_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let in_list = InList { expr: Box::new(field_column()), @@ -120,7 +120,7 @@ mod tests { fn test_collect_in_list_type_mismatch() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let in_list = InList { expr: Box::new(tag_column()), @@ -137,7 +137,7 @@ mod tests { fn test_collect_in_list_nonexistent_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let in_list = InList { expr: Box::new(nonexistent_column()), diff --git a/src/mito2/src/sst/index/applier/builder/regex_match.rs b/src/mito2/src/sst/index/applier/builder/regex_match.rs index 48646334e7bc..806da6d516f6 100644 --- a/src/mito2/src/sst/index/applier/builder/regex_match.rs +++ b/src/mito2/src/sst/index/applier/builder/regex_match.rs @@ -56,7 +56,7 @@ mod tests { fn test_regex_match_basic() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); builder .collect_regex_match(&tag_column(), &string_lit("abc")) @@ -76,7 +76,7 @@ mod tests { fn test_regex_match_field_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); builder .collect_regex_match(&field_column(), &string_lit("abc")) @@ -89,7 +89,7 @@ mod tests { fn test_regex_match_type_mismatch() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); builder .collect_regex_match(&tag_column(), &int64_lit(123)) @@ -102,7 +102,7 @@ mod tests { fn test_regex_match_type_nonexist_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc")); assert!(matches!(res, Err(Error::ColumnNotFound { .. }))); diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index ad3fb35ca665..e07773d5af41 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -49,7 +49,7 @@ use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; /// Parquet SST reader builder. -pub struct ParquetReaderBuilder { +pub(crate) struct ParquetReaderBuilder { /// SST directory. file_dir: String, file_handle: FileHandle, diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 64bd4d4cd590..68a91e55fef4 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -29,11 +29,11 @@ use parquet::file::serialized_reader::SerializedPageReader; use parquet::format::PageLocation; use store_api::storage::RegionId; -use super::helper::fetch_byte_ranges; -use crate::cache::file_cache::IndexKey; +use crate::cache::file_cache::{FileType, IndexKey}; use crate::cache::{CacheManagerRef, PageKey, PageValue}; use crate::metrics::READ_STAGE_ELAPSED; use crate::sst::file::FileId; +use crate::sst::parquet::helper::fetch_byte_ranges; use crate::sst::parquet::page_reader::CachedPageReader; /// An in-memory collection of column chunks @@ -228,7 +228,7 @@ impl<'a> InMemoryRowGroup<'a> { /// Try to fetch data from WriteCache, /// if not in WriteCache, fetch data from object store directly. async fn fetch_bytes(&self, ranges: &[Range]) -> Result> { - let key = (self.region_id, self.file_id); + let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet); match self.fetch_ranges_from_write_cache(key, ranges).await { Some(data) => Ok(data), None => {