diff --git a/Cargo.lock b/Cargo.lock index 29a60c4f891b..986adf93749a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8716,6 +8716,9 @@ name = "smallvec" version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +dependencies = [ + "serde", +] [[package]] name = "smartstring" diff --git a/Cargo.toml b/Cargo.toml index 2ce7a1fcddb7..3d491f13c315 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,7 +121,7 @@ rskafka = "0.5" rust_decimal = "1.33" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -smallvec = "1" +smallvec = { version = "1", features = ["serde"] } snafu = "0.7" # on branch v0.38.x sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6a93567ae38d42be5c8d08b13c8ff4dde26502ef", features = [ diff --git a/src/index/src/inverted_index/search/index_apply.rs b/src/index/src/inverted_index/search/index_apply.rs index 0dd2f7a5473b..24478d5e22d2 100644 --- a/src/index/src/inverted_index/search/index_apply.rs +++ b/src/index/src/inverted_index/search/index_apply.rs @@ -28,7 +28,7 @@ use crate::inverted_index::format::reader::InvertedIndexReader; /// avoiding repeated compilation of fixed predicates such as regex patterns. #[mockall::automock] #[async_trait] -pub trait IndexApplier { +pub trait IndexApplier: Send + Sync { /// Applies the predefined predicates to the data read by the given index reader, returning /// a list of relevant indices (e.g., post IDs, group IDs, row IDs). async fn apply<'a>( diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index d057323083ae..9160cc32dc96 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -22,9 +22,9 @@ use store_api::metadata::RegionMetadataRef; use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; -use crate::error::{CleanDirSnafu, DeleteSstSnafu, OpenDalSnafu, Result}; +use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result}; use crate::read::Source; -use crate::sst::file::{FileHandle, FileId}; +use crate::sst::file::{FileHandle, FileId, FileMeta}; use crate::sst::location; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; @@ -66,13 +66,27 @@ impl AccessLayer { &self.object_store } - /// Deletes a SST file with given file id. - pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> { - let path = location::sst_file_path(&self.region_dir, file_id); + /// Deletes a SST file (and its index file if it has one) with given file id. + pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> { + let path = location::sst_file_path(&self.region_dir, file_meta.file_id); self.object_store .delete(&path) .await - .context(DeleteSstSnafu { file_id }) + .context(DeleteSstSnafu { + file_id: file_meta.file_id, + })?; + + if file_meta.inverted_index_available() { + let path = location::index_file_path(&self.region_dir, file_meta.file_id); + self.object_store + .delete(&path) + .await + .context(DeleteIndexSnafu { + file_id: file_meta.file_id, + })?; + } + + Ok(()) } /// Returns a reader builder for specific `file`. diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index fefae906ba69..9a85cca3fef2 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -35,6 +35,8 @@ pub fn new_file_handle( ), level, file_size: 0, + available_indexes: Default::default(), + index_file_size: 0, }, file_purger, ) diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index e97030ac383c..3bc96dc2dda8 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -22,6 +22,7 @@ use common_telemetry::{debug, error, info}; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; +use smallvec::SmallVec; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; @@ -39,7 +40,7 @@ use crate::read::{BoxedBatchReader, Source}; use crate::request::{ BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, }; -use crate::sst::file::{FileHandle, FileId, FileMeta, Level}; +use crate::sst::file::{FileHandle, FileId, FileMeta, IndexType, Level}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::parquet::WriteOptions; use crate::sst::version::LevelMeta; @@ -330,6 +331,11 @@ impl TwcsCompactionTask { time_range: sst_info.time_range, level: output.output_level, file_size: sst_info.file_size, + available_indexes: sst_info + .inverted_index_available + .then(|| SmallVec::from_iter([IndexType::InvertedIndex])) + .unwrap_or_default(), + index_file_size: sst_info.index_file_size, }); Ok(file_meta_opt) }); diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 2e3dc4de0d53..2dff27290ca8 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -553,5 +553,5 @@ async fn test_region_usage() { assert_eq!(region_stat.sst_usage, 2742); // region total usage - assert_eq!(region_stat.disk_usage(), 3748); + assert_eq!(region_stat.disk_usage(), 3791); } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 2fede9ee7aff..d06e7b0f6aaa 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -321,6 +321,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to delete index file, file id: {}", file_id))] + DeleteIndex { + file_id: FileId, + #[snafu(source)] + error: object_store::Error, + location: Location, + }, + #[snafu(display("Failed to flush region {}", region_id))] FlushRegion { region_id: RegionId, @@ -596,7 +604,7 @@ impl ErrorExt for Error { InvalidSender { .. } => StatusCode::InvalidArguments, InvalidSchedulerState { .. } => StatusCode::InvalidArguments, StopScheduler { .. } => StatusCode::Internal, - DeleteSst { .. } => StatusCode::StorageUnavailable, + DeleteSst { .. } | DeleteIndex { .. } => StatusCode::StorageUnavailable, FlushRegion { source, .. } => source.status_code(), RegionDropped { .. } => StatusCode::Cancelled, RegionClosed { .. } => StatusCode::Cancelled, diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 10a1de0e5de2..f79d811ba473 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use common_telemetry::{error, info}; +use smallvec::SmallVec; use snafu::ResultExt; use store_api::storage::RegionId; use strum::IntoStaticStr; @@ -39,7 +40,7 @@ use crate::request::{ SenderWriteRequest, WorkerRequest, }; use crate::schedule::scheduler::{Job, SchedulerRef}; -use crate::sst::file::{FileId, FileMeta}; +use crate::sst::file::{FileId, FileMeta, IndexType}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::parquet::WriteOptions; use crate::worker::WorkerListener; @@ -339,6 +340,11 @@ impl RegionFlushTask { time_range: sst_info.time_range, level: 0, file_size: sst_info.file_size, + available_indexes: sst_info + .inverted_index_available + .then(|| SmallVec::from_iter([IndexType::InvertedIndex])) + .unwrap_or_default(), + index_file_size: sst_info.index_file_size, }; file_metas.push(file_meta); } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index c28f6cd6d598..c80becd5ec45 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -171,6 +171,8 @@ async fn checkpoint_with_different_compression_types() { time_range: (0.into(), 10000000.into()), level: 0, file_size: 1024000, + available_indexes: Default::default(), + index_file_size: 0, }; let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![file_meta], diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 47cf99910550..87244d4c3165 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -121,6 +121,9 @@ lazy_static! { /// Counter of filtered rows during merge. pub static ref MERGE_FILTER_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!("greptime_mito_merge_filter_rows_total", "mito merge filter rows total", &[TYPE_LABEL]).unwrap(); + /// Counter of row groups read. + pub static ref READ_ROW_GROUPS_TOTAL: IntCounterVec = + register_int_counter_vec!("greptime_mito_read_row_groups_total", "mito read row groups total", &[TYPE_LABEL]).unwrap(); // ------- End of query metrics. // Cache related metrics. diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 4a8c7028357b..a0bcec31b3b4 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -14,8 +14,10 @@ //! Scans a region according to the scan request. +use std::sync::Arc; + use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::debug; +use common_telemetry::{debug, warn}; use common_time::range::TimestampRange; use store_api::storage::ScanRequest; use table::predicate::{Predicate, TimeRangePredicateBuilder}; @@ -27,6 +29,8 @@ use crate::read::projection::ProjectionMapper; use crate::read::seq_scan::SeqScan; use crate::region::version::VersionRef; use crate::sst::file::FileHandle; +use crate::sst::index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::applier::SstIndexApplierRef; /// A scanner scans a region and returns a [SendableRecordBatchStream]. pub(crate) enum Scanner { @@ -194,6 +198,7 @@ impl ScanRegion { total_ssts ); + let index_applier = self.build_index_applier(); let predicate = Predicate::new(self.request.filters.clone()); // The mapper always computes projected column ids as the schema of SSTs may change. let mapper = match &self.request.projection { @@ -207,6 +212,7 @@ impl ScanRegion { .with_memtables(memtables) .with_files(files) .with_cache(self.cache_manager) + .with_index_applier(index_applier) .with_parallelism(self.parallelism); Ok(seq_scan) @@ -224,6 +230,20 @@ impl ScanRegion { TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters) .build() } + + /// Use the latest schema to build the index applier. + fn build_index_applier(&self) -> Option { + SstIndexApplierBuilder::new( + self.access_layer.region_dir().to_string(), + self.access_layer.object_store().clone(), + self.version.metadata.as_ref(), + ) + .build(&self.request.filters) + .inspect_err(|err| warn!(err; "Failed to build index applier")) + .ok() + .flatten() + .map(Arc::new) + } } /// Config for parallel scan. diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index f963568cad59..a17a67fbd349 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -39,6 +39,7 @@ use crate::read::projection::ProjectionMapper; use crate::read::scan_region::ScanParallism; use crate::read::{BatchReader, BoxedBatchReader, BoxedBatchStream, Source}; use crate::sst::file::FileHandle; +use crate::sst::index::applier::SstIndexApplierRef; /// Scans a region and returns rows in a sorted sequence. /// @@ -62,6 +63,8 @@ pub struct SeqScan { ignore_file_not_found: bool, /// Parallelism to scan data. parallelism: ScanParallism, + /// Index applier. + index_applier: Option, } impl SeqScan { @@ -78,6 +81,7 @@ impl SeqScan { cache_manager: None, ignore_file_not_found: false, parallelism: ScanParallism::default(), + index_applier: None, } } @@ -130,6 +134,13 @@ impl SeqScan { self } + /// Sets index applier. + #[must_use] + pub(crate) fn with_index_applier(mut self, index_applier: Option) -> Self { + self.index_applier = index_applier; + self + } + /// Builds a stream for the query. pub async fn build_stream(&self) -> Result { let start = Instant::now(); diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 8f04492ba234..8b2e090cc977 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -651,8 +651,7 @@ impl OnFailure for FlushFinished { // Clean flushed files. for file in &self.file_metas { self.file_purger.send_request(PurgeRequest { - region_id: file.region_id, - file_id: file.file_id, + file_meta: file.clone(), }); } } @@ -707,14 +706,12 @@ impl OnFailure for CompactionFinished { })); } for file in &self.compacted_files { - let file_id = file.file_id; warn!( "Cleaning region {} compaction output file: {}", - self.region_id, file_id + self.region_id, file.file_id ); self.file_purger.send_request(PurgeRequest { - region_id: self.region_id, - file_id, + file_meta: file.clone(), }); } } diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index d32133d1bcc6..f86faa81ef78 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use common_time::Timestamp; use serde::{Deserialize, Serialize}; +use smallvec::SmallVec; use snafu::{ResultExt, Snafu}; use store_api::storage::RegionId; use uuid::Uuid; @@ -95,6 +96,23 @@ pub struct FileMeta { pub level: Level, /// Size of the file. pub file_size: u64, + /// Available indexes of the file. + pub available_indexes: SmallVec<[IndexType; 4]>, + /// Size of the index file. + pub index_file_size: u64, +} + +/// Type of index. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum IndexType { + /// Inverted index. + InvertedIndex, +} + +impl FileMeta { + pub fn inverted_index_available(&self) -> bool { + self.available_indexes.contains(&IndexType::InvertedIndex) + } } /// Handle to a SST file. @@ -176,8 +194,7 @@ impl Drop for FileHandleInner { fn drop(&mut self) { if self.deleted.load(Ordering::Relaxed) { self.file_purger.send_request(PurgeRequest { - region_id: self.meta.region_id, - file_id: self.meta.file_id, + file_meta: self.meta.clone(), }); } } @@ -236,6 +253,8 @@ mod tests { time_range: FileTimeRange::default(), level, file_size: 0, + available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), + index_file_size: 0, } } @@ -250,7 +269,8 @@ mod tests { #[test] fn test_deserialize_from_string() { let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\ - \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\"level\":0}"; + \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\ + \"available_indexes\":[\"InvertedIndex\"],\"level\":0}"; let file_meta = create_file_meta( FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(), 0, diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 059b1956d7e2..cc913c1a7e22 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -16,20 +16,17 @@ use std::fmt; use std::sync::Arc; use common_telemetry::{error, info}; -use store_api::storage::RegionId; use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; use crate::schedule::scheduler::SchedulerRef; -use crate::sst::file::FileId; +use crate::sst::file::FileMeta; /// Request to remove a file. #[derive(Debug)] pub struct PurgeRequest { - /// Region id of the file. - pub region_id: RegionId, - /// Id of the file. - pub file_id: FileId, + /// File meta. + pub file_meta: FileMeta, } /// A worker to delete files in background. @@ -72,24 +69,22 @@ impl LocalFilePurger { impl FilePurger for LocalFilePurger { fn send_request(&self, request: PurgeRequest) { - let file_id = request.file_id; - let region_id = request.region_id; + let file_meta = request.file_meta; let sst_layer = self.sst_layer.clone(); // Remove meta of the file from cache. if let Some(cache) = &self.cache_manager { - cache.remove_parquet_meta_data(region_id, file_id); + cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id); } if let Err(e) = self.scheduler.schedule(Box::pin(async move { - if let Err(e) = sst_layer.delete_sst(file_id).await { - error!(e; "Failed to delete SST file, file: {}, region: {}", - file_id.as_parquet(), region_id); + if let Err(e) = sst_layer.delete_sst(&file_meta).await { + error!(e; "Failed to delete SST file, file_id: {}, region: {}", + file_meta.file_id, file_meta.region_id); } else { info!( - "Successfully deleted SST file: {}, region: {}", - file_id.as_parquet(), - region_id + "Successfully deleted SST file, file_id: {}, region: {}", + file_meta.file_id, file_meta.region_id ); } })) { @@ -103,11 +98,12 @@ mod tests { use common_test_util::temp_dir::create_temp_dir; use object_store::services::Fs; use object_store::ObjectStore; + use smallvec::SmallVec; use super::*; use crate::access_layer::AccessLayer; use crate::schedule::scheduler::{LocalScheduler, Scheduler}; - use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange}; + use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType}; use crate::sst::location; #[tokio::test] @@ -137,6 +133,8 @@ mod tests { time_range: FileTimeRange::default(), level: 0, file_size: 4096, + available_indexes: Default::default(), + index_file_size: 0, }, file_purger, ); @@ -148,4 +146,52 @@ mod tests { assert!(!object_store.is_exist(&path).await.unwrap()); } + + #[tokio::test] + async fn test_file_purge_with_index() { + common_telemetry::init_default_ut_logging(); + + let dir = create_temp_dir("file-purge"); + let mut builder = Fs::default(); + builder.root(dir.path().to_str().unwrap()); + let object_store = ObjectStore::new(builder).unwrap().finish(); + let sst_file_id = FileId::random(); + let sst_dir = "table1"; + + let path = location::sst_file_path(sst_dir, sst_file_id); + object_store.write(&path, vec![0; 4096]).await.unwrap(); + + let index_path = location::index_file_path(sst_dir, sst_file_id); + object_store + .write(&index_path, vec![0; 4096]) + .await + .unwrap(); + + let scheduler = Arc::new(LocalScheduler::new(3)); + let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone())); + + let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None)); + + { + let handle = FileHandle::new( + FileMeta { + region_id: 0.into(), + file_id: sst_file_id, + time_range: FileTimeRange::default(), + level: 0, + file_size: 4096, + available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), + index_file_size: 4096, + }, + file_purger, + ); + // mark file as deleted and drop the handle, we expect the sst file and the index file are deleted. + handle.mark_deleted(); + } + + scheduler.stop(true).await.unwrap(); + + assert!(!object_store.is_exist(&path).await.unwrap()); + assert!(!object_store.is_exist(&index_path).await.unwrap()); + } } diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index 0efc3f8e6ad0..5ae287f383a4 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -15,6 +15,7 @@ pub mod builder; use std::collections::BTreeSet; +use std::sync::Arc; use futures::{AsyncRead, AsyncSeek}; use index::inverted_index::format::reader::InvertedIndexBlobReader; @@ -52,8 +53,12 @@ pub struct SstIndexApplier { index_applier: Box, } +pub type SstIndexApplierRef = Arc; + impl SstIndexApplier { /// Creates a new [`SstIndexApplier`]. + /// + /// TODO(zhongzc): leverage `WriteCache` pub fn new( region_dir: String, object_store: ObjectStore, diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index e52070e1eb9e..a6e78ca94835 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -66,6 +66,10 @@ pub struct SstInfo { pub num_rows: usize, /// File Meta Data pub file_metadata: Option>, + /// Whether inverted index is available. + pub inverted_index_available: bool, + /// Index file size in bytes. + pub index_file_size: u64, } #[cfg(test)] diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index a9fac535355b..46bb00390827 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -26,6 +26,7 @@ //! //! We stores fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()). +use std::borrow::Borrow; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; @@ -261,7 +262,7 @@ impl ReadFormat { /// Returns min values of specific column in row groups. pub(crate) fn min_values( &self, - row_groups: &[RowGroupMetaData], + row_groups: &[impl Borrow], column_id: ColumnId, ) -> Option { let column = self.metadata.column_by_id(column_id)?; @@ -281,7 +282,7 @@ impl ReadFormat { /// Returns max values of specific column in row groups. pub(crate) fn max_values( &self, - row_groups: &[RowGroupMetaData], + row_groups: &[impl Borrow], column_id: ColumnId, ) -> Option { let column = self.metadata.column_by_id(column_id)?; @@ -301,7 +302,7 @@ impl ReadFormat { /// Returns null counts of specific column in row groups. pub(crate) fn null_counts( &self, - row_groups: &[RowGroupMetaData], + row_groups: &[impl Borrow], column_id: ColumnId, ) -> Option { let column = self.metadata.column_by_id(column_id)?; @@ -345,7 +346,7 @@ impl ReadFormat { /// Returns min/max values of specific tag. fn tag_values( &self, - row_groups: &[RowGroupMetaData], + row_groups: &[impl Borrow], column: &ColumnMetadata, is_min: bool, ) -> Option { @@ -363,7 +364,10 @@ impl ReadFormat { let converter = McmpRowCodec::new(vec![SortField::new(column.column_schema.data_type.clone())]); let values = row_groups.iter().map(|meta| { - let stats = meta.column(self.primary_key_position()).statistics()?; + let stats = meta + .borrow() + .column(self.primary_key_position()) + .statistics()?; if !stats.has_min_max_set() { return None; } @@ -400,7 +404,7 @@ impl ReadFormat { /// Returns min/max values of specific non-tag columns. fn column_values( - row_groups: &[RowGroupMetaData], + row_groups: &[impl Borrow], column: &ColumnMetadata, column_index: usize, is_min: bool, @@ -414,7 +418,7 @@ impl ReadFormat { let scalar_values = row_groups .iter() .map(|meta| { - let stats = meta.column(column_index).statistics()?; + let stats = meta.borrow().column(column_index).statistics()?; if !stats.has_min_max_set() { return None; } @@ -463,11 +467,11 @@ impl ReadFormat { /// Returns null counts of specific non-tag columns. fn column_null_counts( - row_groups: &[RowGroupMetaData], + row_groups: &[impl Borrow], column_index: usize, ) -> Option { let values = row_groups.iter().map(|meta| { - let col = meta.column(column_index); + let col = meta.borrow().column(column_index); let stat = col.statistics()?; Some(stat.null_count()) }); diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 60729c664283..ad3fb35ca665 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -14,12 +14,12 @@ //! Parquet reader. -use std::collections::{HashSet, VecDeque}; +use std::collections::{BTreeSet, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; -use common_telemetry::debug; +use common_telemetry::{debug, warn}; use common_time::range::TimestampRange; use datatypes::arrow::record_batch::RecordBatch; use object_store::ObjectStore; @@ -39,9 +39,10 @@ use crate::error::{ ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result, }; -use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; +use crate::metrics::{READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::{Batch, BatchReader}; use crate::sst::file::FileHandle; +use crate::sst::index::applier::SstIndexApplierRef; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::row_group::InMemoryRowGroup; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -64,6 +65,8 @@ pub struct ParquetReaderBuilder { projection: Option>, /// Manager that caches SST data. cache_manager: Option, + /// Index applier. + index_applier: Option, } impl ParquetReaderBuilder { @@ -81,6 +84,7 @@ impl ParquetReaderBuilder { time_range: None, projection: None, cache_manager: None, + index_applier: None, } } @@ -110,6 +114,13 @@ impl ParquetReaderBuilder { self } + /// Attaches the index applier to the builder. + #[must_use] + pub fn index_applier(mut self, index_applier: Option) -> Self { + self.index_applier = index_applier; + self + } + /// Builds and initializes a [ParquetReader]. /// /// This needs to perform IO operation. @@ -129,35 +140,8 @@ impl ParquetReaderBuilder { // Decodes region metadata. let key_value_meta = parquet_meta.file_metadata().key_value_metadata(); let region_meta = Self::get_region_metadata(&file_path, key_value_meta)?; - // Computes column ids to read. - let column_ids: HashSet<_> = self - .projection - .as_ref() - .map(|p| p.iter().cloned().collect()) - .unwrap_or_else(|| { - region_meta - .column_metadatas - .iter() - .map(|c| c.column_id) - .collect() - }); let read_format = ReadFormat::new(Arc::new(region_meta)); - // Prunes row groups by metadata. - let row_groups: VecDeque<_> = if let Some(predicate) = &self.predicate { - let stats = - RowGroupPruningStats::new(parquet_meta.row_groups(), &read_format, column_ids); - - predicate - .prune_with_stats(&stats, read_format.metadata().schema.arrow_schema()) - .into_iter() - .enumerate() - .filter_map(|(idx, valid)| if valid { Some(idx) } else { None }) - .collect() - } else { - (0..parquet_meta.num_row_groups()).collect() - }; - // Computes the projection mask. let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); let projection_mask = if let Some(column_ids) = self.projection.as_ref() { @@ -174,6 +158,13 @@ impl ParquetReaderBuilder { parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) .context(ReadParquetSnafu { path: &file_path })?; + let mut metrics = Metrics::default(); + + // Computes row groups to read. + let row_groups = self + .row_groups_to_read(&read_format, &parquet_meta, &mut metrics) + .await; + let reader_builder = RowGroupReaderBuilder { file_handle: self.file_handle.clone(), file_path, @@ -185,7 +176,6 @@ impl ParquetReaderBuilder { }; let metrics = Metrics { - read_row_groups: row_groups.len(), build_cost: start.elapsed(), ..Default::default() }; @@ -256,13 +246,76 @@ impl ParquetReaderBuilder { Ok(metadata) } + + /// Computes row groups to read. + async fn row_groups_to_read( + &self, + read_format: &ReadFormat, + parquet_meta: &ParquetMetaData, + metrics: &mut Metrics, + ) -> BTreeSet { + let mut row_group_ids: BTreeSet<_> = (0..parquet_meta.num_row_groups()).collect(); + metrics.num_row_groups_unfiltered += row_group_ids.len(); + + // Applies index to prune row groups. + // + // TODO(zhongzc): Devise a mechanism to enforce the non-use of indices + // as an escape route in case of index issues, and it can be used to test + // the correctness of the index. + if let Some(index_applier) = &self.index_applier { + if self.file_handle.meta().inverted_index_available() { + match index_applier.apply(self.file_handle.file_id()).await { + Ok(row_groups) => row_group_ids = row_groups, + Err(err) => warn!( + err; "Failed to apply index, region_id: {}, file_id: {}", + self.file_handle.region_id(), self.file_handle.file_id()), + } + } + } + metrics.num_row_groups_inverted_index_selected += row_group_ids.len(); + + if row_group_ids.is_empty() { + return row_group_ids; + } + + // Prunes row groups by min-max index. + if let Some(predicate) = &self.predicate { + let region_meta = read_format.metadata(); + let column_ids = match &self.projection { + Some(ids) => ids.iter().cloned().collect(), + None => region_meta + .column_metadatas + .iter() + .map(|c| c.column_id) + .collect(), + }; + + let row_groups = row_group_ids + .iter() + .map(|id| parquet_meta.row_group(*id)) + .collect::>(); + let stats = RowGroupPruningStats::new(&row_groups, read_format, column_ids); + let mut mask = predicate + .prune_with_stats(&stats, region_meta.schema.arrow_schema()) + .into_iter(); + + row_group_ids.retain(|_| mask.next().unwrap_or(false)); + }; + metrics.num_row_groups_min_max_selected += row_group_ids.len(); + + row_group_ids + } } /// Parquet reader metrics. #[derive(Debug, Default)] struct Metrics { - /// Number of row groups to read. - read_row_groups: usize, + /// Number of unfiltered row groups. + num_row_groups_unfiltered: usize, + /// Number of row groups to read after filtering by inverted index. + num_row_groups_inverted_index_selected: usize, + /// Number of row groups to read after filtering by min-max index. + num_row_groups_min_max_selected: usize, /// Duration to build the parquet reader. build_cost: Duration, /// Duration to scan the reader. @@ -337,7 +390,7 @@ impl RowGroupReaderBuilder { /// Parquet batch reader to read our SST format. pub struct ParquetReader { /// Indices of row groups to read. - row_groups: VecDeque, + row_groups: BTreeSet, /// Helper to read record batches. /// /// Not `None` if [ParquetReader::stream] is not `None`. @@ -390,8 +443,8 @@ impl Drop for ParquetReader { self.reader_builder.file_handle.region_id(), self.reader_builder.file_handle.file_id(), self.reader_builder.file_handle.time_range(), - self.metrics.read_row_groups, - self.reader_builder.parquet_meta.num_row_groups(), + self.metrics.num_row_groups_min_max_selected, + self.metrics.num_row_groups_unfiltered, self.metrics ); @@ -405,6 +458,15 @@ impl Drop for ParquetReader { READ_ROWS_TOTAL .with_label_values(&["parquet"]) .inc_by(self.metrics.num_rows as u64); + READ_ROW_GROUPS_TOTAL + .with_label_values(&["unfiltered"]) + .inc_by(self.metrics.num_row_groups_unfiltered as u64); + READ_ROW_GROUPS_TOTAL + .with_label_values(&["inverted_index_selected"]) + .inc_by(self.metrics.num_row_groups_inverted_index_selected as u64); + READ_ROW_GROUPS_TOTAL + .with_label_values(&["min_max_index_selected"]) + .inc_by(self.metrics.num_row_groups_min_max_selected as u64); } } @@ -432,7 +494,7 @@ impl ParquetReader { } // No more items in current row group, reads next row group. - while let Some(row_group_idx) = self.row_groups.pop_front() { + while let Some(row_group_idx) = self.row_groups.pop_first() { let mut row_group_reader = self.reader_builder.build(row_group_idx).await?; let Some(record_batch) = row_group_reader diff --git a/src/mito2/src/sst/parquet/stats.rs b/src/mito2/src/sst/parquet/stats.rs index f0cda3846737..a17e8ace45ae 100644 --- a/src/mito2/src/sst/parquet/stats.rs +++ b/src/mito2/src/sst/parquet/stats.rs @@ -14,6 +14,7 @@ //! Statistics of parquet SSTs. +use std::borrow::Borrow; use std::collections::HashSet; use datafusion::physical_optimizer::pruning::PruningStatistics; @@ -25,9 +26,9 @@ use store_api::storage::ColumnId; use crate::sst::parquet::format::ReadFormat; /// Statistics for pruning row groups. -pub(crate) struct RowGroupPruningStats<'a> { +pub(crate) struct RowGroupPruningStats<'a, T> { /// Metadata of SST row groups. - row_groups: &'a [RowGroupMetaData], + row_groups: &'a [T], /// Helper to read the SST. read_format: &'a ReadFormat, /// Projected column ids to read. @@ -37,10 +38,10 @@ pub(crate) struct RowGroupPruningStats<'a> { column_ids: HashSet, } -impl<'a> RowGroupPruningStats<'a> { +impl<'a, T> RowGroupPruningStats<'a, T> { /// Creates a new statistics to prune specific `row_groups`. pub(crate) fn new( - row_groups: &'a [RowGroupMetaData], + row_groups: &'a [T], read_format: &'a ReadFormat, column_ids: HashSet, ) -> Self { @@ -61,7 +62,7 @@ impl<'a> RowGroupPruningStats<'a> { } } -impl<'a> PruningStatistics for RowGroupPruningStats<'a> { +impl<'a, T: Borrow> PruningStatistics for RowGroupPruningStats<'a, T> { fn min_values(&self, column: &Column) -> Option { let column_id = self.column_id_to_prune(&column.name)?; self.read_format.min_values(self.row_groups, column_id) diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 2ed226791dca..e1d8765f5f45 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -124,6 +124,8 @@ impl ParquetWriter { file_size, num_rows: stats.num_rows, file_metadata: Some(Arc::new(parquet_metadata)), + inverted_index_available: false, + index_file_size: 0, })) } diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index ce276ca71db7..c690e8e0860a 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -92,7 +92,10 @@ impl SstVersion { level_meta .files .values() - .map(|file_handle| file_handle.meta().file_size) + .map(|file_handle| { + let meta = file_handle.meta(); + meta.file_size + meta.index_file_size + }) .sum::() }) .sum() diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 1cc158ed91b5..7f4e7741757b 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -106,6 +106,8 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { ), level: 0, file_size: 0, + available_indexes: Default::default(), + index_file_size: 0, }, file_purger, ) diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index e480b1f146df..bc8035bb9a68 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -96,6 +96,8 @@ impl VersionControlBuilder { ), level: 0, file_size: 0, // We don't care file size. + available_indexes: Default::default(), + index_file_size: 0, }, ); self @@ -136,6 +138,8 @@ pub(crate) fn apply_edit( ), level: 0, file_size: 0, // We don't care file size. + available_indexes: Default::default(), + index_file_size: 0, } }) .collect();