From 782724beefe8f739e7b54c334bc938e308a88506 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 11 Jan 2024 10:08:16 +0000 Subject: [PATCH] feat(mito): support write cache for index file Signed-off-by: Zhenchi --- src/mito2/src/access_layer.rs | 13 +- src/mito2/src/cache/file_cache.rs | 117 ++++++++++++++---- src/mito2/src/cache/write_cache.rs | 75 ++++++++--- src/mito2/src/error.rs | 15 ++- src/mito2/src/read/scan_region.rs | 9 ++ src/mito2/src/read/seq_scan.rs | 1 + src/mito2/src/sst/file.rs | 5 + src/mito2/src/sst/index/applier.rs | 69 ++++++++--- src/mito2/src/sst/index/applier/builder.rs | 12 +- .../src/sst/index/applier/builder/between.rs | 10 +- .../sst/index/applier/builder/comparison.rs | 8 +- .../src/sst/index/applier/builder/eq_list.rs | 14 +-- .../src/sst/index/applier/builder/in_list.rs | 10 +- .../sst/index/applier/builder/regex_match.rs | 8 +- src/mito2/src/sst/parquet.rs | 20 ++- src/mito2/src/sst/parquet/reader.rs | 2 +- src/mito2/src/sst/parquet/writer.rs | 6 + 17 files changed, 302 insertions(+), 92 deletions(-) diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 9160cc32dc96..d9a0d2a1b3f1 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -102,7 +102,8 @@ impl AccessLayer { request: SstWriteRequest, write_opts: &WriteOptions, ) -> Result> { - let path = location::sst_file_path(&self.region_dir, request.file_id); + let file_path = location::sst_file_path(&self.region_dir, request.file_id); + let index_file_path = location::index_file_path(&self.region_dir, request.file_id); let region_id = request.metadata.region_id; let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() { @@ -114,7 +115,8 @@ impl AccessLayer { metadata: request.metadata, source: request.source, storage: request.storage, - upload_path: path, + upload_path: file_path, + index_upload_path: index_file_path, remote_store: self.object_store.clone(), }, write_opts, @@ -122,7 +124,12 @@ impl AccessLayer { 
.await? } else { // Write cache is disabled. - let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone()); + let mut writer = ParquetWriter::new( + file_path, + index_file_path, + request.metadata, + self.object_store.clone(), + ); writer.write_all(request.source, write_opts).await? }; diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 94a8518dd1c0..d743d9ee194d 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -68,7 +68,7 @@ impl FileCache { // The cache is replaced by another file. This is unexpected, we don't remove the same // file but updates the metrics as the file is already replaced by users. CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into()); - warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.0); + warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id); return; } @@ -77,7 +77,7 @@ impl FileCache { CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into()); } Err(e) => { - warn!(e; "Failed to delete cached file {} for region {}", file_path, key.0); + warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id); } } } @@ -205,7 +205,51 @@ impl FileCache { } /// Key of file cache index. -pub(crate) type IndexKey = (RegionId, FileId); +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct IndexKey { + pub region_id: RegionId, + pub file_id: FileId, + pub file_type: FileType, +} + +impl IndexKey { + /// Creates a new index key. + pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey { + IndexKey { + region_id, + file_id, + file_type, + } + } +} + +/// Type of the file. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) enum FileType { + /// Parquet file. + Parquet, + /// Puffin file. + Puffin, +} + +impl FileType { + /// Parses the file type from string. 
+ fn parse(s: &str) -> Option<FileType> { + match s { + "parquet" => Some(FileType::Parquet), + "puffin" => Some(FileType::Puffin), + _ => None, + } + } + + /// Converts the file type to string. + fn as_str(&self) -> &'static str { + match self { + FileType::Parquet => "parquet", + FileType::Puffin => "puffin", + } + } +} /// An entity that describes the file in the file cache. /// /// The file name format is `{region_id}.{file_id}.{file_type}` fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String { - join_path(cache_file_dir, &format!("{}.{}", key.0.as_u64(), key.1)) + join_path( + cache_file_dir, + &format!( + "{}.{}.{}", + key.region_id.as_u64(), + key.file_id, + key.file_type.as_str() + ), + ) } /// Parse index key from the file name. fn parse_index_key(name: &str) -> Option<IndexKey> { - let mut splited = name.splitn(2, '.'); - let region_id = splited.next().and_then(|s| { + let mut split = name.splitn(3, '.'); + let region_id = split.next().and_then(|s| { let id = s.parse::<u64>().ok()?; Some(RegionId::from_u64(id)) })?; - let file_id = splited.next().and_then(|s| FileId::parse_str(s).ok())?; + let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?; + let file_type = split.next().and_then(FileType::parse)?; - Some((region_id, file_id)) + Some(IndexKey::new(region_id, file_id, file_type)) } #[cfg(test)] @@ -257,7 +310,7 @@ mod tests { let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); - let key = (region_id, file_id); + let key = IndexKey::new(region_id, file_id, FileType::Parquet); let file_path = cache.cache_file_path(key); // Get an empty file. @@ -270,7 +323,10 @@ .unwrap(); // Add to the cache. cache - .put((region_id, file_id), IndexValue { file_size: 5 }) + .put( + IndexKey::new(region_id, file_id, FileType::Parquet), + IndexValue { file_size: 5 }, + ) .await; // Read file content.
@@ -303,7 +359,7 @@ mod tests { let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); - let key = (region_id, file_id); + let key = IndexKey::new(region_id, file_id, FileType::Parquet); let file_path = cache.cache_file_path(key); // Write a file. @@ -313,7 +369,10 @@ mod tests { .unwrap(); // Add to the cache. cache - .put((region_id, file_id), IndexValue { file_size: 5 }) + .put( + IndexKey::new(region_id, file_id, FileType::Parquet), + IndexValue { file_size: 5 }, + ) .await; // Remove the file but keep the index. @@ -332,11 +391,12 @@ mod tests { let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); let region_id = RegionId::new(2000, 0); + let file_type = FileType::Parquet; // Write N files. let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect(); let mut total_size = 0; for (i, file_id) in file_ids.iter().enumerate() { - let key = (region_id, *file_id); + let key = IndexKey::new(region_id, *file_id, file_type); let file_path = cache.cache_file_path(key); let bytes = i.to_string().into_bytes(); local_store.write(&file_path, bytes.clone()).await.unwrap(); @@ -344,7 +404,7 @@ mod tests { // Add to the cache. cache .put( - (region_id, *file_id), + IndexKey::new(region_id, *file_id, file_type), IndexValue { file_size: bytes.len() as u32, }, @@ -356,7 +416,10 @@ mod tests { // Recover the cache. let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10)); // No entry before recovery. - assert!(cache.reader((region_id, file_ids[0])).await.is_none()); + assert!(cache + .reader(IndexKey::new(region_id, file_ids[0], file_type)) + .await + .is_none()); cache.recover().await.unwrap(); // Check size. 
@@ -364,7 +427,7 @@ mod tests { assert_eq!(total_size, cache.memory_index.weighted_size() as usize); for (i, file_id) in file_ids.iter().enumerate() { - let key = (region_id, *file_id); + let key = IndexKey::new(region_id, *file_id, file_type); let mut reader = cache.reader(key).await.unwrap(); let mut buf = String::new(); reader.read_to_string(&mut buf).await.unwrap(); @@ -376,12 +439,18 @@ mod tests { fn test_cache_file_path() { let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap(); assert_eq!( - "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095", - cache_file_path("test_dir", (RegionId::new(1234, 5), file_id)) + "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet", + cache_file_path( + "test_dir", + IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet) + ) ); assert_eq!( - "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095", - cache_file_path("test_dir/", (RegionId::new(1234, 5), file_id)) + "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet", + cache_file_path( + "test_dir/", + IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet) + ) ); } @@ -390,8 +459,8 @@ mod tests { let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap(); let region_id = RegionId::new(1234, 5); assert_eq!( - (region_id, file_id), - parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").unwrap() + IndexKey::new(region_id, file_id, FileType::Parquet), + parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap() ); assert!(parse_index_key("").is_none()); assert!(parse_index_key(".").is_none()); @@ -400,8 +469,6 @@ mod tests { assert!(parse_index_key(".5299989643269").is_none()); assert!(parse_index_key("5299989643269.").is_none()); assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none()); - assert!( - parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").is_none() - ); + 
assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none()); } } diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index f0cf58de9140..1b578ec2b06b 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -23,9 +23,10 @@ use object_store::manager::ObjectStoreManagerRef; use object_store::ObjectStore; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; +use store_api::storage::RegionId; use crate::access_layer::new_fs_object_store; -use crate::cache::file_cache::{FileCache, FileCacheRef, IndexValue}; +use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL}; use crate::read::Source; @@ -75,6 +76,11 @@ impl WriteCache { Self::new(local_store, object_store_manager, cache_capacity).await } + /// Returns the file cache of the write cache. + pub(crate) fn file_cache(&self) -> FileCacheRef { + self.file_cache.clone() + } + /// Writes SST to the cache and then uploads it to the remote object store. pub async fn write_and_upload_sst( &self, @@ -87,11 +93,13 @@ impl WriteCache { let region_id = request.metadata.region_id; let file_id = request.file_id; + let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet); + let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); - let cache_path = self.file_cache.cache_file_path((region_id, file_id)); // Write to FileCache. 
let mut writer = ParquetWriter::new( - cache_path.clone(), + self.file_cache.cache_file_path(parquet_key), + self.file_cache.cache_file_path(puffin_key), request.metadata, self.file_cache.local_store(), ); @@ -106,8 +114,35 @@ impl WriteCache { return Ok(None); }; + let parquet_path = &request.upload_path; + let remote_store = &request.remote_store; + self.upload(parquet_key, parquet_path, remote_store).await?; + + if sst_info.inverted_index_available { + let puffin_path = &request.index_upload_path; + self.upload(puffin_key, puffin_path, remote_store).await?; + } + + Ok(Some(sst_info)) + } + + /// Uploads a Parquet file or a Puffin file to the remote object store. + async fn upload( + &self, + index_key: IndexKey, + upload_path: &str, + remote_store: &ObjectStore, + ) -> Result<()> { + let region_id = index_key.region_id; + let file_id = index_key.file_id; + let file_type = index_key.file_type; + let cache_path = self.file_cache.cache_file_path(index_key); + let timer = FLUSH_ELAPSED - .with_label_values(&["upload_sst"]) + .with_label_values(&[match file_type { + FileType::Parquet => "upload_parquet", + FileType::Puffin => "upload_puffin", + }]) .start_timer(); let reader = self @@ -117,17 +152,17 @@ impl WriteCache { .await .context(error::OpenDalSnafu)?; - let upload_path = request.upload_path; - let mut writer = request - .remote_store - .writer_with(&upload_path) + let mut writer = remote_store + .writer_with(upload_path) .buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) .await .context(error::OpenDalSnafu)?; - let bytes_written = futures::io::copy(reader, &mut writer) - .await - .context(error::UploadSstSnafu { region_id, file_id })?; + let copy_res = futures::io::copy(reader, &mut writer).await; + let bytes_written = match file_type { + FileType::Parquet => copy_res.context(error::UploadSstSnafu { region_id, file_id })?, + FileType::Puffin => copy_res.context(error::UploadIndexSnafu { region_id, file_id })?, + }; // Must close to upload all data. 
writer.close().await.context(error::OpenDalSnafu)?; @@ -142,13 +177,13 @@ impl WriteCache { timer.stop_and_record() ); + let index_value = IndexValue { + file_size: bytes_written as _, + }; // Register to file cache - let file_size = sst_info.file_size as u32; - self.file_cache - .put((region_id, file_id), IndexValue { file_size }) - .await; + self.file_cache.put(index_key, index_value).await; - Ok(Some(sst_info)) + Ok(()) } } @@ -160,6 +195,8 @@ pub struct SstUploadRequest { pub storage: Option, /// Path to upload the file. pub upload_path: String, + /// Path to upload the index file. + pub index_upload_path: String, /// Remote object store to upload. pub remote_store: ObjectStore, } @@ -178,7 +215,7 @@ mod tests { use crate::cache::file_cache::{self, FileCache}; use crate::cache::test_util::new_fs_store; use crate::sst::file::FileId; - use crate::sst::location::sst_file_path; + use crate::sst::location::{index_file_path, sst_file_path}; use crate::test_util::sst_util::{ new_batch_by_range, new_source, sst_file_handle, sst_region_metadata, }; @@ -192,6 +229,7 @@ mod tests { let mock_store = env.init_object_store_manager(); let file_id = FileId::random(); let upload_path = sst_file_path("test", file_id); + let index_upload_path = index_file_path("test", file_id); // Create WriteCache let local_dir = create_temp_dir(""); @@ -220,6 +258,7 @@ mod tests { source, storage: None, upload_path: upload_path.clone(), + index_upload_path, remote_store: mock_store.clone(), }; @@ -236,7 +275,7 @@ mod tests { .unwrap(); // Check write cache contains the key - let key = (region_id, file_id); + let key = IndexKey::new(region_id, file_id, FileType::Parquet); assert!(write_cache.file_cache.contains_key(&key)); // Check file data diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index d06e7b0f6aaa..4359e01f0e7a 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -533,6 +533,19 @@ pub enum Error { error: std::io::Error, location: Location, }, + + 
#[snafu(display( + "Failed to upload index file, region_id: {}, file_id: {}", + region_id, + file_id + ))] + UploadIndex { + region_id: RegionId, + file_id: FileId, + #[snafu(source)] + error: std::io::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -629,7 +642,7 @@ impl ErrorExt for Error { CleanDir { .. } => StatusCode::Unexpected, InvalidConfig { .. } => StatusCode::InvalidArguments, StaleLogEntry { .. } => StatusCode::Unexpected, - UploadSst { .. } => StatusCode::StorageUnavailable, + UploadSst { .. } | UploadIndex { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index a0bcec31b3b4..c2b0939d9b2a 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -23,6 +23,7 @@ use store_api::storage::ScanRequest; use table::predicate::{Predicate, TimeRangePredicateBuilder}; use crate::access_layer::AccessLayerRef; +use crate::cache::file_cache::FileCacheRef; use crate::cache::CacheManagerRef; use crate::error::Result; use crate::read::projection::ProjectionMapper; @@ -233,9 +234,17 @@ impl ScanRegion { /// Use the latest schema to build the index applier. 
fn build_index_applier(&self) -> Option { + let file_cache = || -> Option { + let cache_manager = self.cache_manager.as_ref()?; + let write_cache = cache_manager.write_cache()?; + let file_cache = write_cache.file_cache(); + Some(file_cache) + }(); + SstIndexApplierBuilder::new( self.access_layer.region_dir().to_string(), self.access_layer.object_store().clone(), + file_cache, self.version.metadata.as_ref(), ) .build(&self.request.filters) diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index a17a67fbd349..d9595673f483 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -224,6 +224,7 @@ impl SeqScan { .time_range(self.time_range) .projection(Some(self.mapper.column_ids().to_vec())) .cache(self.cache_manager.clone()) + .index_applier(self.index_applier.clone()) .build() .await; let reader = match maybe_reader { diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index f86faa81ef78..e1390ee7db9d 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -157,6 +157,11 @@ impl FileHandle { location::sst_file_path(file_dir, self.file_id()) } + /// Returns the complete file path of the index file. + pub fn index_file_path(&self, file_dir: &str) -> String { + location::index_file_path(file_dir, self.file_id()) + } + /// Returns the time range of the file. 
pub fn time_range(&self) -> FileTimeRange { self.inner.meta.time_range diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index 5ae287f383a4..3355f5c2d902 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -25,7 +25,9 @@ use index::inverted_index::search::index_apply::{ use object_store::ObjectStore; use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; +use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::error::{ ApplyIndexSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, PuffinReadMetadataSnafu, Result, @@ -41,34 +43,42 @@ use crate::sst::location; /// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files /// and returning the relevant row group ids for further scan. -pub struct SstIndexApplier { +pub(crate) struct SstIndexApplier { /// The root directory of the region. region_dir: String, - /// Store responsible for accessing SST files. + /// Region ID. + region_id: RegionId, + + /// Store responsible for accessing remote index files. store: InstrumentedStore, + /// The cache of index files. + file_cache: Option, + /// Predefined index applier used to apply predicates to index files /// and return the relevant row group ids for further scan. index_applier: Box, } -pub type SstIndexApplierRef = Arc; +pub(crate) type SstIndexApplierRef = Arc; impl SstIndexApplier { /// Creates a new [`SstIndexApplier`]. 
- /// - /// TODO(zhongzc): leverage `WriteCache` pub fn new( region_dir: String, + region_id: RegionId, object_store: ObjectStore, + file_cache: Option, index_applier: Box, ) -> Self { INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64); Self { region_dir, + region_id, store: InstrumentedStore::new(object_store), + file_cache, index_applier, } } @@ -77,22 +87,49 @@ impl SstIndexApplier { pub async fn apply(&self, file_id: FileId) -> Result> { let _timer = INDEX_APPLY_ELAPSED.start_timer(); - let mut puffin_reader = self.puffin_reader(file_id).await?; - let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?; - let mut index_reader = InvertedIndexBlobReader::new(blob_reader); - let context = SearchContext { // Encountering a non-existing column indicates that it doesn't match predicates. index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, }; - self.index_applier - .apply(context, &mut index_reader) + + match self.cached_puffin_reader(file_id).await? { + Some(mut puffin_reader) => { + let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?; + let mut index_reader = InvertedIndexBlobReader::new(blob_reader); + self.index_applier + .apply(context, &mut index_reader) + .await + .context(ApplyIndexSnafu) + } + None => { + let mut puffin_reader = self.remote_puffin_reader(file_id).await?; + let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?; + let mut index_reader = InvertedIndexBlobReader::new(blob_reader); + self.index_applier + .apply(context, &mut index_reader) + .await + .context(ApplyIndexSnafu) + } + } + } + + /// Helper function to create a [`PuffinFileReader`] from the cached index file. 
+ async fn cached_puffin_reader( + &self, + file_id: FileId, + ) -> Result>> { + let Some(file_cache) = &self.file_cache else { + return Ok(None); + }; + + Ok(file_cache + .reader(IndexKey::new(self.region_id, file_id, FileType::Puffin)) .await - .context(ApplyIndexSnafu) + .map(PuffinFileReader::new)) } - /// Helper function to create a [`PuffinFileReader`] for the provided SST file id. - async fn puffin_reader( + /// Helper function to create a [`PuffinFileReader`] from the remote index file. + async fn remote_puffin_reader( &self, file_id: FileId, ) -> Result> { @@ -172,7 +209,9 @@ mod tests { let sst_index_applier = SstIndexApplier::new( region_dir.clone(), + RegionId::new(0, 0), object_store, + None, Box::new(mock_index_applier), ); let ids = sst_index_applier.apply(file_id).await.unwrap(); @@ -203,7 +242,9 @@ mod tests { let sst_index_applier = SstIndexApplier::new( region_dir.clone(), + RegionId::new(0, 0), object_store, + None, Box::new(mock_index_applier), ); let res = sst_index_applier.apply(file_id).await; diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index 6779e817fd61..eaced21997e1 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -34,19 +34,23 @@ use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; use store_api::storage::ColumnId; +use crate::cache::file_cache::FileCacheRef; use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; use crate::row_converter::SortField; use crate::sst::index::applier::SstIndexApplier; use crate::sst::index::codec::IndexValueCodec; /// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan. -pub struct SstIndexApplierBuilder<'a> { +pub(crate) struct SstIndexApplierBuilder<'a> { /// Directory of the region, required argument for constructing [`SstIndexApplier`]. 
region_dir: String, /// Object store, required argument for constructing [`SstIndexApplier`]. object_store: ObjectStore, + /// File cache, optional argument for constructing [`SstIndexApplier`]. + file_cache: Option, + /// Metadata of the region, used to get metadata like column type. metadata: &'a RegionMetadata, @@ -59,11 +63,13 @@ impl<'a> SstIndexApplierBuilder<'a> { pub fn new( region_dir: String, object_store: ObjectStore, + file_cache: Option, metadata: &'a RegionMetadata, ) -> Self { Self { region_dir, object_store, + file_cache, metadata, output: HashMap::default(), } @@ -88,7 +94,9 @@ impl<'a> SstIndexApplierBuilder<'a> { let applier = PredicatesIndexApplier::try_from(predicates); Ok(Some(SstIndexApplier::new( self.region_dir, + self.metadata.region_id, self.object_store, + self.file_cache, Box::new(applier.context(BuildIndexApplierSnafu)?), ))) } @@ -286,7 +294,7 @@ mod tests { fn test_collect_and_basic() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(DfExpr::BinaryExpr(BinaryExpr { diff --git a/src/mito2/src/sst/index/applier/builder/between.rs b/src/mito2/src/sst/index/applier/builder/between.rs index 5fed21dcc292..6edeaf689b9f 100644 --- a/src/mito2/src/sst/index/applier/builder/between.rs +++ b/src/mito2/src/sst/index/applier/builder/between.rs @@ -69,7 +69,7 @@ mod tests { fn test_collect_between_basic() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let between = Between { negated: false, @@ -103,7 +103,7 @@ mod tests { fn test_collect_between_negated() { let metadata = test_region_metadata(); let mut builder = - 
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let between = Between { negated: true, @@ -120,7 +120,7 @@ mod tests { fn test_collect_between_field_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let between = Between { negated: false, @@ -137,7 +137,7 @@ mod tests { fn test_collect_between_type_mismatch() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let between = Between { negated: false, @@ -155,7 +155,7 @@ mod tests { fn test_collect_between_nonexistent_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let between = Between { negated: false, diff --git a/src/mito2/src/sst/index/applier/builder/comparison.rs b/src/mito2/src/sst/index/applier/builder/comparison.rs index 31381c78715e..76973620aec6 100644 --- a/src/mito2/src/sst/index/applier/builder/comparison.rs +++ b/src/mito2/src/sst/index/applier/builder/comparison.rs @@ -224,7 +224,7 @@ mod tests { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); for ((left, op, right), _) in &cases { builder.collect_comparison_expr(left, op, right).unwrap(); @@ -244,7 +244,7 @@ mod tests { fn test_collect_comparison_type_mismatch() { let metadata = test_region_metadata(); 
let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10)); assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); @@ -255,7 +255,7 @@ mod tests { fn test_collect_comparison_field_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); builder .collect_comparison_expr(&field_column(), &Operator::Lt, &string_lit("abc")) @@ -267,7 +267,7 @@ mod tests { fn test_collect_comparison_nonexistent_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let res = builder.collect_comparison_expr( &nonexistent_column(), diff --git a/src/mito2/src/sst/index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/applier/builder/eq_list.rs index 6e28920e38bc..d67c048ad105 100644 --- a/src/mito2/src/sst/index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/applier/builder/eq_list.rs @@ -133,7 +133,7 @@ mod tests { fn test_collect_eq_basic() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); builder .collect_eq(&tag_column(), &string_lit("foo")) @@ -162,7 +162,7 @@ mod tests { fn test_collect_eq_field_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), 
test_object_store(), None, &metadata); builder .collect_eq(&field_column(), &string_lit("abc")) @@ -174,7 +174,7 @@ mod tests { fn test_collect_eq_nonexistent_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc")); assert!(matches!(res, Err(Error::ColumnNotFound { .. }))); @@ -185,7 +185,7 @@ mod tests { fn test_collect_eq_type_mismatch() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let res = builder.collect_eq(&tag_column(), &int64_lit(1)); assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); @@ -196,7 +196,7 @@ mod tests { fn test_collect_or_eq_list_basic() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(tag_column()), @@ -246,7 +246,7 @@ mod tests { fn test_collect_or_eq_list_invalid_op() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(tag_column()), @@ -275,7 +275,7 @@ mod tests { fn test_collect_or_eq_list_multiple_columns() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), 
None, &metadata); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(tag_column()), diff --git a/src/mito2/src/sst/index/applier/builder/in_list.rs b/src/mito2/src/sst/index/applier/builder/in_list.rs index 0d3081f1c0f0..294d3ab0b317 100644 --- a/src/mito2/src/sst/index/applier/builder/in_list.rs +++ b/src/mito2/src/sst/index/applier/builder/in_list.rs @@ -64,7 +64,7 @@ mod tests { fn test_collect_in_list_basic() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let in_list = InList { expr: Box::new(tag_column()), @@ -88,7 +88,7 @@ mod tests { fn test_collect_in_list_negated() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let in_list = InList { expr: Box::new(tag_column()), @@ -104,7 +104,7 @@ mod tests { fn test_collect_in_list_field_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let in_list = InList { expr: Box::new(field_column()), @@ -120,7 +120,7 @@ mod tests { fn test_collect_in_list_type_mismatch() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let in_list = InList { expr: Box::new(tag_column()), @@ -137,7 +137,7 @@ mod tests { fn test_collect_in_list_nonexistent_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + 
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let in_list = InList { expr: Box::new(nonexistent_column()), diff --git a/src/mito2/src/sst/index/applier/builder/regex_match.rs b/src/mito2/src/sst/index/applier/builder/regex_match.rs index 48646334e7bc..806da6d516f6 100644 --- a/src/mito2/src/sst/index/applier/builder/regex_match.rs +++ b/src/mito2/src/sst/index/applier/builder/regex_match.rs @@ -56,7 +56,7 @@ mod tests { fn test_regex_match_basic() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); builder .collect_regex_match(&tag_column(), &string_lit("abc")) @@ -76,7 +76,7 @@ mod tests { fn test_regex_match_field_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); builder .collect_regex_match(&field_column(), &string_lit("abc")) @@ -89,7 +89,7 @@ mod tests { fn test_regex_match_type_mismatch() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); builder .collect_regex_match(&tag_column(), &int64_lit(123)) @@ -102,7 +102,7 @@ mod tests { fn test_regex_match_type_nonexist_column() { let metadata = test_region_metadata(); let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc")); assert!(matches!(res, Err(Error::ColumnNotFound { .. 
}))); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index a6e78ca94835..d2f0ef43cc12 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -95,6 +95,7 @@ mod tests { let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let file_path = handle.file_path(FILE_DIR); + let index_file_path = handle.index_file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -107,7 +108,8 @@ mod tests { ..Default::default() }; - let mut writer = ParquetWriter::new(file_path, metadata, object_store.clone()); + let mut writer = + ParquetWriter::new(file_path, index_file_path, metadata, object_store.clone()); let info = writer .write_all(source, &write_opts) .await @@ -144,6 +146,7 @@ mod tests { let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let file_path = handle.file_path(FILE_DIR); + let index_file_path = handle.index_file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -156,7 +159,12 @@ mod tests { ..Default::default() }; // Prepare data. 
- let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone()); + let mut writer = ParquetWriter::new( + file_path, + index_file_path, + metadata.clone(), + object_store.clone(), + ); writer .write_all(source, &write_opts) .await @@ -212,6 +220,7 @@ mod tests { let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let file_path = handle.file_path(FILE_DIR); + let index_file_path = handle.index_file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -225,7 +234,12 @@ mod tests { // write the sst file and get sst info // sst info contains the parquet metadata, which is converted from FileMetaData - let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone()); + let mut writer = ParquetWriter::new( + file_path, + index_file_path, + metadata.clone(), + object_store.clone(), + ); let sst_info = writer .write_all(source, &write_opts) .await diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index ad3fb35ca665..e07773d5af41 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -49,7 +49,7 @@ use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; /// Parquet SST reader builder. -pub struct ParquetReaderBuilder { +pub(crate) struct ParquetReaderBuilder { /// SST directory. file_dir: String, file_handle: FileHandle, diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index e1d8765f5f45..1ba27c1bda70 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -38,6 +38,10 @@ use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY}; pub struct ParquetWriter { /// SST output file path. file_path: String, + /// Index file path. 
+ /// + /// TODO(zhongzc): implement writing index file. + _index_file_path: String, /// Region metadata of the source and the target SST. metadata: RegionMetadataRef, object_store: ObjectStore, @@ -47,11 +51,13 @@ impl ParquetWriter { /// Creates a new parquet SST writer. pub fn new( file_path: String, + index_file_path: String, metadata: RegionMetadataRef, object_store: ObjectStore, ) -> ParquetWriter { ParquetWriter { file_path, + _index_file_path: index_file_path, metadata, object_store, }